Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/16 16:39:10 UTC

[incubator-doris] branch dev-1.0.1 updated (162b8492e1 -> c41618a0b6)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from 162b8492e1 [Bug] (fix) DeltaWriter::mem_consumption() coredump (#9245)
     new 0d52bad20a [refactor] some code cleanup for min/max function. (#8874)
     new 3684be8a52 [fix](function) fix last_value get wrong result when have order by clause (#9247)
     new 75fbc9d8ad [fix](broker-load) can't load parquet file with column name case sensitive with Doris column (#9358)
     new e0ed006785 [fix]Stream load 307 temporary redirection authentication information is lost (#9363)
     new 7c20a8722f [fix][compaction]  Rowset::end_version null pointer(#9379)
     new 343f97f028 [performance][query]improve the performance of DISTINCT aggregation by using flat hash set replace unordered set (#9401)
     new 0c73dc85a6 [Bug] Missing error tablet list when close_wait return error (#9418)
     new e5c46b78d2 [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)
     new 6116616284 [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)
     new 59daed7591 [fix](http) Hardening Recommendations Disable TRACE/TRAC methods (#9479)
     new 71beda2664 [fix](Function) fix case when function return null with abs function (#9493)
     new 72e80756e0 [Bug] (load) Broker load kerberos auth fail (#9494)
     new ea5865c3ff [fix](storage) fix core for string predicate in storage layer (#9500)
     new ff4f4a3f1b [bugfix](load) fix coredump in ordinal index flush (#9518)
     new 7f12cfe240 [fix](lateral-view) Error view includes lateral view (#9530)
     new 53fb90f04f [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
     new c41618a0b6 [fix](planner)VecNotImplException thrown when query need rewrite and some slot cannot changed to nullable (#9589)

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exprs/aggregate_functions.cpp               |  45 +---
 be/src/exprs/case_expr.cpp                         |   1 +
 be/src/exprs/case_expr.h                           |   1 +
 be/src/olap/compaction.cpp                         |   7 +-
 be/src/olap/delta_writer.cpp                       |  12 +-
 be/src/olap/delta_writer.h                         |   4 +-
 be/src/olap/rowset/segment_v2/binary_dict_page.cpp |   6 +-
 be/src/olap/rowset/segment_v2/bitshuffle_page.h    |   2 +-
 be/src/olap/rowset/segment_v2/column_reader.cpp    |   4 +-
 be/src/olap/rowset/segment_v2/column_reader.h      |   2 +-
 be/src/olap/rowset/segment_v2/column_writer.cpp    |   8 +-
 be/src/olap/rowset/segment_v2/column_writer.h      |   2 +-
 .../rowset/segment_v2/indexed_column_reader.cpp    |   4 +-
 .../olap/rowset/segment_v2/indexed_column_reader.h |   2 +-
 .../rowset/segment_v2/indexed_column_writer.cpp    |   6 +-
 .../olap/rowset/segment_v2/indexed_column_writer.h |   2 +-
 be/src/olap/rowset/segment_v2/page_builder.h       |   4 +-
 be/src/runtime/load_channel.cpp                    |   3 +-
 be/src/runtime/mem_pool.cpp                        |  11 +
 be/src/runtime/mem_pool.h                          |   5 +
 be/src/runtime/tablets_channel.cpp                 |   7 +-
 be/src/runtime/tablets_channel.h                   |   5 +-
 be/src/util/block_compression.cpp                  |  93 ++++---
 be/src/util/block_compression.h                    |   4 +-
 .../aggregate_function_window.h                    |   6 +-
 be/src/vec/columns/predicate_column.h              |  24 +-
 be/test/util/block_compression_test.cpp            |  12 +-
 be/test/vec/function/function_bitmap_test.cpp      | 135 +++++------
 .../org/apache/doris/analysis/AnalyticExpr.java    |   5 +-
 .../java/org/apache/doris/analysis/Analyzer.java   |  35 ---
 .../org/apache/doris/analysis/InlineViewRef.java   |  21 +-
 .../java/org/apache/doris/analysis/QueryStmt.java  |   3 +-
 .../java/org/apache/doris/analysis/TableRef.java   |  28 ++-
 .../java/org/apache/doris/backup/RestoreJob.java   |   9 +-
 .../java/org/apache/doris/catalog/Catalog.java     |   9 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |   3 +-
 .../main/java/org/apache/doris/common/Config.java  |  10 +-
 .../apache/doris/common/util/VectorizedUtil.java   |  29 ++-
 .../httpv2/interceptor/ServletTraceIterceptor.java |  61 +++++
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  53 ++--
 .../doris/httpv2/rest/RestBaseController.java      |  18 +-
 .../apache/doris/httpv2/util/LoadSubmitter.java    |  19 +-
 .../src/main/java/org/apache/doris/load/Load.java  |   8 +-
 .../doris/load/sync/canal/CanalSyncChannel.java    |  94 +++++---
 .../org/apache/doris/planner/BrokerScanNode.java   |   4 +-
 .../apache/doris/planner/StreamLoadScanNode.java   |   7 +-
 .../apache/doris/qe/InsertStreamTxnExecutor.java   |  14 +-
 .../java/org/apache/doris/qe/MultiLoadMgr.java     |  14 +-
 .../org/apache/doris/system/BeSelectionPolicy.java | 131 ++++++++++
 .../org/apache/doris/system/SystemInfoService.java | 251 +++++--------------
 .../doris/transaction/DatabaseTransactionMgr.java  |   2 +-
 .../org/apache/doris/backup/RestoreJobTest.java    | 132 +---------
 .../org/apache/doris/catalog/CreateTableTest.java  |   6 +-
 .../apache/doris/catalog/ModifyBackendTest.java    |   6 +-
 .../doris/load/sync/canal/CanalSyncDataTest.java   |   7 +-
 .../java/org/apache/doris/qe/MultiLoadMgrTest.java |  13 +-
 .../apache/doris/system/SystemInfoServiceTest.java | 268 +++++++++++++++++++++
 .../doris/broker/hdfs/FileSystemManager.java       |   6 +-
 ...test_select_constant.out => test_case_when.out} |   5 +-
 ...variance_agg.out => test_last_value_window.out} |   9 +-
 .../suites/correctness/test_case_when.groovy       |  67 ++++++
 .../correctness/test_last_value_window.groovy      |  51 ++++
 62 files changed, 1071 insertions(+), 744 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/httpv2/interceptor/ServletTraceIterceptor.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
 copy regression-test/data/correctness/{test_select_constant.out => test_case_when.out} (72%)
 copy regression-test/data/correctness/{test_select_variance_agg.out => test_last_value_window.out} (57%)
 create mode 100644 regression-test/suites/correctness/test_case_when.groovy
 create mode 100644 regression-test/suites/correctness/test_last_value_window.groovy


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 14/17: [bugfix](load) fix coredump in ordinal index flush (#9518)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit ff4f4a3f1bfccd03d15903146bcac69626ffc6c5
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Thu May 12 21:10:49 2022 +0800

    [bugfix](load) fix coredump in ordinal index flush (#9518)
    
    Commit #9123 introduced the bug: the bitshuffle page builder returned an error when
    a page was full, so the scalar column writer could not switch to the next page, which
    left the ordinal index null at flush time.
    
    All page builders should return OK when a page is full, and the column writer procedure
    should be: append_data, check is_page_full, switch to the next page.
    
    Co-authored-by: yixiutt <yi...@selectdb.com>
---
 be/src/olap/rowset/segment_v2/binary_dict_page.cpp | 6 +++---
 be/src/olap/rowset/segment_v2/bitshuffle_page.h    | 2 +-
 be/src/olap/rowset/segment_v2/page_builder.h       | 4 +++-
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
index 9cee35f997..7a62f74c6a 100644
--- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
+++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
@@ -76,13 +76,13 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
         }
 
         for (int i = 0; i < *count; ++i, ++src) {
+            if (is_page_full()) {
+                break;
+            }
             auto iter = _dictionary.find(*src);
             if (iter != _dictionary.end()) {
                 value_code = iter->second;
             } else {
-                if (_dict_builder->is_page_full()) {
-                    break;
-                }
                 Slice dict_item(src->data, src->size);
                 if (src->size > 0) {
                     char* item_mem = (char*)_pool.allocate(src->size);
diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page.h b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
index 9bada3f764..bfd85217f5 100644
--- a/be/src/olap/rowset/segment_v2/bitshuffle_page.h
+++ b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
@@ -105,7 +105,7 @@ public:
         DCHECK(!_finished);
         if (_remain_element_capacity <= 0) {
             *count = 0;
-            return Status::RuntimeError("page is full.");
+            return Status::OK();
         }
         int to_add = std::min<int>(_remain_element_capacity, *count);
         int to_add_size = to_add * SIZE_OF_TYPE;
diff --git a/be/src/olap/rowset/segment_v2/page_builder.h b/be/src/olap/rowset/segment_v2/page_builder.h
index 26eec4b430..ab74ad7fca 100644
--- a/be/src/olap/rowset/segment_v2/page_builder.h
+++ b/be/src/olap/rowset/segment_v2/page_builder.h
@@ -49,7 +49,9 @@ public:
     // Add a sequence of values to the page.
     // The number of values actually added will be returned through count, which may be less
     // than requested if the page is full.
-    //
+
+    // check page if full before truly add, return ok when page is full so that column write
+    // will switch to next page
     // vals size should be decided according to the page build type
     // TODO make sure vals is naturally-aligned to its type so that impls can use aligned load
     // instead of memcpy to copy values.
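
The commit message for this change spells out the intended writer procedure: append data, check is_page_full(), then switch to the next page, so that "page full" is never surfaced as an error status. A minimal sketch of that loop follows; ToyPageBuilder and ToyColumnWriter are hypothetical stand-ins, not the real segment_v2 classes touched by this commit.

    // Illustrative sketch only: a toy fixed-capacity page builder and writer.
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    class ToyPageBuilder {
    public:
        explicit ToyPageBuilder(size_t capacity) : _capacity(capacity) {}

        // Add up to *count values; *count is updated to how many were actually
        // added. Returns true (OK) even when the page is already full -- "full"
        // is signalled through is_page_full(), not through an error status.
        bool add(const int32_t* vals, size_t* count) {
            size_t can_take = std::min(*count, _capacity - _values.size());
            _values.insert(_values.end(), vals, vals + can_take);
            *count = can_take;
            return true;
        }
        bool is_page_full() const { return _values.size() >= _capacity; }
        void reset() { _values.clear(); }

    private:
        size_t _capacity;
        std::vector<int32_t> _values;
    };

    class ToyColumnWriter {
    public:
        explicit ToyColumnWriter(size_t page_capacity) : _builder(page_capacity) {}

        // The procedure from the commit message: append data, check
        // is_page_full(), then switch to the next page.
        bool append_data(const int32_t* data, size_t num) {
            size_t remaining = num;
            const int32_t* ptr = data;
            while (remaining > 0) {
                size_t written = remaining;
                if (!_builder.add(ptr, &written)) return false;
                ptr += written;
                remaining -= written;
                if (_builder.is_page_full()) {
                    ++_finished_pages;  // where an ordinal index entry would be recorded
                    _builder.reset();   // switch to the next page
                }
            }
            return true;
        }
        size_t finished_pages() const { return _finished_pages; }

    private:
        ToyPageBuilder _builder;
        size_t _finished_pages = 0;
    };

    int main() {
        ToyColumnWriter writer(/*page_capacity=*/4);
        std::vector<int32_t> values(10, 42);
        writer.append_data(values.data(), values.size());
        std::cout << "finished pages: " << writer.finished_pages() << "\n";  // prints 2
        return 0;
    }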


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/17: [refactor] some code cleanup for min/max function. (#8874)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 0d52bad20a6be9a5252c1cbaf7fed280100641f9
Author: zhannngchen <48...@users.noreply.github.com>
AuthorDate: Sat May 7 20:39:44 2022 +0800

    [refactor] some code cleanup for min/max function. (#8874)
---
 be/src/exprs/aggregate_functions.cpp               | 36 +++-------------------
 .../main/java/org/apache/doris/common/Config.java  | 10 +++---
 2 files changed, 9 insertions(+), 37 deletions(-)

diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp
index 506a87a821..700851aad0 100644
--- a/be/src/exprs/aggregate_functions.cpp
+++ b/be/src/exprs/aggregate_functions.cpp
@@ -655,22 +655,6 @@ void AggregateFunctions::min(FunctionContext*, const DecimalV2Val& src, DecimalV
     }
 }
 
-template <>
-void AggregateFunctions::min(FunctionContext*, const LargeIntVal& src, LargeIntVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null) {
-        *dst = src;
-        return;
-    }
-
-    if (src.val < dst->val) {
-        dst->val = src.val;
-    }
-}
-
 template <>
 void AggregateFunctions::max(FunctionContext*, const DecimalV2Val& src, DecimalV2Val* dst) {
     if (src.is_null) {
@@ -689,22 +673,6 @@ void AggregateFunctions::max(FunctionContext*, const DecimalV2Val& src, DecimalV
     }
 }
 
-template <>
-void AggregateFunctions::max(FunctionContext*, const LargeIntVal& src, LargeIntVal* dst) {
-    if (src.is_null) {
-        return;
-    }
-
-    if (dst->is_null) {
-        *dst = src;
-        return;
-    }
-
-    if (src.val > dst->val) {
-        dst->val = src.val;
-    }
-}
-
 void AggregateFunctions::init_null_string(FunctionContext* c, StringVal* dst) {
     dst->is_null = true;
     dst->ptr = nullptr;
@@ -2676,6 +2644,8 @@ template void AggregateFunctions::min<SmallIntVal>(FunctionContext*, const Small
 template void AggregateFunctions::min<IntVal>(FunctionContext*, const IntVal& src, IntVal* dst);
 template void AggregateFunctions::min<BigIntVal>(FunctionContext*, const BigIntVal& src,
                                                  BigIntVal* dst);
+template void AggregateFunctions::min<LargeIntVal>(FunctionContext*, const LargeIntVal& src,
+                                                   LargeIntVal* dst);
 template void AggregateFunctions::min<FloatVal>(FunctionContext*, const FloatVal& src,
                                                 FloatVal* dst);
 template void AggregateFunctions::min<DoubleVal>(FunctionContext*, const DoubleVal& src,
@@ -2724,6 +2694,8 @@ template void AggregateFunctions::max<SmallIntVal>(FunctionContext*, const Small
 template void AggregateFunctions::max<IntVal>(FunctionContext*, const IntVal& src, IntVal* dst);
 template void AggregateFunctions::max<BigIntVal>(FunctionContext*, const BigIntVal& src,
                                                  BigIntVal* dst);
+template void AggregateFunctions::max<LargeIntVal>(FunctionContext*, const LargeIntVal& src,
+                                                   LargeIntVal* dst);
 template void AggregateFunctions::max<FloatVal>(FunctionContext*, const FloatVal& src,
                                                 FloatVal* dst);
 template void AggregateFunctions::max<DoubleVal>(FunctionContext*, const DoubleVal& src,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index b70ec7c5a9..82552ff2b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -335,16 +335,16 @@ public class Config extends ConfigBase {
      * and one thread can handle the read and write of many sockets, so the number of thread pools is small.
      *
      * For most projects, only 1-2 acceptors threads are needed, and 2 to 4 selectors threads are sufficient.
-     * Workers are obstructive business logic, often have more database operations, and require a large number of threads. T
-     * he specific number depends on the proportion of QPS and IO events of the application. The higher the QPS,
-     * the more threads are required, the higher the proportion of IO,
-     * the more threads waiting, and the more total threads required.
+     * Workers are obstructive business logic, often have more database operations, and require a large number of
+     * threads. The specific number depends on the proportion of QPS and IO events of the application. The higher the
+     * QPS, the more threads are required, the higher the proportion of IO, the more threads waiting, and the more
+     * total threads required.
      */
     @ConfField public static int jetty_server_acceptors = 2;
     @ConfField public static int jetty_server_selectors = 4;
     @ConfField public static int jetty_server_workers = 0;
     /**
-     * jetty Maximum number of bytes in put or post method,default:100MB
+     * Jetty maximum number of bytes in put or post method,default:100MB
      */
     @ConfField public static int jetty_server_max_http_post_size = 100 * 1024 * 1024;
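
For reference, the LargeIntVal specializations deleted above appear to duplicate what a generic min/max template already does, which is presumably why only explicit instantiations of the generic template are kept. A minimal sketch of that generic shape, using a hypothetical NumericVal wrapper rather than the real Doris *Val types:

    // Illustrative only: NumericVal and agg_min are hypothetical; the real code
    // lives in AggregateFunctions with Doris' own value wrapper types.
    // __int128 (a GCC/Clang extension) stands in for LargeIntVal here.
    #include <iostream>

    template <typename T>
    struct NumericVal {
        bool is_null = true;
        T val{};
    };

    // One generic implementation covers every numeric width, including a
    // 128-bit "large int", so per-type copies of the same body are unnecessary.
    template <typename V>
    void agg_min(const V& src, V* dst) {
        if (src.is_null) {
            return;  // NULL inputs never change the aggregation state
        }
        if (dst->is_null || src.val < dst->val) {
            *dst = src;
        }
    }

    int main() {
        NumericVal<__int128> acc;                           // state starts as NULL
        agg_min(NumericVal<__int128>{false, 7}, &acc);
        agg_min(NumericVal<__int128>{false, 3}, &acc);
        std::cout << static_cast<long long>(acc.val) << "\n";  // prints 3
        return 0;
    }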
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 05/17: [fix][compaction] Rowset::end_version null pointer(#9379)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 7c20a8722f9d268284a9bfa4f8a979fd4f99164b
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri May 6 14:40:08 2022 +0800

    [fix][compaction]  Rowset::end_version null pointer(#9379)
---
 be/src/olap/compaction.cpp | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index f0dc268570..35fce9be5c 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -134,7 +134,12 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
     int64_t current_max_version;
     {
         ReadLock rdlock(_tablet->get_header_lock());
-        current_max_version = _tablet->rowset_with_max_version()->end_version();
+        RowsetSharedPtr max_rowset = _tablet->rowset_with_max_version();
+        if (max_rowset == nullptr) {
+            current_max_version = -1;
+        } else {
+            current_max_version = _tablet->rowset_with_max_version()->end_version();
+        }
     }
 
     LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 15/17: [fix](lateral-view) Error view includes lateral view (#9530)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 7f12cfe240cc6bed73ca46b93c106f09c6a48917
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Sat May 14 09:57:08 2022 +0800

    [fix](lateral-view) Error view includes lateral view (#9530)
    
    Fixed #9529
    
    When a lateral view is based on an inline view that belongs to a view,
    Doris could not resolve the lateral view's columns in the query.
    When a query uses a view, it mainly relies on the view's string representation;
    that is, if the view's string representation is wrong, the view is wrong.
    The string representation of an inline view lacked handling of lateral views,
    which led to query errors when such views were used.
    This PR fixes the string representation of inline views.
---
 .../org/apache/doris/analysis/InlineViewRef.java   | 21 +++++++---------
 .../java/org/apache/doris/analysis/QueryStmt.java  |  3 +--
 .../java/org/apache/doris/analysis/TableRef.java   | 28 ++++++++++++----------
 3 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
index 8df63b4492..a8ae56b61c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
@@ -437,26 +437,23 @@ public class InlineViewRef extends TableRef {
     }
 
     @Override
-    public String tableRefToSql() {
+    public String tableNameToSql() {
         // Enclose the alias in quotes if Hive cannot parse it without quotes.
         // This is needed for view compatibility between Impala and Hive.
-        String aliasSql = null;
-        String alias = getExplicitAlias();
-        if (alias != null) {
-            aliasSql = ToSqlUtils.getIdentSql(alias);
-        }
-
         if (view != null) {
             // FIXME: this may result in a sql cache problem
             // See pr #6736 and issue #6735
-            return name.toSql() + (aliasSql == null ? "" : " " + aliasSql);
+            return super.tableNameToSql();
         }
 
+        String aliasSql = null;
+        String alias = getExplicitAlias();
+        if (alias != null) {
+            aliasSql = ToSqlUtils.getIdentSql(alias);
+        }
         StringBuilder sb = new StringBuilder();
-        sb.append("(")
-                .append(queryStmt.toSql())
-                .append(") ")
-                .append(aliasSql);
+        sb.append("(").append(queryStmt.toSql()).append(") ")
+            .append(aliasSql);
 
         return sb.toString();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
index 4888ab7566..1617ec4aeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
@@ -203,8 +203,7 @@ public abstract class QueryStmt extends StatementBase {
      * (3) a mix of correlated table refs and table refs rooted at those refs
      *     (the statement is 'self-contained' with respect to correlation)
      */
-    public List<TupleId> getCorrelatedTupleIds(Analyzer analyzer)
-            throws AnalysisException {
+    public List<TupleId> getCorrelatedTupleIds(Analyzer analyzer) throws AnalysisException {
         // Correlated tuple ids of this stmt.
         List<TupleId> correlatedTupleIds = Lists.newArrayList();
         // First correlated and absolute table refs. Used for error detection/reporting.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index a911d27227..d78aac9c46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -624,20 +624,12 @@ public class TableRef implements ParseNode, Writable {
     /**
      * Return the table ref presentation to be used in the toSql string
      */
+    // tbl1
+    // tbl1 alias_tbl1
+    // tbl1 alias_tbl1 lateral view explode_split(k1, ",") tmp1 as e1
+    // (select xxx from xxx) t1 alias_tbl1 xxx
     public String tableRefToSql() {
-        String aliasSql = null;
-        String alias = getExplicitAlias();
-        if (alias != null) aliasSql = ToSqlUtils.getIdentSql(alias);
-
-        // TODO(zc):
-        // List<String> path = rawPath_;
-        // if (resolvedPath_ != null) path = resolvedPath_.getFullyQualifiedRawPath();
-        // return ToSqlUtils.getPathSql(path) + ((aliasSql != null) ? " " + aliasSql : "");
-
-        // tbl1
-        // tbl1 alias_tbl1
-        // tbl1 alias_tbl1 lateral view explode_split(k1, ",") tmp1 as e1
-        String tblName = name.toSql() + ((aliasSql != null) ? " " + aliasSql : "");
+        String tblName = tableNameToSql();
         if (lateralViewRefs != null) {
             for (LateralViewRef viewRef : lateralViewRefs) {
                 tblName += " " + viewRef.toSql();
@@ -646,6 +638,16 @@ public class TableRef implements ParseNode, Writable {
         return tblName;
     }
 
+    protected String tableNameToSql() {
+        String aliasSql = null;
+        String alias = getExplicitAlias();
+        if (alias != null) {
+            aliasSql = ToSqlUtils.getIdentSql(alias);
+        }
+        String tblName = name.toSql() + ((aliasSql != null) ? " " + aliasSql : "");
+        return tblName;
+    }
+
     @Override
     public String toSql() {
         if (joinOp == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 03/17: [fix](broker-load) can't load parquet file with column name case sensitive with Doris column (#9358)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 75fbc9d8adbe18232c550cce21c0fef11f92fc6c
Author: deardeng <56...@qq.com>
AuthorDate: Wed May 11 22:27:03 2022 -0700

    [fix](broker-load) can't load parquet file with column name case sensitive with Doris column (#9358)
---
 fe/fe-core/src/main/java/org/apache/doris/load/Load.java          | 8 ++++++--
 .../src/main/java/org/apache/doris/planner/BrokerScanNode.java    | 4 ++--
 .../main/java/org/apache/doris/planner/StreamLoadScanNode.java    | 7 +++++--
 3 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 905f950737..ea381977e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -1049,8 +1049,12 @@ public class Load {
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
-            String realColName = tbl.getColumn(columnName) == null ? columnName
-                    : tbl.getColumn(columnName).getName();
+            String realColName;
+            if (tbl.getColumn(columnName) == null || importColumnDesc.getExpr() == null) {
+                realColName = columnName;
+            } else {
+                realColName = tbl.getColumn(columnName).getName();
+            }
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 499ef83728..a6f26c4920 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -240,8 +240,8 @@ public class BrokerScanNode extends LoadScanNode {
      */
     private void initColumns(ParamCreateContext context) throws UserException {
         context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
-        context.slotDescByName = Maps.newHashMap();
-        context.exprMap = Maps.newHashMap();
+        context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
         // for load job, column exprs is got from file group
         // for query, there is no column exprs, they will be got from table's schema in "Load.initColumns"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 200e232e2e..9f787ecf4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -66,8 +66,11 @@ public class StreamLoadScanNode extends LoadScanNode {
     private TupleDescriptor srcTupleDesc;
     private TBrokerScanRange brokerScanRange;
 
-    private Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
-    private Map<String, Expr> exprsByName = Maps.newHashMap();
+    // If use case sensitive map, for example,
+    // the column name 「A」 in the table and the mapping '(a) set (A = a)' in load sql,
+    // Slotdescbyname stores「a」, later will use 「a」to get table's 「A」 column info, will throw exception.
+    private final Map<String, SlotDescriptor> slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+    private final Map<String, Expr> exprsByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
     // used to construct for streaming loading
     public StreamLoadScanNode(
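
The change above replaces hash maps with ordered maps keyed case-insensitively, so a column declared as "A" in the table still matches "a" written in the load statement's column mapping. The FE side uses Guava's Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER), as shown in the diff; the same idea is sketched below in C++ purely for illustration.

    // Illustrative only: a case-insensitive keyed map, mirroring the switch to
    // TreeMap(String.CASE_INSENSITIVE_ORDER) in BrokerScanNode/StreamLoadScanNode.
    #include <algorithm>
    #include <cctype>
    #include <iostream>
    #include <map>
    #include <string>

    struct CaseInsensitiveLess {
        bool operator()(const std::string& a, const std::string& b) const {
            return std::lexicographical_compare(
                    a.begin(), a.end(), b.begin(), b.end(),
                    [](unsigned char x, unsigned char y) {
                        return std::tolower(x) < std::tolower(y);
                    });
        }
    };

    int main() {
        // The table column is declared as "A"; the load statement refers to "a".
        std::map<std::string, int, CaseInsensitiveLess> slot_desc_by_name;
        slot_desc_by_name["A"] = 1;
        std::cout << slot_desc_by_name.count("a") << "\n";  // prints 1: lookup now matches
        return 0;
    }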


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 04/17: [fix]Stream load 307 temporary redirection authentication information is lost (#9363)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit e0ed0067850500be8a8159265e0b797ce39d4db0
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Sat May 7 19:22:45 2022 +0800

    [fix]Stream load 307 temporary redirection authentication information is lost (#9363)
---
 .../apache/doris/httpv2/rest/RestBaseController.java   | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
index e2763caa93..5a8c7aa1b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
@@ -75,11 +75,21 @@ public class RestBaseController extends BaseController {
         String userInfo = null;
         if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
             ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
-            // Fix username@cluster:passwod is modified to cluster: username:passwod causes authentication failure
-            // @see https://github.com/apache/incubator-doris/issues/8100
+            //username@cluster:password
+            //This is a Doris-specific parsing format in the parseAuthInfo of BaseController.
+            //This is to go directly to BE, but in fact,
+            //BE still needs to take this authentication information and send RPC
+            // to FE to parse the authentication information,
+            //so in the end, the format of this authentication information is parsed on the FE side.
+            //The normal format for fullUserName is actually default_cluster:username
+            //I don't know why the format username@default_cluster is used in parseAuthInfo.
+            //It is estimated that it is compatible with the standard format of username:password.
+            //So here we feel that we can assemble it completely by hand.
+            String clusterName = ConnectContext.get() == null ?
+                SystemInfoService.DEFAULT_CLUSTER : ConnectContext.get().getClusterName();
             userInfo = ClusterNamespace.getNameFromFullName(authInfo.fullUserName) +
-                    "@" + ClusterNamespace.getClusterNameFromFullName(authInfo.fullUserName) +
-                    ":" + authInfo.password;
+                "@" + clusterName  +
+                ":" + authInfo.password;
         }
         try {
             urlObj = new URI(urlStr);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 13/17: [fix](storage) fix core for string predicate in storage layer (#9500)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit ea5865c3ffc9f8ecfc66343d7aa20825cabb2200
Author: wangbo <wa...@apache.org>
AuthorDate: Thu May 12 15:41:39 2022 +0800

    [fix](storage) fix core for string predicate in storage layer (#9500)
    
    
    
    Co-authored-by: Wang Bo <wa...@meituan.com>
---
 be/src/vec/columns/predicate_column.h | 24 ++++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h
index 7db73b9d0a..eec3f1def7 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -230,17 +230,35 @@ public:
     void insert_many_binary_data(char* data_array, uint32_t* len_array,
                                  uint32_t* start_offset_array, size_t num) override {
         if constexpr (std::is_same_v<T, StringValue>) {
+            if (_pool == nullptr) {
+                _pool.reset(new MemPool("PredicateStringColumn"));
+            }
+
+            size_t total_mem_size = 0;
+            for (size_t i = 0; i < num; i++) {
+                total_mem_size += len_array[i];
+            }
+
+            char* destination = (char*)_pool->allocate(total_mem_size);
             for (size_t i = 0; i < num; i++) {
                 uint32_t len = len_array[i];
                 uint32_t start_offset = start_offset_array[i];
-                insert_string_value(data_array + start_offset, len);
+                memcpy(destination, data_array + start_offset, len);
+                StringValue sv(destination, len);
+                data.push_back_without_reserve(sv);
+                destination += len;
             }
         }
     }
 
     void insert_default() override { data.push_back(T()); }
 
-    void clear() override { data.clear(); }
+    void clear() override {
+        data.clear();
+        if (_pool != nullptr) {
+            _pool->clear();
+        }
+    }
 
     size_t byte_size() const override { return data.size() * sizeof(T); }
 
@@ -410,6 +428,8 @@ public:
 
 private:
     Container data;
+    // manages the memory for slice's data(For string type)
+    std::unique_ptr<MemPool> _pool;
 };
 using ColumnStringValue = PredicateColumnType<StringValue>;
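
StringValue entries in the predicate column hold only pointer/length pairs, so the fix above copies the incoming bytes into a column-owned MemPool before recording them; presumably the previous insert path left the slices pointing at memory the caller could reuse, hence the core. A minimal sketch of that ownership pattern, with hypothetical ArenaPool/OwningStringColumn names rather than the real MemPool/PredicateColumnType API:

    // Illustrative sketch: a column that owns the bytes behind its string slices.
    // ArenaPool, Slice, and OwningStringColumn are hypothetical stand-ins.
    #include <cstdint>
    #include <cstring>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    struct Slice {
        const char* data;
        size_t size;
    };

    class ArenaPool {
    public:
        // Allocates size bytes whose lifetime is tied to the pool.
        char* allocate(size_t size) {
            _chunks.emplace_back(new char[size]);
            return _chunks.back().get();
        }
        void clear() { _chunks.clear(); }

    private:
        std::vector<std::unique_ptr<char[]>> _chunks;
    };

    class OwningStringColumn {
    public:
        // Mirrors insert_many_binary_data(): one pool allocation for the whole
        // batch, then each slice points into column-owned memory.
        void insert_many(const char* data, const uint32_t* lens,
                         const uint32_t* offsets, size_t num) {
            size_t total = 0;
            for (size_t i = 0; i < num; ++i) total += lens[i];
            char* dst = _pool.allocate(total);
            for (size_t i = 0; i < num; ++i) {
                std::memcpy(dst, data + offsets[i], lens[i]);
                _slices.push_back({dst, lens[i]});
                dst += lens[i];
            }
        }
        // Clearing the slices must also clear the pool, as in the fixed clear().
        void clear() {
            _slices.clear();
            _pool.clear();
        }
        const Slice& operator[](size_t i) const { return _slices[i]; }

    private:
        std::vector<Slice> _slices;
        ArenaPool _pool;
    };

    int main() {
        std::string scratch = "foobar";           // caller-owned buffer
        uint32_t lens[] = {3, 3};
        uint32_t offsets[] = {0, 3};
        OwningStringColumn col;
        col.insert_many(scratch.c_str(), lens, offsets, 2);
        scratch.assign("XXXXXX");                 // caller reuses its buffer...
        std::cout << std::string(col[0].data, col[0].size) << "\n";  // ...column still prints "foo"
        return 0;
    }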
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 11/17: [fix](Function) fix case when function return null with abs function (#9493)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 71beda2664f32efdc1b6c6fc495b92846f0f8138
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Sat May 14 09:50:45 2022 +0800

    [fix](Function) fix case when function return null with abs function (#9493)
---
 be/src/exprs/case_expr.cpp                         |  1 +
 be/src/exprs/case_expr.h                           |  1 +
 .../data/correctness/test_case_when.out            |  5 ++
 .../suites/correctness/test_case_when.groovy       | 67 ++++++++++++++++++++++
 4 files changed, 74 insertions(+)

diff --git a/be/src/exprs/case_expr.cpp b/be/src/exprs/case_expr.cpp
index e1290fc7b0..4ab3475048 100644
--- a/be/src/exprs/case_expr.cpp
+++ b/be/src/exprs/case_expr.cpp
@@ -218,6 +218,7 @@ CASE_COMPUTE_FN_WRAPPER(TinyIntVal, tiny_int_val)
 CASE_COMPUTE_FN_WRAPPER(SmallIntVal, small_int_val)
 CASE_COMPUTE_FN_WRAPPER(IntVal, int_val)
 CASE_COMPUTE_FN_WRAPPER(BigIntVal, big_int_val)
+CASE_COMPUTE_FN_WRAPPER(LargeIntVal, large_int_val)
 CASE_COMPUTE_FN_WRAPPER(FloatVal, float_val)
 CASE_COMPUTE_FN_WRAPPER(DoubleVal, double_val)
 CASE_COMPUTE_FN_WRAPPER(StringVal, string_val)
diff --git a/be/src/exprs/case_expr.h b/be/src/exprs/case_expr.h
index 8bcbc48269..83d4d24508 100644
--- a/be/src/exprs/case_expr.h
+++ b/be/src/exprs/case_expr.h
@@ -36,6 +36,7 @@ public:
     virtual SmallIntVal get_small_int_val(ExprContext* ctx, TupleRow* row) override;
     virtual IntVal get_int_val(ExprContext* ctx, TupleRow* row) override;
     virtual BigIntVal get_big_int_val(ExprContext* ctx, TupleRow* row) override;
+    virtual LargeIntVal get_large_int_val(ExprContext* context, TupleRow*) override;
     virtual FloatVal get_float_val(ExprContext* ctx, TupleRow* row) override;
     virtual DoubleVal get_double_val(ExprContext* ctx, TupleRow* row) override;
     virtual StringVal get_string_val(ExprContext* ctx, TupleRow* row) override;
diff --git a/regression-test/data/correctness/test_case_when.out b/regression-test/data/correctness/test_case_when.out
new file mode 100644
index 0000000000..d4db120b76
--- /dev/null
+++ b/regression-test/data/correctness/test_case_when.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+2	01	-1
+3	00	1
+
diff --git a/regression-test/suites/correctness/test_case_when.groovy b/regression-test/suites/correctness/test_case_when.groovy
new file mode 100644
index 0000000000..f5afe3026f
--- /dev/null
+++ b/regression-test/suites/correctness/test_case_when.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_case_when") {
+    def tableName = "dws_scan_qrcode_user_ts"
+
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName}
+        (
+            dt DATE NOT NULL ,
+            hour_time INT NOT NULL ,
+            merchant_id INT NOT NULL ,
+            channel_id char(5) NOT NULL ,
+            station_type char(5) NULL ,
+            station_name varchar(55) NULL ,
+            source char(5) NULL ,
+            passenger_flow BIGINT SUM DEFAULT '1' ,
+            user_id bitmap BITMAP_UNION ,
+            price BIGINT SUM ,
+            discount BIGINT SUM 
+        )
+        AGGREGATE KEY(dt,hour_time, merchant_id,channel_id,station_type,station_name,`source`)
+        DISTRIBUTED BY HASH(dt,hour_time,merchant_id,channel_id) BUCKETS 1
+        PROPERTIES("replication_num" = "1");
+    """
+
+    sql " INSERT INTO ${tableName} (`dt`, `hour_time`, `merchant_id`, `channel_id`, `station_type`, `station_name`, `source`, `passenger_flow`, `user_id`, `price`, `discount`) VALUES ('2019-01-01', 1, 45010002, '01', '00', 'xx站', '', 1, to_bitmap(0), 300, 300); "
+    sql " INSERT INTO ${tableName} (`dt`, `hour_time`, `merchant_id`, `channel_id`, `station_type`, `station_name`, `source`, `passenger_flow`, `user_id`, `price`, `discount`) VALUES ('2019-01-01', 1, 45010002, '01', '00', 'xxx站', '', 3, to_bitmap(0), 400, 400); "
+    sql " INSERT INTO ${tableName} (`dt`, `hour_time`, `merchant_id`, `channel_id`, `station_type`, `station_name`, `source`, `passenger_flow`, `user_id`, `price`, `discount`) VALUES ('2019-01-01', 2, 45010002, '00', '01', 'xx站', 'CHL', 1, to_bitmap(0), NULL, 23); "
+    sql " INSERT INTO ${tableName} (`dt`, `hour_time`, `merchant_id`, `channel_id`, `station_type`, `station_name`, `source`, `passenger_flow`, `user_id`, `price`, `discount`) VALUES ('2019-01-01', 3, 45010002, '00', '00', 'xx站', 'CHL', 1, to_bitmap(0), NULL, NULL); "
+    sql " INSERT INTO ${tableName} (`dt`, `hour_time`, `merchant_id`, `channel_id`, `station_type`, `station_name`, `source`, `passenger_flow`, `user_id`, `price`, `discount`) VALUES ('2019-01-01', 3, 45010002, '01', '00', 'xxxx站', '', 4, to_bitmap(0), 60, 60); "
+    sql " INSERT INTO ${tableName} (`dt`, `hour_time`, `merchant_id`, `channel_id`, `station_type`, `station_name`, `source`, `passenger_flow`, `user_id`, `price`, `discount`) VALUES ('2019-01-01', 3, 45010002, '01', '00', 'xxxx站', '', 2, to_bitmap(0), 200, 200); "
+    sql " INSERT INTO ${tableName} (`dt`, `hour_time`, `merchant_id`, `channel_id`, `station_type`, `station_name`, `source`, `passenger_flow`, `user_id`, `price`, `discount`) VALUES ('2019-01-01', 4, 45010002, '01', '00', 'xxxx站', '', 5, to_bitmap(0), 1000, 1000); "
+    sql " INSERT INTO ${tableName} (`dt`, `hour_time`, `merchant_id`, `channel_id`, `station_type`, `station_name`, `source`, `passenger_flow`, `user_id`, `price`, `discount`) VALUES ('2019-01-01', 4, 45010002, '01', '00', 'xxx站', '', 1, to_bitmap(0), 20, 20); "
+        
+
+    // not_vectorized
+    sql """ set enable_vectorized_engine = false """
+
+    qt_select_default """ 
+    select  hour_time as date_hour, station_type,
+            CASE WHEN station_type = '00' THEN sum(passenger_flow)
+            ELSE -ABS(sum(passenger_flow))
+            end passenger_flow
+            from ${tableName}
+            where dt = '2019-01-01'
+            and merchant_id in (45010002, 45010003)
+            and channel_id = '00'
+            group by hour_time, station_type; 
+    """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 08/17: [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit e5c46b78d29717e5f125d3ac7e841b06b66f04e5
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Thu May 12 13:31:22 2022 +0800

    [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)
    
    When resuming a binlog load job fails because the number of running transactions exceeds
    the default limit, give the user a clear error message instead of reporting success and
    then failing inexplicably a short while later.
    Issue Number: close #9468
---
 .../doris/load/sync/canal/CanalSyncChannel.java    | 94 +++++++++++++---------
 .../doris/transaction/DatabaseTransactionMgr.java  |  2 +-
 2 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 2b71619dcf..5d0774b54a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.UserException;
@@ -41,6 +42,7 @@ import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TransactionEntry;
 import org.apache.doris.transaction.TransactionState;
@@ -121,53 +123,69 @@ public class CanalSyncChannel extends SyncChannel {
                     + "_batch" + batchId + "_" + currentTime;
             String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
             GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
-            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-            TTxnParams txnConf = txnEntry.getTxnConf();
-            TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
-            TStreamLoadPutRequest request = null;
-            try {
-                long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
-                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
-                String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
+            DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+            if (databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db) {
+                TransactionEntry txnEntry = txnExecutor.getTxnEntry();
+                TTxnParams txnConf = txnEntry.getTxnConf();
+                TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
+                TStreamLoadPutRequest request = null;
+                try {
+                    long txnId = globalTransactionMgr.beginTransaction(db.getId(),
+                        Lists.newArrayList(tbl.getId()), label,
+                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
+                            FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
+                    String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
                         db.getId(), txnId).getAuthCode();
-                request = new TStreamLoadPutRequest()
+                    request = new TStreamLoadPutRequest()
                         .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
                         .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                         .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
                         .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
                         .setColumns(targetColumn);
-                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
-                txnEntry.setLabel(label);
-                txnExecutor.setTxnId(txnId);
-            } catch (DuplicatedRequestException e) {
-                LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
+                    txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
+                    txnEntry.setLabel(label);
+                    txnExecutor.setTxnId (txnId);
+                } catch (DuplicatedRequestException e) {
+                    LOG.warn ("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
                         id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
-                txnExecutor.setTxnId(e.getTxnId());
-            } catch (LabelAlreadyUsedException e) {
-                // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
-                LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
-                return;
-            } catch (AnalysisException | BeginTransactionException e) {
-                LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
-                throw e;
-            } catch (UserException e) {
-                LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
-                throw e;
-            }
-            try {
-                // async exec begin transaction
-                long txnId = txnExecutor.getTxnId();
-                if (txnId != -1L) {
-                    this.txnExecutor.beginTransaction(request);
-                    LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+                    txnExecutor.setTxnId(e.getTxnId());
+                } catch (LabelAlreadyUsedException e) {
+                    // this happens when channel re-consume same batch,
+                    // we should just pass through it without begin a new txn
+                    LOG.warn ("Label already used in channel {}, label: {}, table: {}, batch: {}",
+                        id, label, targetTable, batchId);
+                    return;
+                } catch (AnalysisException | BeginTransactionException e) {
+                    LOG.warn ("encounter an error when beginning txn in channel {}, table: {}",
+                        id, targetTable);
+                    throw e;
+                } catch (UserException e) {
+                    LOG.warn ("encounter an error when creating plan in channel {}, table: {}",
+                        id, targetTable);
+                    throw e;
                 }
-            } catch (TException e) {
-                LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
-            } catch (TimeoutException | InterruptedException | ExecutionException e) {
-                LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
+                try {
+                    // async exec begin transaction
+                    long txnId = txnExecutor.getTxnId();
+                    if ( txnId != - 1L ) {
+                        this.txnExecutor.beginTransaction (request);
+                        LOG.info ("begin txn in channel {}, table: {}, label:{}, txn id: {}",
+                            id, targetTable, label, txnExecutor.getTxnId());
+                    }
+                } catch ( TException e) {
+                    LOG.warn ("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}",
+                        id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+                    throw e;
+                } catch ( TimeoutException | InterruptedException | ExecutionException e) {
+                    LOG.warn ("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
                         id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
+                    throw e;
+                }
+            } else {
+                String failMsg = "current running txns on db " + db.getId() + " is "
+                    + databaseTransactionMgr.getRunningTxnNums() + ", larger than limit " + Config.max_running_txn_num_per_db;
+                LOG.warn(failMsg);
+                throw new BeginTransactionException(failMsg);
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9ce906fb79..bf530c5ef4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -195,7 +195,7 @@ public class DatabaseTransactionMgr {
         return labelToTxnIds.get(label);
     }
 
-    protected int getRunningTxnNums() {
+    public int getRunningTxnNums() {
         return runningTxnNums;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 07/17: [Bug] Missing error tablet list when close_wait return error (#9418)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 0c73dc85a6aabb30b4104f46fea1fbdf3a24587e
Author: pengxiangyu <di...@163.com>
AuthorDate: Sun May 8 06:45:28 2022 +0800

    [Bug] Missing error tablet list when close_wait return error (#9418)
---
 be/src/olap/delta_writer.cpp       | 12 ++++++++++--
 be/src/olap/delta_writer.h         |  4 +++-
 be/src/runtime/load_channel.cpp    |  3 ++-
 be/src/runtime/tablets_channel.cpp |  7 +++++--
 be/src/runtime/tablets_channel.h   |  5 ++++-
 5 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 8446340d5a..198019561f 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -279,7 +279,9 @@ OLAPStatus DeltaWriter::close() {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, bool is_broken) {
+OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                               google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
+                               bool is_broken) {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() being called";
@@ -289,7 +291,13 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
     }
 
     // return error if previous flush failed
-    RETURN_NOT_OK(_flush_token->wait());
+    OLAPStatus st = _flush_token->wait();
+    if (st != OLAP_SUCCESS) {
+        PTabletError* tablet_error = tablet_errors->Add();
+        tablet_error->set_tablet_id(_tablet->tablet_id());
+        tablet_error->set_msg("flush failed");
+        return st;
+    }
     DCHECK_EQ(_mem_tracker->consumption(), 0);
 
     // use rowset meta manager to save meta
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index c765d03115..cf5a2729d2 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -67,7 +67,9 @@ public:
     OLAPStatus close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, bool is_broken);
+    OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                      google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
+                      bool is_broken);
 
     // abandon current memtable and wait for all pending-flushing memtables to be destructed.
     // mem_consumption() should be 0 after this function returns.
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index db523f2aa7..715762bd18 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -99,7 +99,8 @@ Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
         bool finished = false;
         RETURN_IF_ERROR(channel->close(request.sender_id(), request.backend_id(), 
                                        &finished, request.partition_ids(),
-                                       response->mutable_tablet_vec()));
+                                       response->mutable_tablet_vec(),
+                                       response->mutable_tablet_errors()));
         if (finished) {
             std::lock_guard<std::mutex> l(_lock);
             _tablets_channels.erase(index_id);
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 75f9936e9b..926cba7aaa 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -149,7 +149,8 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request,
 
 Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
                              const google::protobuf::RepeatedField<int64_t>& partition_ids,
-                             google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
+                             google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                             google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         return _close_status;
@@ -197,7 +198,9 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
         for (auto writer : need_wait_writers) {
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE judge it.
-            writer->close_wait(tablet_vec, (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
+            writer->close_wait(
+                    tablet_vec, tablet_errors,
+                    (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
         }
         // TODO(gaodayue) clear and destruct all delta writers to make sure all memory are freed
         // DCHECK_EQ(_mem_tracker->consumption(), 0);
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index e99ac6264b..360242ae88 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#pragma once
+
 #include <cstdint>
 #include <unordered_map>
 #include <utility>
@@ -69,7 +71,8 @@ public:
     // no-op when this channel has been closed or cancelled
     Status close(int sender_id, int64_t backend_id, bool* finished,
                  const google::protobuf::RepeatedField<int64_t>& partition_ids,
-                 google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
+                 google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                 google::protobuf::RepeatedPtrField<PTabletError>* tablet_error);
 
     // no-op when this channel has been closed or cancelled
     Status cancel();




[incubator-doris] 17/17: [fix](planner)VecNotImplException thrown when query need rewrite and some slot cannot changed to nullable (#9589)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit c41618a0b61f8a8226ccb5dd18b7a5ce45941c4f
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Mon May 16 22:34:02 2022 +0800

    [fix](planner)VecNotImplException thrown when query need rewrite and some slot cannot changed to nullable (#9589)
---
 .../java/org/apache/doris/analysis/Analyzer.java   | 35 ----------------------
 .../apache/doris/common/util/VectorizedUtil.java   | 29 +++++++++---------
 2 files changed, 14 insertions(+), 50 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 0cf48b6afb..4fceedf384 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -298,17 +298,6 @@ public class Analyzer {
 
         private final long autoBroadcastJoinThreshold;
 
-        /**
-         * This property is mainly used to store the vectorized switch of the current query.
-         * true: the vectorization of the current query is turned on
-         * false: the vectorization of the current query is turned off.
-         * It is different from the vectorized switch`enableVectorizedEngine` of the session.
-         * It is only valid for a single query, while the session switch is valid for all queries in the session.
-         * It cannot be set directly by the user, only by inheritance from session`enableVectorizedEngine`
-         * or internal adjustment of the system.
-         */
-        private boolean enableQueryVec;
-
         public GlobalState(Catalog catalog, ConnectContext context) {
             this.catalog = catalog;
             this.context = context;
@@ -359,9 +348,6 @@ public class Analyzer {
                 // autoBroadcastJoinThreshold is a "final" field, must set an initial value for it
                 autoBroadcastJoinThreshold = 0;
             }
-            if (context != null) {
-                enableQueryVec = context.getSessionVariable().enableVectorizedEngine();
-            }
         }
     }
 
@@ -666,27 +652,6 @@ public class Analyzer {
         return globalState.mvExprRewriter;
     }
 
-    /**
-     * Only the top-level `query vec` value of the query analyzer represents the value of the entire query.
-     * Other sub-analyzers cannot represent the value of `query vec`.
-     * @return
-     */
-    public boolean enableQueryVec() {
-        if (ancestors.isEmpty()) {
-            return globalState.enableQueryVec;
-        } else {
-            return ancestors.get(ancestors.size() - 1).enableQueryVec();
-        }
-    }
-
-    /**
-     * Since analyzer cannot get sub-analyzers from top to bottom.
-     * So I can only set the `query vec` variable of the top level analyzer of query to true.
-     */
-    public void disableQueryVec() {
-        globalState.enableQueryVec = false;
-    }
-
     /**
      * Return descriptor of registered table/alias.
      *
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
index 4b793a0e9b..d8fc1f55f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
@@ -17,9 +17,12 @@
 
 package org.apache.doris.common.util;
 
-import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.SetVar;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.VariableMgr;
 
 public class VectorizedUtil {
     /**
@@ -33,15 +36,7 @@ public class VectorizedUtil {
         if (connectContext == null) {
             return false;
         }
-        StmtExecutor stmtExecutor = connectContext.getExecutor();
-        if (stmtExecutor == null) {
-            return connectContext.getSessionVariable().enableVectorizedEngine();
-        }
-        Analyzer analyzer = stmtExecutor.getAnalyzer();
-        if (analyzer == null) {
-            return connectContext.getSessionVariable().enableVectorizedEngine();
-        }
-        return analyzer.enableQueryVec();
+        return connectContext.getSessionVariable().enableVectorizedEngine();
     }
 
     /**
@@ -63,11 +58,15 @@ public class VectorizedUtil {
         if (connectContext == null) {
             return;
         }
-        Analyzer analyzer = connectContext.getExecutor().getAnalyzer();
-        if (analyzer == null) {
-            return;
+        SessionVariable sessionVariable = connectContext.getSessionVariable();
+        sessionVariable.setIsSingleSetVar(true);
+        try {
+            VariableMgr.setVar(sessionVariable, new SetVar(
+                    "enable_vectorized_engine",
+                    new StringLiteral("false")));
+        } catch (DdlException e) {
+            // do nothing
         }
-        analyzer.disableQueryVec();
     }
 }
 




[incubator-doris] 09/17: [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 61166162843c1937debfa5479176a6c80646fc76
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed May 11 09:40:57 2022 +0800

    [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)
    
    There are many places in FE where a group of BE nodes needs to be selected according to certain requirements. For example:
    1. When creating replicas for a tablet.
    2. When selecting a BE to execute Insert.
    3. When Stream Load forwards http requests to BE nodes.
    
    These operations all share the same selection logic, so this CL mainly changes:
    1. Create a new `BeSelectionPolicy` class to describe the set of conditions for selecting BE nodes.
    2. The logic for selecting BE nodes in `SystemInfoService` has been refactored, and the following two methods are now used uniformly:
        1. `selectBackendIdsByPolicy`: select the required number of BE nodes according to the `BeSelectionPolicy`.
        2. `selectBackendIdsForReplicaCreation`: select the BE nodes for the replica creation operation.
    
    Note that there is one behavior change here:
    the replica creation operation previously selected BE nodes in a round-robin fashion,
    but it now uses `random` selection, for the following reasons:
    1. Although the previous logic was nominally round-robin, it was effectively random.
    2. The distribution difference produced by random selection will not be greater than 5%, so the random
         algorithm can be considered to spread the data evenly.
    A usage sketch of the new selection API follows below.
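
    A minimal usage sketch, assuming a caller that needs a single load-available BE. The wrapper class
    and method name below are illustrative only; `BeSelectionPolicy`, `selectBackendIdsByPolicy`,
    `Catalog.getCurrentSystemInfo()` and the related calls are the ones introduced or kept by this commit:

    import org.apache.doris.catalog.Catalog;
    import org.apache.doris.common.LoadException;
    import org.apache.doris.system.Backend;
    import org.apache.doris.system.BeSelectionPolicy;
    import org.apache.doris.system.SystemInfoService;

    import java.util.List;

    public class BackendSelectionSketch {
        // Pick one BE that can accept load jobs in the given cluster and return its host.
        public static String pickLoadBackendHost(String clusterName) throws LoadException {
            // Describe the selection conditions: target cluster, load-available BEs only.
            BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
                    .setCluster(clusterName)
                    .needLoadAvailable()
                    .build();
            // Ask for exactly one BE id matching the policy; the result is empty if
            // fewer than the requested number of BEs match.
            List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
            if (backendIds.isEmpty()) {
                throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
            }
            Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
            if (backend == null) {
                throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
            }
            return backend.getHost();
        }
    }

    Replica creation follows the same pattern, but its policy additionally sets `needScheduleAvailable()`,
    `needCheckDiskUsage()`, resource tags via `addTags()` and the storage medium via `setStorageMedium()`,
    as shown in `selectBackendIdsForReplicaCreation` in the diff below.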
---
 .../java/org/apache/doris/backup/RestoreJob.java   |   9 +-
 .../java/org/apache/doris/catalog/Catalog.java     |   9 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |   3 +-
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  53 ++--
 .../apache/doris/httpv2/util/LoadSubmitter.java    |  19 +-
 .../apache/doris/qe/InsertStreamTxnExecutor.java   |  14 +-
 .../java/org/apache/doris/qe/MultiLoadMgr.java     |  14 +-
 .../org/apache/doris/system/BeSelectionPolicy.java | 131 ++++++++++
 .../org/apache/doris/system/SystemInfoService.java | 251 +++++--------------
 .../org/apache/doris/backup/RestoreJobTest.java    | 132 +---------
 .../org/apache/doris/catalog/CreateTableTest.java  |   6 +-
 .../apache/doris/catalog/ModifyBackendTest.java    |   6 +-
 .../doris/load/sync/canal/CanalSyncDataTest.java   |   7 +-
 .../java/org/apache/doris/qe/MultiLoadMgrTest.java |  13 +-
 .../apache/doris/system/SystemInfoServiceTest.java | 268 +++++++++++++++++++++
 15 files changed, 525 insertions(+), 410 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 30878a29ca..f29ab14be1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -74,9 +74,6 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTaskType;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -87,6 +84,9 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Table.Cell;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -1018,7 +1018,8 @@ public class RestoreJob extends AbstractJob {
 
                 // replicas
                 try {
-                    Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null);
+                    Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo()
+                            .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
                     for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
                         for (Long beId : entry.getValue()) {
                             long newReplicaId = catalog.getNextId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 7621d0008c..3bb59eac55 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -270,6 +270,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import javax.annotation.Nullable;
 import java.io.BufferedReader;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -296,8 +297,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import javax.annotation.Nullable;
-
 public class Catalog {
     private static final Logger LOG = LogManager.getLogger(Catalog.class);
     // 0 ~ 9999 used for qe
@@ -4524,10 +4523,12 @@ public class Catalog {
                 // This is the first colocate table in the group, or just a normal table,
                 // randomly choose backends
                 if (!Config.disable_storage_medium_check) {
-                    chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName,
+                    chosenBackendIds =
+                            getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
                             tabletMeta.getStorageMedium());
                 } else {
-                    chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null);
+                    chosenBackendIds =
+                            getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
                 }
 
                 for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 4b015c685d..253d53521e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -500,7 +500,8 @@ public class OlapTable extends Table {
 
                     // replicas
                     try {
-                        Map<Tag, List<Long>> tag2beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(
+                        Map<Tag, List<Long>> tag2beIds =
+                                Catalog.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
                                 replicaAlloc, db.getClusterName(), null);
                         for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
                             for (Long beId : entry3.getValue()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index b9c85dbdfb..ebdda12926 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -21,12 +21,14 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TNetworkAddress;
 
@@ -41,12 +43,10 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.view.RedirectView;
 
-import java.util.List;
-
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import io.netty.handler.codec.http.HttpHeaderNames;
+import java.util.List;
 
 @RestController
 public class LoadAction extends RestBaseController {
@@ -145,21 +145,7 @@ public class LoadAction extends RestBaseController {
                     return new RestBaseResult(e.getMessage());
                 }
             } else {
-                // Choose a backend sequentially.
-                SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                        new SystemInfoService.BeAvailablePredicate(false, false, true);
-                List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
-                        1, beAvailablePredicate, false, clusterName, null, null);
-                if (backendIds == null) {
-                    return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
-                }
-
-                Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
-                if (backend == null) {
-                    return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
-                }
-
-                redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+                redirectAddr = selectRedirectBackend(clusterName);
             }
 
             LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
@@ -194,22 +180,7 @@ public class LoadAction extends RestBaseController {
                 return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected.");
             }
 
-            // Choose a backend sequentially.
-            SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                    new SystemInfoService.BeAvailablePredicate(false, false, true);
-            List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
-                    1, beAvailablePredicate, false, clusterName, null, null);
-            if (backendIds == null) {
-                return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
-            }
-
-            Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            if (backend == null) {
-                return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
-            }
-
-            TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
-
+            TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
             LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
                     redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);
 
@@ -220,4 +191,18 @@ public class LoadAction extends RestBaseController {
             return new RestBaseResult(e.getMessage());
         }
     }
+
+    private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(clusterName).needLoadAvailable().build();
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        if (backendIds.isEmpty()) {
+            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
+        }
+
+        Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
+        if (backend == null) {
+            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
+        }
+        return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
index 1a715cf0e3..31aeba813d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
@@ -19,10 +19,11 @@ package org.apache.doris.httpv2.util;
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.httpv2.rest.UploadAction;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.base.Strings;
@@ -136,19 +137,15 @@ public class LoadSubmitter {
             return file;
         }
 
-        private Backend selectOneBackend() throws DdlException {
-            SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                    new SystemInfoService.BeAvailablePredicate(false, false, true);
-            List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
-                    1, beAvailablePredicate, false,
-                    SystemInfoService.DEFAULT_CLUSTER, null, null);
-            if (backendIds == null) {
-                throw new DdlException("No alive backend");
+        private Backend selectOneBackend() throws LoadException {
+            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+            List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+            if (backendIds.isEmpty()) {
+                throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
             }
-
             Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
             if (backend == null) {
-                throw new DdlException("No alive backend");
+                throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
             }
             return backend;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 2cb8fe4ea6..ea0cb6cfda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -26,7 +26,7 @@ import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -63,13 +63,11 @@ public class InsertStreamTxnExecutor {
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
         StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
         TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
-        SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                new SystemInfoService.BeAvailablePredicate(false, true, true);
-        List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
-                1, beAvailablePredicate, false,
-                txnEntry.getDb().getClusterName(), null, null);
-        if (beIds == null || beIds.isEmpty()) {
-            throw new UserException("there is no backend load available or scanNode backend available.");
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName())
+                .needLoadAvailable().needQueryAvailable().build();
+        List<Long> beIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        if (beIds.isEmpty()) {
+            throw new UserException("No available backend to match the policy: " + policy);
         }
 
         tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index fd8cf80cac..606d68e6c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -41,6 +41,7 @@ import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -91,14 +92,13 @@ public class MultiLoadMgr {
             if (infoMap.containsKey(multiLabel)) {
                 throw new LabelAlreadyUsedException(label);
             }
-            MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
-            SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                    new SystemInfoService.BeAvailablePredicate(false, false, true);
-            List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(1,
-                    beAvailablePredicate, false, ConnectContext.get().getClusterName(), null, null);
-            if (backendIds == null) {
-                throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
+            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(ConnectContext.get().getClusterName())
+                    .needLoadAvailable().build();
+            List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+            if (backendIds.isEmpty()) {
+                throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + " policy: " + policy);
             }
+            MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
             multiLoadDesc.setBackendId(backendIds.get(0));
             infoMap.put(multiLabel, multiLoadDesc);
         } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
new file mode 100644
index 0000000000..e995c0aff5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+/**
+ * Selection policy for building BE nodes
+ */
+public class BeSelectionPolicy {
+    public String cluster = SystemInfoService.DEFAULT_CLUSTER;
+    public boolean needScheduleAvailable = false;
+    public boolean needQueryAvailable = false;
+    public boolean needLoadAvailable = false;
+    // Resource tag. Empty means no need to consider resource tag.
+    public Set<Tag> resourceTags = Sets.newHashSet();
+    // storage medium. null means no need to consider storage medium.
+    public TStorageMedium storageMedium = null;
+    // Check if disk usage reaches limit. false means no need to check.
+    public boolean checkDiskUsage = false;
+    // If set to false, do not select backends on same host.
+    public boolean allowOnSameHost = false;
+
+    private BeSelectionPolicy() {
+
+    }
+
+    public static class Builder {
+        private BeSelectionPolicy policy;
+        public Builder() {
+            policy = new BeSelectionPolicy();
+        }
+
+        public Builder setCluster(String cluster) {
+            policy.cluster = cluster;
+            return this;
+        }
+
+        public Builder needScheduleAvailable() {
+            policy.needScheduleAvailable = true;
+            return this;
+        }
+
+        public Builder needQueryAvailable() {
+            policy.needQueryAvailable = true;
+            return this;
+        }
+
+        public Builder needLoadAvailable() {
+            policy.needLoadAvailable = true;
+            return this;
+        }
+
+        public Builder addTags(Set<Tag> tags) {
+            policy.resourceTags.addAll(tags);
+            return this;
+        }
+
+        public Builder setStorageMedium(TStorageMedium medium) {
+            policy.storageMedium = medium;
+            return this;
+        }
+
+        public Builder needCheckDiskUsage() {
+            policy.checkDiskUsage = true;
+            return this;
+        }
+
+        public Builder allowOnSameHost() {
+            policy.allowOnSameHost = true;
+            return this;
+        }
+
+        public BeSelectionPolicy build() {
+            return policy;
+        }
+    }
+
+    public boolean isMatch(Backend backend) {
+        if (needScheduleAvailable && !backend.isScheduleAvailable()
+                || needQueryAvailable && !backend.isQueryAvailable()
+                || needLoadAvailable && !backend.isLoadAvailable()
+                || !resourceTags.isEmpty() && !resourceTags.contains(backend.getTag())
+                || storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) {
+            return false;
+        }
+
+        if (checkDiskUsage) {
+            if (storageMedium == null && backend.diskExceedLimit()) {
+                return false;
+            }
+            if (storageMedium != null && backend.diskExceedLimitByStorageMedium(storageMedium)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("cluster|query|load|schedule|tags|medium: ");
+        sb.append(cluster).append("|");
+        sb.append(needQueryAvailable).append("|");
+        sb.append(needLoadAvailable).append("|");
+        sb.append(needScheduleAvailable).append("|");
+        sb.append(resourceTags).append("|");
+        sb.append(storageMedium);
+        return sb.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 39e577f55a..82571cd3d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -40,7 +40,6 @@ import org.apache.doris.thrift.TStorageMedium;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
@@ -61,10 +60,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public class SystemInfoService {
     private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
@@ -75,42 +72,16 @@ public class SystemInfoService {
 
     public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is no scanNode Backend available.";
 
-    public static class BeAvailablePredicate {
-        private boolean scheduleAvailable;
+    private volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
+    private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
 
-        private boolean queryAvailable;
-
-        private boolean loadAvailable;
-
-        public BeAvailablePredicate(boolean scheduleAvailable, boolean queryAvailable, boolean loadAvailable) {
-            this.scheduleAvailable = scheduleAvailable;
-            this.queryAvailable = queryAvailable;
-            this.loadAvailable = loadAvailable;
-        }
-
-        public boolean isMatch(Backend backend) {
-            if (scheduleAvailable && !backend.isScheduleAvailable() || queryAvailable && !backend.isQueryAvailable() ||
-                    loadAvailable && !backend.isLoadAvailable()) {
-                return false;
-            }
-            return true;
-        }
-    }
-
-    private volatile ImmutableMap<Long, Backend> idToBackendRef;
-    private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef;
-
-    // last backend id used by round robin for sequential choosing backends for
-    // tablet creation
-    private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap;
-    // last backend id used by round robin for sequential choosing backends in
-    // other jobs
-    private ConcurrentHashMap<String, Long> lastBackendIdForOtherMap;
+    // last backend id used by round robin for sequential selecting backends for replica creation
+    private Map<Tag, Long> lastBackendIdForReplicaCreation = Maps.newConcurrentMap();
 
     private long lastBackendIdForCreation = -1;
     private long lastBackendIdForOther = -1;
 
-    private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef;
+    private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef = ImmutableMap.of();
 
     // sort host backends list by num of backends, descending
     private static final Comparator<List<Backend>> hostBackendsListComparator = new Comparator<List<Backend>>() {
@@ -124,15 +95,6 @@ public class SystemInfoService {
         }
     };
 
-    public SystemInfoService() {
-        idToBackendRef = ImmutableMap.<Long, Backend>of();
-        idToReportVersionRef = ImmutableMap.<Long, AtomicLong>of();
-
-        lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
-        lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>();
-        pathHashToDishInfoRef = ImmutableMap.<Long, DiskInfo>of();
-    }
-
     // for deploy manager
     public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws UserException {
         addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG);
@@ -432,9 +394,6 @@ public class SystemInfoService {
             LOG.warn("not enough available backends. require :" + instanceNum + " get:" + chosenBackendIds.size());
             return null;
         }
-
-        lastBackendIdForCreationMap.put(clusterName, (long) -1);
-        lastBackendIdForOtherMap.put(clusterName, (long) -1);
         return chosenBackendIds;
     }
 
@@ -462,9 +421,6 @@ public class SystemInfoService {
                 }
             }
         }
-
-        lastBackendIdForCreationMap.remove(clusterName);
-        lastBackendIdForOtherMap.remove(clusterName);
     }
 
     /**
@@ -779,21 +735,35 @@ public class SystemInfoService {
     }
 
 
-    // Find enough backend to allocate replica of a tablet.
-    // filters include: tag, cluster, storage medium
-    public Map<Tag, List<Long>> chooseBackendIdByFilters(ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium)
+    /**
+     * Select a set of backends for replica creation.
+     * The following parameters need to be considered when selecting backends.
+     *
+     * @param replicaAlloc
+     * @param clusterName
+     * @param storageMedium
+     * @return return the selected backend ids group by tag.
+     * @throws DdlException
+     */
+    public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
+            ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium)
             throws DdlException {
         Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
         Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
         short totalReplicaNum = 0;
-        BeAvailablePredicate beAvailablePredicate = new BeAvailablePredicate(true, false, false);
+
         for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
-            List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(entry.getValue(),
-                    beAvailablePredicate, true, clusterName, storageMedium, entry.getKey());
-            if (beIds == null) {
-                throw new DdlException("Failed to find enough host with storage medium and tag("
-                        + (storageMedium == null ? "NaN" : storageMedium) + "/" + entry.getKey()
-                        + ") in all backends. need: " + entry.getValue());
+            BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName)
+                    .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
+                    .setStorageMedium(storageMedium);
+            if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
+                builder.allowOnSameHost();
+            }
+
+            BeSelectionPolicy policy = builder.build();
+            List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue());
+            if (beIds.isEmpty()) {
+                throw new DdlException("Failed to find " + entry.getValue() + " backends for policy: " + policy);
             }
             chosenBackendIds.put(entry.getKey(), beIds);
             totalReplicaNum += beIds.size();
@@ -802,61 +772,34 @@ public class SystemInfoService {
         return chosenBackendIds;
     }
 
-    public List<Long> seqChooseBackendIdsByStorageMediumAndTag(int backendNum, BeAvailablePredicate beAvailablePredicate,
-                                                               boolean isCreate, String clusterName,
-                                                               TStorageMedium storageMedium, Tag tag) {
-        Stream<Backend> beStream = getClusterBackends(clusterName).stream();
-        if (storageMedium == null) {
-            beStream = beStream.filter(v -> !v.diskExceedLimit());
-        } else {
-            beStream = beStream.filter(v -> !v.diskExceedLimitByStorageMedium(storageMedium));
+    /**
+     * Select a set of backends by the given policy.
+     *
+     * @param policy
+     * @param number number of backends which need to be selected.
+     * @return return #number of backend ids,
+     * or empty set if no backends match the policy, or the number of matched backends is less than "number";
+     */
+    public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
+        List<Backend> candidates =
+                idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
+        if (candidates.size() < number) {
+            return Lists.newArrayList();
         }
-        if (tag != null) {
-            beStream = beStream.filter(v -> v.getTag().equals(tag));
+        // If only need one Backend, just return a random one.
+        if (number == 1) {
+            Collections.shuffle(candidates);
+            return Lists.newArrayList(candidates.get(0).getId());
         }
-        final List<Backend> backends = beStream.collect(Collectors.toList());
-        return seqChooseBackendIds(backendNum, beAvailablePredicate, isCreate, clusterName, backends);
-    }
 
-    // choose backends by round robin
-    // return null if not enough backend
-    // use synchronized to run serially
-    public synchronized List<Long> seqChooseBackendIds(int backendNum, BeAvailablePredicate beAvailablePredicate,
-                                                       boolean isCreate, String clusterName,
-                                                       final List<Backend> srcBackends) {
-        long lastBackendId;
-
-        if (clusterName.equals(DEFAULT_CLUSTER)) {
-            if (isCreate) {
-                lastBackendId = lastBackendIdForCreation;
-            } else {
-                lastBackendId = lastBackendIdForOther;
-            }
-        } else {
-            if (isCreate) {
-                if (lastBackendIdForCreationMap.containsKey(clusterName)) {
-                    lastBackendId = lastBackendIdForCreationMap.get(clusterName);
-                } else {
-                    lastBackendId = -1;
-                    lastBackendIdForCreationMap.put(clusterName, lastBackendId);
-                }
-            } else {
-                if (lastBackendIdForOtherMap.containsKey(clusterName)) {
-                    lastBackendId = lastBackendIdForOtherMap.get(clusterName);
-                } else {
-                    lastBackendId = -1;
-                    lastBackendIdForOtherMap.put(clusterName, lastBackendId);
-                }
-            }
+        if (policy.allowOnSameHost) {
+            Collections.shuffle(candidates);
+            return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
         }
 
-        // host -> BE list
-        List<Backend> sourceBackend = srcBackends;
-        if (sourceBackend == null) {
-            sourceBackend = getClusterBackends(clusterName);
-        }
+        // for each host, random select one backend.
         Map<String, List<Backend>> backendMaps = Maps.newHashMap();
-        for (Backend backend : sourceBackend) {
+        for (Backend backend : candidates) {
             if (backendMaps.containsKey(backend.getHost())) {
                 backendMaps.get(backend.getHost()).add(backend);
             } else {
@@ -865,94 +808,16 @@ public class SystemInfoService {
                 backendMaps.put(backend.getHost(), list);
             }
         }
-
-        // if more than one backend exists in same host, select a backend at random
-        List<Backend> backends = Lists.newArrayList();
+        candidates.clear();
         for (List<Backend> list : backendMaps.values()) {
-            if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
-                backends.addAll(list);
-            } else {
-                list = list.stream().filter(beAvailablePredicate::isMatch).collect(Collectors.toList());
-                if (list.isEmpty()) {
-                    continue;
-                }
-                Collections.shuffle(list);
-                backends.add(list.get(0));
-            }
+            Collections.shuffle(list);
+            candidates.add(list.get(0));
         }
-
-        Collections.shuffle(backends);
-
-        List<Long> backendIds = Lists.newArrayList();
-        // get last backend index
-        int lastBackendIndex = -1;
-        int index = -1;
-        for (Backend backend : backends) {
-            index++;
-            if (backend.getId() == lastBackendId) {
-                lastBackendIndex = index;
-                break;
-            }
+        if (candidates.size() < number) {
+            return Lists.newArrayList();
         }
-        Iterator<Backend> iterator = Iterators.cycle(backends);
-        index = -1;
-        boolean failed = false;
-        // 2 cycle at most
-        int maxIndex = 2 * backends.size();
-        while (iterator.hasNext() && backendIds.size() < backendNum) {
-            Backend backend = iterator.next();
-            index++;
-            if (index <= lastBackendIndex) {
-                continue;
-            }
-
-            if (index > maxIndex) {
-                failed = true;
-                break;
-            }
-
-            if (!beAvailablePredicate.isMatch(backend)) {
-                continue;
-            }
-
-            long backendId = backend.getId();
-            if (!backendIds.contains(backendId)) {
-                backendIds.add(backendId);
-                lastBackendId = backendId;
-            } else {
-                failed = true;
-                break;
-            }
-        }
-
-        if (clusterName.equals(DEFAULT_CLUSTER)) {
-            if (isCreate) {
-                lastBackendIdForCreation = lastBackendId;
-            } else {
-                lastBackendIdForOther = lastBackendId;
-            }
-        } else {
-            // update last backendId
-            if (isCreate) {
-                lastBackendIdForCreationMap.put(clusterName, lastBackendId);
-            } else {
-                lastBackendIdForOtherMap.put(clusterName, lastBackendId);
-            }
-        }
-        if (backendIds.size() != backendNum) {
-            failed = true;
-        }
-
-        if (!failed) {
-            return backendIds;
-        }
-
-        // debug
-        for (Backend backend : backends) {
-            LOG.debug("random select: {}", backend.toString());
-        }
-
-        return null;
+        Collections.shuffle(candidates);
+        return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
     }
 
     public ImmutableMap<Long, Backend> getIdToBackend() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 90fc55a891..aaab4b3155 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -22,7 +22,6 @@ import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
-import org.apache.doris.backup.RestoreJob.RestoreJobState;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -38,35 +37,19 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.persist.EditLog;
-import org.apache.doris.resource.Tag;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.task.AgentTask;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.DirMoveTask;
-import org.apache.doris.task.DownloadTask;
-import org.apache.doris.task.SnapshotTask;
-import org.apache.doris.thrift.TBackend;
-import org.apache.doris.thrift.TFinishTaskRequest;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStorageMedium;
-import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Adler32;
-
 import mockit.Delegate;
 import mockit.Expectations;
 import mockit.Injectable;
@@ -161,12 +144,12 @@ public class RestoreJobTest {
         
         new Expectations() {
             {
-                systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
-                        anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
+                systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
+                        anyString, (TStorageMedium) any);
                 minTimes = 0;
                 result = new Delegate() {
-                    public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive,
-                                                                       boolean isCreate, String clusterName) {
+                    public synchronized List<Long> selectBackendIdsForReplicaCreation(
+                            ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) {
                         List<Long> beIds = Lists.newArrayList();
                         beIds.add(CatalogMocker.BACKEND1_ID);
                         beIds.add(CatalogMocker.BACKEND2_ID);
@@ -259,113 +242,6 @@ public class RestoreJobTest {
         backupMeta = new BackupMeta(tbls, resources);
     }
 
-    @Ignore
-    @Test
-    public void testRun() {
-        // pending
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
-        Assert.assertEquals(12, job.getFileMapping().getMapping().size());
-
-        // 2. snapshoting
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
-        Assert.assertEquals(12 * 2, AgentTaskQueue.getTaskNum());
-
-        // 3. snapshot finished
-        List<AgentTask> agentTasks = Lists.newArrayList();
-        Map<TTaskType, Set<Long>> runningTasks = Maps.newHashMap();
-        agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
-        agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
-        agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
-        Assert.assertEquals(12 * 2, agentTasks.size());
-
-        for (AgentTask agentTask : agentTasks) {
-            if (agentTask.getTaskType() != TTaskType.MAKE_SNAPSHOT) {
-                continue;
-            }
-
-            SnapshotTask task = (SnapshotTask) agentTask;
-            String snapshotPath = "/path/to/snapshot/" + System.currentTimeMillis();
-            TStatus taskStatus = new TStatus(TStatusCode.OK);
-            TBackend tBackend = new TBackend("", 0, 1);
-            TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
-                    task.getSignature(), taskStatus);
-            request.setSnapshotPath(snapshotPath);
-            Assert.assertTrue(job.finishTabletSnapshotTask(task, request));
-        }
-
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState());
-
-        // download
-        AgentTaskQueue.clearAllTasks();
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
-        Assert.assertEquals(9, AgentTaskQueue.getTaskNum());
-
-        // downloading
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
-
-        List<AgentTask> downloadTasks = Lists.newArrayList();
-        runningTasks = Maps.newHashMap();
-        downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
-        downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
-        downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
-        Assert.assertEquals(9, downloadTasks.size());
-        
-        List<Long> downloadedTabletIds = Lists.newArrayList();
-        for (AgentTask agentTask : downloadTasks) {
-            TStatus taskStatus = new TStatus(TStatusCode.OK);
-            TBackend tBackend = new TBackend("", 0, 1);
-            TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
-                    agentTask.getSignature(), taskStatus);
-            request.setDownloadedTabletIds(downloadedTabletIds);
-            Assert.assertTrue(job.finishTabletDownloadTask((DownloadTask) agentTask, request));
-        }
-
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.COMMIT, job.getState());
-
-        // commit
-        AgentTaskQueue.clearAllTasks();
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
-        Assert.assertEquals(12, AgentTaskQueue.getTaskNum());
-
-        // committing
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
-
-        List<AgentTask> dirMoveTasks = Lists.newArrayList();
-        runningTasks = Maps.newHashMap();
-        dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
-        dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
-        dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
-        Assert.assertEquals(12, dirMoveTasks.size());
-        
-        for (AgentTask agentTask : dirMoveTasks) {
-            TStatus taskStatus = new TStatus(TStatusCode.OK);
-            TBackend tBackend = new TBackend("", 0, 1);
-            TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
-                    agentTask.getSignature(), taskStatus);
-            job.finishDirMoveTask((DirMoveTask) agentTask, request);
-        }
-
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.FINISHED, job.getState());
-    }
-
     @Test
     public void testSignature() throws AnalysisException {
         Adler32 sig1 = new Adler32();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index 24b3c89bbc..e4f97607d9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -241,8 +241,7 @@ public class CreateTableTest {
                         + "properties('replication_num' = '1', 'short_key' = '4');"));
 
         ExceptionChecker
-                .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " +
-                                "tag(NaN/{\"location\" : \"default\"}) in all backends. need: 3",
+                .expectThrowsWithMsg(DdlException.class, "Failed to find 3 backends for policy",
                         () -> createTable("create table test.atbl5\n" + "(k1 int, k2 int, k3 int)\n"
                                 + "duplicate key(k1, k2, k3)\n" + "distributed by hash(k1) buckets 1\n"
                                 + "properties('replication_num' = '3');"));
@@ -259,8 +258,7 @@ public class CreateTableTest {
 
         ConfigBase.setMutableConfig("disable_storage_medium_check", "false");
         ExceptionChecker
-                .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " +
-                                "tag(SSD/{\"location\" : \"default\"}) in all backends. need: 1",
+                .expectThrowsWithMsg(DdlException.class, " Failed to find 1 backends for policy:",
                         () -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n"
                                 + "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
index d755a6a65e..1ded070267 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
@@ -86,7 +86,7 @@ public class ModifyBackendTest {
                 ");";
         CreateTableStmt createStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext);
         ExceptionChecker.expectThrowsWithMsg(DdlException.class,
-                "Failed to find enough host with storage medium and tag(HDD/{\"location\" : \"default\"}) in all backends. need: 1",
+                "Failed to find 1 backends for policy:",
                 () -> DdlExecutor.execute(Catalog.getCurrentCatalog(), createStmt));
 
         createStr = "create table test.tbl1(\n" +
@@ -119,7 +119,7 @@ public class ModifyBackendTest {
         Database db = Catalog.getCurrentCatalog().getDbNullable("default_cluster:test");
         Table tbl3 = db.getTableNullable("tbl3");
         String err = Catalog.getCurrentCatalog().getDynamicPartitionScheduler().getRuntimeInfo(tbl3.getId(), DynamicPartitionScheduler.CREATE_PARTITION_MSG);
-        Assert.assertTrue(err.contains("Failed to find enough host with storage medium and tag"));
+        Assert.assertTrue(err.contains("Failed to find 1 backends for policy:"));
 
         createStr = "create table test.tbl4(\n" +
                 "k1 date, k2 int\n" +
@@ -171,7 +171,7 @@ public class ModifyBackendTest {
                 + " set ('replication_allocation' = 'tag.location.zonex:1')";
         AlterTableStmt alterStmt2 = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStr, connectContext);
         ExceptionChecker.expectThrowsWithMsg(DdlException.class,
-                "Failed to find enough host with tag({\"location\" : \"zonex\"}) in all backends. need: 1",
+                "Failed to find enough host with tag",
                 () -> DdlExecutor.execute(Catalog.getCurrentCatalog(), alterStmt2));
         tblProperties = tableProperty.getProperties();
         Assert.assertTrue(tblProperties.containsKey("default.replication_allocation"));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index a3051c65dd..3b1b4eddfc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -20,12 +20,12 @@ package org.apache.doris.load.sync.canal;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
-import org.apache.doris.resource.Tag;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -59,7 +59,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
-
 import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
@@ -150,8 +149,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = execPlanFragmentParams;
 
-                systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, anyBoolean, anyString,
-                        (TStorageMedium) any, (Tag) any);
+                systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
+                        anyString, (TStorageMedium) any);
                 minTimes = 0;
                 result = backendIds;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
index 8d4d09ae13..bd099083b1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
@@ -20,9 +20,8 @@ package org.apache.doris.qe;
 import org.apache.doris.backup.CatalogMocker;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.resource.Tag;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.Lists;
 
@@ -31,7 +30,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.List;
-
 import mockit.Delegate;
 import mockit.Expectations;
 import mockit.Mocked;
@@ -45,7 +43,7 @@ public class MultiLoadMgrTest {
     @Mocked
     private SystemInfoService systemInfoService;
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         new Expectations() {
             {
                 ConnectContext.get();
@@ -62,13 +60,10 @@ public class MultiLoadMgrTest {
         };
         new Expectations() {
             {
-                systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
-                        anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
+                systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt);
                 minTimes = 0;
                 result = new Delegate() {
-                    public synchronized List<Long> seqChooseBackendIdsByStorageMediumAndTag(
-                            int backendNum, SystemInfoService.BeAvailablePredicate availablePredicate,
-                            boolean isCreate, String clusterName, TStorageMedium medium, Tag tag) {
+                    public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
                         List<Long> beIds = Lists.newArrayList();
                         beIds.add(CatalogMocker.BACKEND1_ID);
                         beIds.add(CatalogMocker.BACKEND2_ID);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
new file mode 100644
index 0000000000..b2570095a0
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class SystemInfoServiceTest {
+
+    @Mocked
+    private Catalog catalog;
+    @Mocked
+    private EditLog editLog;
+
+    private SystemInfoService infoService;
+
+    @Before
+    public void setUp() {
+        new Expectations() {
+            {
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+
+                editLog.logAddBackend((Backend) any);
+                minTimes = 0;
+
+                Catalog.getCurrentCatalog();
+                minTimes = 0;
+                result = catalog;
+            }
+        };
+
+        infoService = new SystemInfoService();
+    }
+
+    private void addBackend(long beId, String host, int hbPort) {
+        Backend backend = new Backend(beId, host, hbPort);
+        infoService.addBackend(backend);
+    }
+
+    @Test
+    public void testSelectBackendIdsByPolicy() throws Exception {
+        // 1. no backend
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size());
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 4).size());
+
+        // 2. add one backend but not alive
+        addBackend(10001, "192.168.1.1", 9050);
+        Backend be1 = infoService.getBackend(10001);
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size());
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 0).size());
+        // policy with no condition
+        BeSelectionPolicy policy2 = new BeSelectionPolicy.Builder().build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy2, 1).size());
+
+        // 3. add more backends
+        addBackend(10002, "192.168.1.2", 9050);
+        Backend be2 = infoService.getBackend(10002);
+        be2.setAlive(true);
+        addBackend(10003, "192.168.1.3", 9050);
+        Backend be3 = infoService.getBackend(10003);
+        be3.setAlive(true);
+        addBackend(10004, "192.168.1.4", 9050);
+        Backend be4 = infoService.getBackend(10004);
+        be4.setAlive(true);
+        addBackend(10005, "192.168.1.5", 9050);
+        Backend be5 = infoService.getBackend(10005);
+
+        // b1 and be5 is dead, be2,3,4 is alive
+        BeSelectionPolicy policy3 = new BeSelectionPolicy.Builder().needScheduleAvailable().build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3, 1).size());
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10001L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10005L));
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy3, 2).size());
+        Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy3, 3).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10002L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10003L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10004L));
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy3, 4).size());
+
+        // 4. set be status
+        be2.setLoadDisabled(true);
+        be3.setQueryDisabled(true);
+        be4.setDecommissioned(true);
+        // now, only b3,b4 is loadable, only be2,b4 is queryable, only be2,3 is schedulable
+        BeSelectionPolicy policy4 = new BeSelectionPolicy.Builder().needScheduleAvailable().build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4, 1).size());
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10001L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10004L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10005L));
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy4, 2).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10002L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10003L));
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy4, 3).size());
+
+        BeSelectionPolicy policy5 = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy5, 1).size());
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10001L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10002L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10005L));
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy5, 2).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10003L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10004L));
+
+        // 5. set tags
+        // reset all be
+        be1.setAlive(true);
+        be2.setLoadDisabled(false);
+        be3.setQueryDisabled(false);
+        be5.setAlive(true);
+        be3.setAlive(true);
+        be4.setAlive(true);
+        be4.setDecommissioned(false);
+        be5.setAlive(true);
+        BeSelectionPolicy policy6 = new BeSelectionPolicy.Builder().needQueryAvailable().build();
+        Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy6, 5).size());
+
+        Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
+        Tag tagb = Tag.create(Tag.TYPE_LOCATION, "tagb");
+        be1.setTag(taga);
+        be2.setTag(taga);
+        be3.setTag(tagb);
+        be4.setTag(tagb);
+        be5.setTag(tagb);
+
+        BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga)).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy7, 1).size());
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy7, 2).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10001L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10002L));
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy7, 3).size());
+
+        BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(tagb)).build();
+        Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy8, 3).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10003L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10004L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10005L));
+
+        BeSelectionPolicy policy9 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga, tagb)).build();
+        Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy9, 5).size());
+
+        // 6. check storage medium
+        addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 1 * 1024 * 1024L);
+        addDisk(be2, "path2", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        addDisk(be3, "path3", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        addDisk(be4, "path4", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        addDisk(be5, "path5", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+
+        BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb))
+                .setStorageMedium(TStorageMedium.SSD).build();
+        Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size());
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size());
+
+        BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb))
+                .setStorageMedium(TStorageMedium.HDD).build();
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size());
+
+        // 7. check disk usage
+        BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                .setStorageMedium(TStorageMedium.HDD).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size());
+        BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                .setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build();
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy13, 1).size());
+
+        // 8. check same host
+        addBackend(10006, "192.168.1.1", 9051);
+        Backend be6 = infoService.getBackend(10006);
+        be6.setTag(taga);
+        be6.setAlive(true);
+        addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
+        addDisk(be6, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
+        BeSelectionPolicy policy14 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                .setStorageMedium(TStorageMedium.HDD).build();
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy14, 2).size());
+        BeSelectionPolicy policy15 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                .setStorageMedium(TStorageMedium.HDD).allowOnSameHost().build();
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy15, 2).size());
+    }
+
+    @Test
+    public void testSelectBackendIdsForReplicaCreation() throws Exception {
+        addBackend(10001, "192.168.1.1", 9050);
+        Backend be1 = infoService.getBackend(10001);
+        addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be1.setAlive(true);
+        addBackend(10002, "192.168.1.2", 9050);
+        Backend be2 = infoService.getBackend(10002);
+        addDisk(be2, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be2.setAlive(true);
+        addBackend(10003, "192.168.1.3", 9050);
+        Backend be3 = infoService.getBackend(10003);
+        addDisk(be3, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be3.setAlive(true);
+        addBackend(10004, "192.168.1.4", 9050);
+        Backend be4 = infoService.getBackend(10004);
+        addDisk(be4, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be4.setAlive(true);
+        addBackend(10005, "192.168.1.5", 9050);
+        Backend be5 = infoService.getBackend(10005);
+        addDisk(be5, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be5.setAlive(true);
+
+        ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
+        // also check if the random selection logic can evenly distribute the replica.
+        Map<Long, Integer> beCounterMap = Maps.newHashMap();
+        for (int i = 0; i < 10000; ++i) {
+            Map<Tag, List<Long>> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
+                    SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.HDD);
+            Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size());
+            for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) {
+                beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);
+            }
+        }
+        System.out.println(beCounterMap);
+        List<Integer> list = Lists.newArrayList(beCounterMap.values());
+        Collections.sort(list);
+        int diff = list.get(list.size() - 1) - list.get(0);
+        // The max replica num and min replica num's diff is less than 5%.
+        Assert.assertTrue((diff * 1.0 / list.get(0)) < 0.05);
+    }
+
+    private void addDisk(Backend be, String path, TStorageMedium medium, long totalB, long availB) {
+        DiskInfo diskInfo1 = new DiskInfo(path);
+        diskInfo1.setTotalCapacityB(totalB);
+        diskInfo1.setAvailableCapacityB(availB);
+        diskInfo1.setStorageMedium(medium);
+        Map<String, DiskInfo> map = Maps.newHashMap();
+        map.put(diskInfo1.getRootPath(), diskInfo1);
+        be.setDisks(ImmutableMap.copyOf(map));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 06/17: [performance][query]improve the performance of DISTINCT aggregation by using flat hash set replace unordered set (#9401)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 343f97f028bc33bae07a869864fb034bd5ceb462
Author: shee <13...@users.noreply.github.com>
AuthorDate: Fri May 6 17:43:14 2022 -0700

    [performance][query]improve the performance of DISTINCT aggregation by using flat hash set replace unordered set (#9401)
    
    Co-authored-by: shizhiqiang03 <sh...@meituan.com>
---
 be/src/exprs/aggregate_functions.cpp | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp
index 700851aad0..3e2f4b3fe5 100644
--- a/be/src/exprs/aggregate_functions.cpp
+++ b/be/src/exprs/aggregate_functions.cpp
@@ -1313,7 +1313,8 @@ private:
         }
     };
 
-    std::unordered_set<T, NumericHashHelper> _set;
+    phmap::flat_hash_set<T, NumericHashHelper> _set;
+
     // Because Anyval does not provide the hash function, in order
     // to adopt the type different from the template, the pointer is used
     // HybridSetBase* _set;
@@ -1470,7 +1471,8 @@ public:
 private:
     const int DECIMAL_BYTE_SIZE = 16;
 
-    std::unordered_set<DecimalV2Value> _set;
+    phmap::flat_hash_set<DecimalV2Value> _set;
+
     FunctionContext::Type _type;
 };
 
@@ -1555,7 +1557,8 @@ private:
     const int DATETIME_PACKED_TIME_BYTE_SIZE = 8;
     const int DATETIME_TYPE_BYTE_SIZE = 4;
 
-    std::unordered_set<DateTimeVal, DateTimeHashHelper> _set;
+    phmap::flat_hash_set<DateTimeVal, DateTimeHashHelper> _set;
+
     FunctionContext::Type _type;
 };
 


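For context on the change above: phmap::flat_hash_set is an open-addressing table from the parallel-hashmap library with the same insert/count/size interface as std::unordered_set, so only the member declarations in aggregate_functions.cpp needed to change; the flat layout avoids one heap allocation per element and has better cache locality, which is where the DISTINCT speedup comes from. A minimal standalone sketch of the substitution (not part of the commit; it assumes the parallel-hashmap single header is on the include path) could look like:

    // Drop-in use of phmap::flat_hash_set where std::unordered_set was used before.
    #include <cstdint>
    #include <iostream>
    #include <parallel_hashmap/phmap.h>

    int main() {
        phmap::flat_hash_set<int64_t> distinct_values; // open addressing, contiguous storage

        const int64_t input[] = {1, 2, 2, 3, 3, 3, 9999999};
        for (int64_t v : input) {
            distinct_values.insert(v); // duplicates are ignored, as with unordered_set
        }

        std::cout << "distinct count = " << distinct_values.size() << std::endl; // prints 4
        return 0;
    }
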
---------------------------------------------------------------------


[incubator-doris] 12/17: [Bug] (load) Broker load kerberos auth fail (#9494)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 72e80756e025b3270034e408c224e891b18a4156
Author: Hui Tian <82...@qq.com>
AuthorDate: Thu May 12 15:43:29 2022 +0800

    [Bug] (load) Broker load kerberos auth fail (#9494)
---
 .../main/java/org/apache/doris/broker/hdfs/FileSystemManager.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 2c00a2ae96..64a875c0ae 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -45,6 +45,7 @@ import java.net.InetAddress;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
@@ -285,10 +286,13 @@ public class FileSystemManager {
                         tmpFilePath ="/tmp/." +
                                 principal.replace('/', '_') +
                                 "_" + Long.toString(currentTime) +
-                                "_" + Integer.toString(randNumber);
+                                "_" + Integer.toString(randNumber) +
+                                "_" + Thread.currentThread().getId();
                         logger.info("create kerberos tmp file" + tmpFilePath);
                         FileOutputStream fileOutputStream = new FileOutputStream(tmpFilePath);
+                        FileLock lock = fileOutputStream.getChannel().lock();
                         fileOutputStream.write(base64decodedBytes);
+                        lock.release();
                         fileOutputStream.close();
                         keytab = tmpFilePath;
                     } else {


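For context on the change above: the broker now adds the thread id to the temporary keytab path and holds an exclusive java.nio FileLock while writing it, so concurrent load tasks cannot interleave writes to the same file. Purely as an illustrative analog of the write-under-exclusive-lock pattern (the broker's actual fix is the Java code shown in the diff), a POSIX sketch could look like:

    #include <fcntl.h>
    #include <sys/file.h>
    #include <unistd.h>

    #include <cassert>
    #include <string>

    int main() {
        // Hypothetical demo path; the broker uses /tmp/.<principal>_<time>_<rand>_<tid>.
        const std::string path = "/tmp/keytab_demo_" + std::to_string(::getpid());
        const char data[] = "fake keytab bytes";

        int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0600);
        assert(fd >= 0);

        int lock_rc = ::flock(fd, LOCK_EX);   // block until this process owns the file
        assert(lock_rc == 0);

        ssize_t written = ::write(fd, data, sizeof(data)); // write while holding the lock
        assert(written == static_cast<ssize_t>(sizeof(data)));

        ::flock(fd, LOCK_UN);                 // release, then close and clean up
        ::close(fd);
        ::unlink(path.c_str());
        return 0;
    }
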
---------------------------------------------------------------------


[incubator-doris] 16/17: [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 53fb90f04f2dd997f4b6235cc57011eaf102b506
Author: Kang <kx...@gmail.com>
AuthorDate: Sun May 15 21:18:32 2022 +0800

    [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
---
 be/src/olap/rowset/segment_v2/column_reader.cpp    |  4 +-
 be/src/olap/rowset/segment_v2/column_reader.h      |  2 +-
 be/src/olap/rowset/segment_v2/column_writer.cpp    |  8 +-
 be/src/olap/rowset/segment_v2/column_writer.h      |  2 +-
 .../rowset/segment_v2/indexed_column_reader.cpp    |  4 +-
 .../olap/rowset/segment_v2/indexed_column_reader.h |  2 +-
 .../rowset/segment_v2/indexed_column_writer.cpp    |  6 +-
 .../olap/rowset/segment_v2/indexed_column_writer.h |  2 +-
 be/src/runtime/mem_pool.cpp                        | 11 +++
 be/src/runtime/mem_pool.h                          |  5 ++
 be/src/util/block_compression.cpp                  | 93 +++++++++++++---------
 be/src/util/block_compression.h                    |  4 +-
 be/test/util/block_compression_test.cpp            | 12 +--
 13 files changed, 95 insertions(+), 60 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index a67793463f..982de671d7 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -105,7 +105,7 @@ Status ColumnReader::init() {
                 strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
     }
     RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
-    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
 
     for (int i = 0; i < _meta.indexes_size(); i++) {
         auto& index_meta = _meta.indexes(i);
@@ -146,7 +146,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
     PageReadOptions opts;
     opts.rblock = iter_opts.rblock;
     opts.page_pointer = pp;
-    opts.codec = _compress_codec;
+    opts.codec = _compress_codec.get();
     opts.stats = iter_opts.stats;
     opts.verify_checksum = _opts.verify_checksum;
     opts.use_page_cache = iter_opts.use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index a8b0263aac..99206bd53a 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -174,7 +174,7 @@ private:
     const TypeInfo* _type_info = nullptr; // initialized in init(), may changed by subclasses.
     const EncodingInfo* _encoding_info =
             nullptr; // initialized in init(), used for create PageDecoder
-    const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init()
+    std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()
 
     // meta for various column indexes (null if the index is absent)
     const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 4c5623250f..bcf2b2a338 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -221,7 +221,7 @@ ScalarColumnWriter::~ScalarColumnWriter() {
 }
 
 Status ScalarColumnWriter::init() {
-    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec));
 
     PageBuilder* page_builder = nullptr;
 
@@ -377,7 +377,7 @@ Status ScalarColumnWriter::write_data() {
         footer.mutable_dict_page_footer()->set_encoding(PLAIN_ENCODING);
 
         PagePointer dict_pp;
-        RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+        RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
                                                         _opts.compression_min_space_saving, _wblock,
                                                         {dict_body.slice()}, footer, &dict_pp));
         dict_pp.to_proto(_opts.meta->mutable_dict_page());
@@ -465,8 +465,8 @@ Status ScalarColumnWriter::finish_current_page() {
     }
     // trying to compress page body
     OwnedSlice compressed_body;
-    RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
-                                               body, &compressed_body));
+    RETURN_IF_ERROR(PageIO::compress_page_body(
+            _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body));
     if (compressed_body.slice().empty()) {
         // page body is uncompressed
         page->data.emplace_back(std::move(encoded_values));
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index b98f4883ca..6235b313b2 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -244,7 +244,7 @@ private:
     PageHead _pages;
     ordinal_t _first_rowid = 0;
 
-    const BlockCompressionCodec* _compress_codec = nullptr;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
 
     std::unique_ptr<OrdinalIndexWriter> _ordinal_index_builder;
     std::unique_ptr<ZoneMapIndexWriter> _zone_map_index_builder;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 67a873fae1..3ee72c8a11 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -37,7 +37,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
                 strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
     }
     RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
-    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
     _value_key_coder = get_key_coder(_type_info->type());
 
     std::unique_ptr<fs::ReadableBlock> rblock;
@@ -82,7 +82,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
     PageReadOptions opts;
     opts.rblock = rblock;
     opts.page_pointer = pp;
-    opts.codec = _compress_codec;
+    opts.codec = _compress_codec.get();
     OlapReaderStatistics tmp_stats;
     opts.stats = &tmp_stats;
     opts.use_page_cache = _use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
index a1030586ab..3c464f32f0 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
@@ -84,7 +84,7 @@ private:
 
     const TypeInfo* _type_info = nullptr;
     const EncodingInfo* _encoding_info = nullptr;
-    const BlockCompressionCodec* _compress_codec = nullptr;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
     const KeyCoder* _value_key_coder = nullptr;
 };
 
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
index 088de6940e..a69b641d32 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
@@ -73,7 +73,7 @@ Status IndexedColumnWriter::init() {
     }
 
     if (_options.compression != NO_COMPRESSION) {
-        RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec));
+        RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec));
     }
     return Status::OK();
 }
@@ -111,7 +111,7 @@ Status IndexedColumnWriter::_finish_current_data_page() {
     footer.mutable_data_page_footer()->set_num_values(num_values_in_page);
     footer.mutable_data_page_footer()->set_nullmap_size(0);
 
-    RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+    RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
                                                     _options.compression_min_space_saving, _wblock,
                                                     {page_body.slice()}, footer, &_last_data_page));
     _num_data_pages++;
@@ -160,7 +160,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM
 
         PagePointer pp;
         RETURN_IF_ERROR(PageIO::compress_and_write_page(
-                _compress_codec, _options.compression_min_space_saving, _wblock,
+                _compress_codec.get(), _options.compression_min_space_saving, _wblock,
                 {page_body.slice()}, page_footer, &pp));
 
         meta->set_is_root_data_page(false);
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
index bcb27f4343..b33c5d4a82 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
@@ -110,7 +110,7 @@ private:
     std::unique_ptr<IndexPageBuilder> _value_index_builder;
     // encoder for value index's key
     const KeyCoder* _value_key_coder;
-    const BlockCompressionCodec* _compress_codec;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
 
     DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter);
 };
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index 212b88e7de..64c6458351 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -41,6 +41,17 @@ MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_by
     DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size);
 }
 
+
+MemPool::MemPool(const std::string& label)
+        : current_chunk_idx_(-1),
+          next_chunk_size_(INITIAL_CHUNK_SIZE),
+          total_allocated_bytes_(0),
+          total_reserved_bytes_(0),
+          peak_allocated_bytes_(0) {
+    mem_tracker_own_ = MemTracker::CreateTracker(-1, label + ":MemPool");
+    mem_tracker_ = mem_tracker_own_.get();
+}
+
 MemPool::~MemPool() {
     int64_t total_bytes_released = 0;
     for (auto& chunk : chunks_) {
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index 397d2cde41..63a73db764 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -99,6 +99,8 @@ public:
         DCHECK(mem_tracker != nullptr);
     }
 
+    MemPool(const std::string& label);
+
     /// Frees all chunks of memory and subtracts the total allocated bytes
     /// from the registered limits.
     ~MemPool();
@@ -302,6 +304,9 @@ private:
     /// The current and peak memory footprint of this pool. This is different from
     /// total allocated_bytes_ since it includes bytes in chunks that are not used.
     MemTracker* mem_tracker_;
+
+    // TODO(zxy) temp variable, In the future, mem trackers should all use raw pointers.
+    std::shared_ptr<MemTracker> mem_tracker_own_;
 };
 
 // Stamp out templated implementations here so they're included in IR module
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index 1b0f8143e2..a1ee74f047 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -88,46 +88,46 @@ public:
 // Used for LZ4 frame format, decompress speed is two times faster than LZ4.
 class Lz4fBlockCompression : public BlockCompressionCodec {
 public:
-    static const Lz4fBlockCompression* instance() {
-        static Lz4fBlockCompression s_instance;
-        return &s_instance;
-    }
-
-    ~Lz4fBlockCompression() override {}
+    Status init() override {
+        auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION);
+        if (LZ4F_isError(ret1)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1)));
+        }
+        ctx_c_inited = true;
 
-    Status compress(const Slice& input, Slice* output) const override {
-        auto compressed_len = LZ4F_compressFrame(output->data, output->size, input.data, input.size,
-                                                 &_s_preferences);
-        if (LZ4F_isError(compressed_len)) {
+        auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION);
+        if (LZ4F_isError(ret2)) {
             return Status::InvalidArgument(strings::Substitute(
-                    "Fail to do LZ4F compress frame, msg=$0", LZ4F_getErrorName(compressed_len)));
+                    "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2)));
         }
-        output->size = compressed_len;
+        ctx_d_inited = true;
+
         return Status::OK();
     }
 
+    ~Lz4fBlockCompression() override {
+        if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c);
+        if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d);
+    }
+
+    Status compress(const Slice& input, Slice* output) const override {
+        std::vector<Slice> inputs {input};
+        return compress(inputs, output);
+    }
+
     Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
-        LZ4F_compressionContext_t ctx = nullptr;
-        auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
-        if (lres != 0) {
-            return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F compress, res=$0",
-                                                               LZ4F_getErrorName(lres)));
-        }
-        auto st = _compress(ctx, inputs, output);
-        LZ4F_freeCompressionContext(ctx);
-        return st;
+        if (!ctx_c_inited)
+            return Status::InvalidArgument("LZ4F_createCompressionContext not sucess");
+
+        return _compress(ctx_c, inputs, output);
     }
 
     Status decompress(const Slice& input, Slice* output) const override {
-        LZ4F_decompressionContext_t ctx;
-        auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
-        if (LZ4F_isError(lres)) {
-            return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F decompress, res=$0",
-                                                               LZ4F_getErrorName(lres)));
-        }
-        auto st = _decompress(ctx, input, output);
-        LZ4F_freeDecompressionContext(ctx);
-        return st;
+        if (!ctx_d_inited)
+            return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess");
+
+        return _decompress(ctx_d, input, output);
     }
 
     size_t max_compressed_len(size_t len) const override {
@@ -167,6 +167,8 @@ private:
     }
 
     Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const {
+        // reset decompression context to avoid ERROR_maxBlockSize_invalid
+        LZ4F_resetDecompressionContext(ctx);
         size_t input_size = input.size;
         auto lres =
                 LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr);
@@ -187,6 +189,10 @@ private:
 
 private:
     static LZ4F_preferences_t _s_preferences;
+    LZ4F_compressionContext_t ctx_c;
+    bool ctx_c_inited = false;
+    LZ4F_decompressionContext_t ctx_d;
+    bool ctx_d_inited = false;
 };
 
 LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
@@ -370,27 +376,38 @@ public:
 };
 
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
-                                   const BlockCompressionCodec** codec) {
+                                   std::unique_ptr<BlockCompressionCodec>& codec) {
+    BlockCompressionCodec* ptr = nullptr;
     switch (type) {
     case segment_v2::CompressionTypePB::NO_COMPRESSION:
-        *codec = nullptr;
-        break;
+        codec.reset(nullptr);
+        return Status::OK();
     case segment_v2::CompressionTypePB::SNAPPY:
-        *codec = SnappyBlockCompression::instance();
+        ptr = new SnappyBlockCompression();
         break;
     case segment_v2::CompressionTypePB::LZ4:
-        *codec = Lz4BlockCompression::instance();
+        ptr = new Lz4BlockCompression();
         break;
     case segment_v2::CompressionTypePB::LZ4F:
-        *codec = Lz4fBlockCompression::instance();
+        ptr = new Lz4fBlockCompression();
         break;
     case segment_v2::CompressionTypePB::ZLIB:
-        *codec = ZlibBlockCompression::instance();
+        ptr = new ZlibBlockCompression();
         break;
     default:
         return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
     }
-    return Status::OK();
+
+    if (!ptr) return Status::NotFound("Failed to create compression codec");
+
+    Status st = ptr->init();
+    if (st.ok()) {
+        codec.reset(ptr);
+    } else {
+        delete ptr;
+    }
+
+    return st;
 }
 
 } // namespace doris
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index ff25113793..7ad3f9ecb7 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -34,6 +34,8 @@ class BlockCompressionCodec {
 public:
     virtual ~BlockCompressionCodec() {}
 
+    virtual Status init() { return Status::OK(); }
+
     // This function will compress input data into output.
     // output should be preallocated, and its capacity must be large enough
     // for compressed input, which can be get through max_compressed_len function.
@@ -61,6 +63,6 @@ public:
 //
 // Return not OK, if error happens.
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
-                                   const BlockCompressionCodec** codec);
+                                   std::unique_ptr<BlockCompressionCodec>& codec);
 
 } // namespace doris
diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp
index 3eabd735b8..a339d54409 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -42,9 +42,9 @@ static std::string generate_str(size_t len) {
 }
 
 void test_single_slice(segment_v2::CompressionTypePB type) {
-    const BlockCompressionCodec* codec = nullptr;
-    auto st = get_block_compression_codec(type, &codec);
-    ASSERT_TRUE(st.ok());
+    std::unique_ptr<BlockCompressionCodec> codec;
+    auto st = get_block_compression_codec(type, codec);
+    EXPECT_TRUE(st.ok());
 
     size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
     for (auto size : test_sizes) {
@@ -104,9 +104,9 @@ TEST_F(BlockCompressionTest, single) {
 }
 
 void test_multi_slices(segment_v2::CompressionTypePB type) {
-    const BlockCompressionCodec* codec = nullptr;
-    auto st = get_block_compression_codec(type, &codec);
-    ASSERT_TRUE(st.ok());
+    std::unique_ptr<BlockCompressionCodec> codec;
+    auto st = get_block_compression_codec(type, codec);
+    EXPECT_TRUE(st.ok());
 
     size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
     std::vector<std::string> orig_strs;


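For context on the change above: the codec now creates its LZ4F compression and decompression contexts once in init() and reuses them for every page, resetting the decompression context before each frame, instead of creating and freeing a context on every compress/decompress call. A minimal standalone sketch of the reuse pattern on the decompression side (not the commit's code; it assumes liblz4 >= 1.8.0 for LZ4F_resetDecompressionContext and linking with -llz4) could look like:

    #include <lz4frame.h>

    #include <cassert>
    #include <cstring>
    #include <string>
    #include <vector>

    int main() {
        // Create the decompression context once and keep it for the codec's lifetime.
        LZ4F_dctx* dctx = nullptr;
        size_t err = LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION);
        assert(!LZ4F_isError(err));

        const std::string payload = "hello hello hello hello hello";
        for (int i = 0; i < 3; ++i) {
            // Compress one frame with the stateless helper (no context needed here).
            std::vector<char> compressed(LZ4F_compressFrameBound(payload.size(), nullptr));
            size_t csize = LZ4F_compressFrame(compressed.data(), compressed.size(),
                                              payload.data(), payload.size(), nullptr);
            assert(!LZ4F_isError(csize));

            // Reuse the same context for every frame; reset it first so leftover state
            // from the previous frame cannot affect this one (mirrors the
            // LZ4F_resetDecompressionContext call added in _decompress()).
            LZ4F_resetDecompressionContext(dctx);
            std::vector<char> output(payload.size());
            size_t dst_size = output.size();
            size_t src_size = csize;
            size_t ret = LZ4F_decompress(dctx, output.data(), &dst_size,
                                         compressed.data(), &src_size, nullptr);
            assert(!LZ4F_isError(ret));
            assert(dst_size == payload.size());
            assert(std::memcmp(output.data(), payload.data(), payload.size()) == 0);
        }

        LZ4F_freeDecompressionContext(dctx); // freed once, e.g. in the codec's destructor
        return 0;
    }
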
---------------------------------------------------------------------


[incubator-doris] 02/17: [fix](function) fix last_value get wrong result when have order by clause (#9247)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 3684be8a52ba51daa36fc1ddbf4cf4586bd4bbc7
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Sun May 15 23:56:01 2022 +0800

    [fix](function) fix last_value get wrong result when have order by clause (#9247)
---
 .../aggregate_function_window.h                    |   6 +-
 be/test/vec/function/function_bitmap_test.cpp      | 135 +++++++++------------
 .../org/apache/doris/analysis/AnalyticExpr.java    |   5 +-
 .../data/correctness/test_last_value_window.out    |   8 ++
 .../correctness/test_last_value_window.groovy      |  51 ++++++++
 5 files changed, 119 insertions(+), 86 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 133efe7ea0..23c40df7eb 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -180,7 +180,7 @@ private:
     std::string _copied_value;
 };
 
-template <typename T, bool is_nullable, bool is_string, typename StoreType = Value>
+template <typename T, bool result_is_nullable, bool is_string, typename StoreType = Value>
 struct LeadAndLagData {
 public:
     bool has_init() const { return _is_init; }
@@ -193,7 +193,7 @@ public:
     }
 
     void insert_result_into(IColumn& to) const {
-        if constexpr (is_nullable) {
+        if constexpr (result_is_nullable) {
             if (_data_value.is_null()) {
                 auto& col = assert_cast<ColumnNullable&>(to);
                 col.insert_default();
@@ -220,7 +220,7 @@ public:
     }
 
     void set_value(const IColumn** columns, int64_t pos) {
-        if constexpr (is_nullable) {
+        if (is_column_nullable(*columns[0])) {
             const auto* nullable_column = check_and_get_column<ColumnNullable>(columns[0]);
             if (nullable_column && nullable_column->is_null_at(pos)) {
                 _data_value.set_null(true);
diff --git a/be/test/vec/function/function_bitmap_test.cpp b/be/test/vec/function/function_bitmap_test.cpp
index 2341c9152e..4e8282f216 100644
--- a/be/test/vec/function/function_bitmap_test.cpp
+++ b/be/test/vec/function/function_bitmap_test.cpp
@@ -26,157 +26,135 @@ TEST(function_bitmap_test, function_bitmap_min_test) {
     std::string func_name = "bitmap_min";
     InputTypeSet input_types = {TypeIndex::BitMap};
 
-    auto bitmap1 = new BitmapValue(1);
-    auto bitmap2 = new BitmapValue(std::vector<uint64_t>({1, 9999999}));
-    auto empty_bitmap = new BitmapValue();
-    DataSet data_set = {{{bitmap1}, (int64_t)1},
-                        {{bitmap2}, (int64_t)1},
-                        {{empty_bitmap}, Null()},
+    BitmapValue bitmap1(1);
+    BitmapValue bitmap2({1, 9999999});
+    BitmapValue empty_bitmap;
+    DataSet data_set = {{{&bitmap1}, (int64_t)1},
+                        {{&bitmap2}, (int64_t)1},
+                        {{&empty_bitmap}, Null()},
                         {{Null()}, Null()}};
 
     check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-    delete bitmap1;
-    delete bitmap2;
-    delete empty_bitmap;
 }
 TEST(function_bitmap_test, function_bitmap_max_test) {
     std::string func_name = "bitmap_max";
     InputTypeSet input_types = {TypeIndex::BitMap};
 
-    auto bitmap1 = new BitmapValue(1);
-    auto bitmap2 = new BitmapValue(std::vector<uint64_t>({1, 9999999}));
-    auto empty_bitmap = new BitmapValue();
-    DataSet data_set = {{{bitmap1}, (int64_t)1},
-                        {{bitmap2}, (int64_t)9999999},
-                        {{empty_bitmap}, Null()},
+    BitmapValue bitmap1(1);
+    BitmapValue bitmap2({1, 9999999});
+    BitmapValue empty_bitmap;
+    DataSet data_set = {{{&bitmap1}, (int64_t)1},
+                        {{&bitmap2}, (int64_t)9999999},
+                        {{&empty_bitmap}, Null()},
                         {{Null()}, Null()}};
 
     check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-    delete bitmap1;
-    delete bitmap2;
-    delete empty_bitmap;
 }
 
 TEST(function_bitmap_test, function_bitmap_to_string_test) {
     std::string func_name = "bitmap_to_string";
     InputTypeSet input_types = {TypeIndex::BitMap};
 
-    auto bitmap1 = new BitmapValue(1);
-    auto bitmap2 = new BitmapValue(std::vector<uint64_t>({1, 9999999}));
-    auto empty_bitmap = new BitmapValue();
-    DataSet data_set = {{{bitmap1}, std::string("1")},
-                        {{bitmap2}, std::string("1,9999999")},
-                        {{empty_bitmap}, std::string("")},
+    BitmapValue bitmap1(1);
+    BitmapValue bitmap2({1, 9999999});
+    BitmapValue empty_bitmap;
+    DataSet data_set = {{{&bitmap1}, std::string("1")},
+                        {{&bitmap2}, std::string("1,9999999")},
+                        {{&empty_bitmap}, std::string("")},
                         {{Null()}, Null()}};
 
     check_function<DataTypeString, true>(func_name, input_types, data_set);
-    delete bitmap1;
-    delete bitmap2;
-    delete empty_bitmap;
 }
 
 TEST(function_bitmap_test, function_bitmap_and_count) {
     std::string func_name = "bitmap_and_count";
     InputTypeSet input_types = {TypeIndex::BitMap, TypeIndex::BitMap};
-    auto bitmap1 = new BitmapValue(std::vector<uint64_t>({1, 2, 3}));
-    auto bitmap2 = new BitmapValue(std::vector<uint64_t>({3, 4, 5}));
-    auto empty_bitmap = new BitmapValue();
-    DataSet data_set = {{{bitmap1, empty_bitmap}, (int64_t)0},
-                        {{bitmap1, bitmap1}, (int64_t)3},
-                        {{bitmap1, bitmap2}, (int64_t)1}};
+    BitmapValue bitmap1({1, 2, 3});
+    BitmapValue bitmap2({3, 4, 5});
+    BitmapValue empty_bitmap;
+    DataSet data_set = {{{&bitmap1, &empty_bitmap}, (int64_t)0},
+                        {{&bitmap1, &bitmap1}, (int64_t)3},
+                        {{&bitmap1, &bitmap2}, (int64_t)1}};
 
     check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-    delete bitmap1;
-    delete bitmap2;
-    delete empty_bitmap;
 
     {
         InputTypeSet input_types = {TypeIndex::BitMap, TypeIndex::BitMap, TypeIndex::BitMap};
         BitmapValue bitmap1({33, 1, 2019});
         BitmapValue bitmap2({0, 33, std::numeric_limits<uint64_t>::min()});
         BitmapValue bitmap3({33, 5, std::numeric_limits<uint64_t>::max()});
-        auto empty_bitmap = new BitmapValue(); //test empty
+        BitmapValue empty_bitmap; //test empty
 
-        DataSet data_set = {{{&bitmap1, &bitmap2, empty_bitmap}, (int64_t)0},
+        DataSet data_set = {{{&bitmap1, &bitmap2, &empty_bitmap}, (int64_t)0},
                             {{&bitmap1, &bitmap2, &bitmap3}, (int64_t)1}, //33
                             {{&bitmap1, &bitmap2, Null()}, Null()},
                             {{&bitmap1, &bitmap3, &bitmap3}, (int64_t)1}}; //33
 
         check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-        delete empty_bitmap;
     }
 }
 
 TEST(function_bitmap_test, function_bitmap_or_count) {
     std::string func_name = "bitmap_or_count";
     InputTypeSet input_types = {TypeIndex::BitMap, TypeIndex::BitMap};
-    auto bitmap1 = new BitmapValue(std::vector<uint64_t>({1, 2, 3}));
-    auto bitmap2 = new BitmapValue(std::vector<uint64_t>({1, 2, 3, 4}));
-    auto bitmap3 = new BitmapValue(std::vector<uint64_t>({2, 3}));
-    auto empty_bitmap = new BitmapValue();
-    DataSet data_set = {{{bitmap1, empty_bitmap}, (int64_t)3},
-                        {{bitmap2, bitmap3}, (int64_t)4},
-                        {{bitmap1, bitmap3}, (int64_t)3}};
+
+    BitmapValue bitmap1({1, 2, 3});
+    BitmapValue bitmap2({1, 2, 3, 4});
+    BitmapValue bitmap3({2, 3});
+    BitmapValue empty_bitmap;
+    DataSet data_set = {{{&bitmap1, &empty_bitmap}, (int64_t)3},
+                        {{&bitmap2, &bitmap3}, (int64_t)4},
+                        {{&bitmap1, &bitmap3}, (int64_t)3}};
 
     check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-    delete bitmap1;
-    delete bitmap2;
-    delete bitmap3;
-    delete empty_bitmap;
 
     {
         InputTypeSet input_types = {TypeIndex::BitMap, TypeIndex::BitMap, TypeIndex::BitMap};
         BitmapValue bitmap1({1024, 1, 2019});
         BitmapValue bitmap2({0, 33, std::numeric_limits<uint64_t>::min()});
         BitmapValue bitmap3({33, 5, std::numeric_limits<uint64_t>::max()}); //18446744073709551615
-        auto empty_bitmap = new BitmapValue();                              //test empty
+        BitmapValue empty_bitmap;                                           //test empty
 
-        DataSet data_set = {{{&bitmap1, &bitmap2, empty_bitmap}, (int64_t)5}, //0,1,33,1024,2019
+        DataSet data_set = {{{&bitmap1, &bitmap2, &empty_bitmap}, (int64_t)5}, //0,1,33,1024,2019
                             {{&bitmap1, &bitmap2, &bitmap3},
                              (int64_t)7}, //0,1,5,33,1024,2019,18446744073709551615
-                            {{&bitmap1, empty_bitmap, Null()}, Null()},
+                            {{&bitmap1, &empty_bitmap, Null()}, Null()},
                             {{&bitmap1, &bitmap3, &bitmap3},
                              (int64_t)6}}; //1,5,33,1024,2019,18446744073709551615
 
         check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-        delete empty_bitmap;
     }
 }
 
 TEST(function_bitmap_test, function_bitmap_xor_count) {
     std::string func_name = "bitmap_xor_count";
     InputTypeSet input_types = {TypeIndex::BitMap, TypeIndex::BitMap};
-    auto bitmap1 = new BitmapValue(std::vector<uint64_t>({1, 2, 3}));
-    auto bitmap2 = new BitmapValue(std::vector<uint64_t>({1, 2, 3, 4}));
-    auto bitmap3 = new BitmapValue(std::vector<uint64_t>({2, 3}));
-    auto bitmap4 = new BitmapValue(std::vector<uint64_t>({1, 2, 6}));
-    auto empty_bitmap = new BitmapValue();
-    DataSet data_set = {{{bitmap1, empty_bitmap}, (int64_t)3},
-                        {{bitmap2, bitmap3}, (int64_t)2},
-                        {{bitmap1, bitmap4}, (int64_t)2}};
+
+    BitmapValue bitmap1({1, 2, 3});
+    BitmapValue bitmap2({1, 2, 3, 4});
+    BitmapValue bitmap3({2, 3});
+    BitmapValue bitmap4({1, 2, 6});
+    BitmapValue empty_bitmap;
+    DataSet data_set = {{{&bitmap1, &empty_bitmap}, (int64_t)3},
+                        {{&bitmap2, &bitmap3}, (int64_t)2},
+                        {{&bitmap1, &bitmap4}, (int64_t)2}};
 
     check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-    delete bitmap1;
-    delete bitmap2;
-    delete bitmap3;
-    delete bitmap4;
-    delete empty_bitmap;
 
     {
         InputTypeSet input_types = {TypeIndex::BitMap, TypeIndex::BitMap, TypeIndex::BitMap};
         BitmapValue bitmap1({1024, 1, 2019});
         BitmapValue bitmap2({0, 33, std::numeric_limits<uint64_t>::min()});
         BitmapValue bitmap3({33, 5, std::numeric_limits<uint64_t>::max()});
-        auto empty_bitmap = new BitmapValue(); //test empty
+        BitmapValue empty_bitmap; //test empty
 
         DataSet data_set = {
-                {{&bitmap1, &bitmap2, empty_bitmap}, (int64_t)5}, //0,1,33,1024,2019
+                {{&bitmap1, &bitmap2, &empty_bitmap}, (int64_t)5}, //0,1,33,1024,2019
                 {{&bitmap1, &bitmap2, &bitmap3}, (int64_t)6}, //0,1,5,1024,2019,18446744073709551615
-                {{&bitmap1, empty_bitmap, Null()}, Null()},
+                {{&bitmap1, &empty_bitmap, Null()}, Null()},
                 {{&bitmap1, &bitmap3, &bitmap3}, (int64_t)3}}; //1,1024,2019
 
         check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-        delete empty_bitmap;
     }
 }
 
@@ -186,15 +164,14 @@ TEST(function_bitmap_test, function_bitmap_and_not_count) {
     BitmapValue bitmap1({1, 2, 3});
     BitmapValue bitmap2({3, 4, std::numeric_limits<uint64_t>::min()});
     BitmapValue bitmap3({33, 5, std::numeric_limits<uint64_t>::max()});
-    auto empty_bitmap = new BitmapValue();
+    BitmapValue empty_bitmap;
 
-    DataSet data_set = {{{&bitmap1, empty_bitmap}, (int64_t)3}, //1,2,3
+    DataSet data_set = {{{&bitmap1, &empty_bitmap}, (int64_t)3}, //1,2,3
                         {{&bitmap2, Null()}, Null()},
                         {{&bitmap2, &bitmap3}, (int64_t)3},  //0,3,4
                         {{&bitmap1, &bitmap2}, (int64_t)2}}; //1,2
 
     check_function<DataTypeInt64, true>(func_name, input_types, data_set);
-    delete empty_bitmap;
 }
 TEST(function_bitmap_test, function_bitmap_has_all) {
     std::string func_name = "bitmap_has_all";
@@ -207,18 +184,16 @@ TEST(function_bitmap_test, function_bitmap_has_all) {
     BitmapValue bitmap3 = BitmapValue({0, 1, 2});
     BitmapValue bitmap4 = BitmapValue({0, 1, 2, std::numeric_limits<uint64_t>::max()});
     BitmapValue bitmap5 = BitmapValue({0, 1, 2});
-    auto empty_bitmap1 = new BitmapValue();
-    auto empty_bitmap2 = new BitmapValue();
+    BitmapValue empty_bitmap1;
+    BitmapValue empty_bitmap2;
 
     DataSet data_set = {{{&bitmap1, &bitmap2}, uint8(true)},
-                        {{empty_bitmap1, empty_bitmap2}, uint8(true)},
+                        {{&empty_bitmap1, &empty_bitmap2}, uint8(true)},
                         {{&bitmap3, &bitmap4}, uint8(false)},
                         {{&bitmap4, &bitmap5}, uint8(true)},
-                        {{Null(), empty_bitmap1}, Null()}};
+                        {{Null(), &empty_bitmap1}, Null()}};
 
     check_function<DataTypeUInt8, true>(func_name, input_types, data_set);
-    delete empty_bitmap1;
-    delete empty_bitmap2;
 }
 
 } // namespace doris::vectorized
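
A note on the test changes above, for readers less familiar with the pattern: replacing heap-allocated BitmapValue objects (and their matching delete calls) with stack objects means the memory is reclaimed automatically on every return path, including when a check fails partway through a test. The stand-alone C++ sketch below illustrates the same ownership change with a hypothetical Value type; it is not Doris code.

#include <cstdio>
#include <vector>

// Hypothetical stand-in for BitmapValue, only to illustrate the ownership change.
struct Value {
    std::vector<int> data;
    explicit Value(std::vector<int> d = {}) : data(std::move(d)) {}
};

static bool check(const Value& v) { return !v.data.empty(); }

int main() {
    // Before: a heap allocation needs a matching delete on every path.
    //   auto v = new Value({1, 2, 3});
    //   if (!check(*v)) return 1;   // leaks v on the early return
    //   delete v;

    // After: a stack object is destroyed automatically on every return path,
    // which is what the refactored tests above rely on.
    Value v({1, 2, 3});
    if (!check(v)) {
        return 1; // no cleanup needed
    }
    std::printf("size=%zu\n", v.data.size());
    return 0;
}

The stack form also removes the easy-to-miss requirement of keeping every delete in sync with its allocation as the data sets grow.
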
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java
index 7eae49924f..6c6d16807b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java
@@ -707,9 +707,8 @@ public class AnalyticExpr extends Expr {
             resetWindow = true;
         }
 
-       // Change first_value/last_value RANGE windows to ROWS 
-       if ((analyticFnName.getFunction().equalsIgnoreCase(FIRSTVALUE)
-                || analyticFnName.getFunction().equalsIgnoreCase(LASTVALUE))
+        // Change first_value RANGE windows to ROWS
+        if ((analyticFnName.getFunction().equalsIgnoreCase(FIRSTVALUE))
                 && window != null
                 && window.getType() == AnalyticWindow.Type.RANGE) {
             window = new AnalyticWindow(AnalyticWindow.Type.ROWS, window.getLeftBoundary(),
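
To make the behavioral difference concrete: under the default window RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, last_value() must look at the whole peer group (all rows sharing the current ORDER BY value), whereas a ROWS window ending at the current row stops at the physical row, so the old rewrite changed last_value results whenever ORDER BY had ties. The stand-alone C++ sketch below (not Doris code; the Row struct and main() are invented for illustration) replays the RANGE semantics over the rows used by the regression test added below in this commit.

#include <algorithm>
#include <cstdio>
#include <map>
#include <string>
#include <vector>

struct Row {
    int myday;
    std::string time;
    int state;
};

int main() {
    // Rows taken from the regression test added below in this commit.
    std::vector<Row> rows = {{21, "04-21-11", 1},    {22, "04-22-10-21", 0},
                             {22, "04-22-10-21", 1}, {23, "04-23-10", 1},
                             {24, "02-24-10-21", 1}};

    // PARTITION BY myday
    std::map<int, std::vector<Row>> partitions;
    for (const auto& r : rows) {
        partitions[r.myday].push_back(r);
    }

    for (auto& [day, part] : partitions) {
        // ORDER BY time (stable, so tied rows keep their input order)
        std::stable_sort(part.begin(), part.end(),
                         [](const Row& a, const Row& b) { return a.time < b.time; });
        for (const auto& cur : part) {
            // RANGE frame: every row whose ORDER BY key is <= the current key,
            // i.e. the current row's entire peer group is inside the frame.
            const Row* last_in_frame = nullptr;
            for (const auto& r : part) {
                if (r.time <= cur.time) {
                    last_in_frame = &r;
                }
            }
            std::printf("%d\t%s\t%d\t%d\n", cur.myday, cur.time.c_str(), cur.state,
                        last_in_frame->state);
        }
    }
    return 0;
}

The output matches the expected .out file below: both tied rows for myday=22 get 1. With a ROWS frame the first of those two rows would return 0 instead. first_value is unaffected because the frame start (the first row of the partition) is the same under either frame type, which is why the rewrite is now limited to first_value.
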
diff --git a/regression-test/data/correctness/test_last_value_window.out b/regression-test/data/correctness/test_last_value_window.out
new file mode 100644
index 0000000000..2b56500191
--- /dev/null
+++ b/regression-test/data/correctness/test_last_value_window.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+21	04-21-11	1	1
+22	04-22-10-21	0	1
+22	04-22-10-21	1	1
+23	04-23-10	1	1
+24	02-24-10-21	1	1
+
diff --git a/regression-test/suites/correctness/test_last_value_window.groovy b/regression-test/suites/correctness/test_last_value_window.groovy
new file mode 100644
index 0000000000..274cdec46d
--- /dev/null
+++ b/regression-test/suites/correctness/test_last_value_window.groovy
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_last_value_window") {
+    def tableName = "state"
+
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+            `myday` INT,
+            `time` VARCHAR(40) NOT NULL,
+            `state` INT
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`myday`,time,state)
+            COMMENT "OLAP"
+            DISTRIBUTED BY HASH(`myday`) BUCKETS 2
+            PROPERTIES (
+            "replication_num" = "1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+            );
+    """
+
+    sql """ INSERT INTO ${tableName} VALUES 
+            (21,"04-21-11",1),
+            (22,"04-22-10-21",0),
+            (22,"04-22-10-21",1),
+            (23,"04-23-10",1),
+            (24,"02-24-10-21",1); """
+
+    // not_vectorized
+    sql """ set enable_vectorized_engine = false; """
+
+    qt_select_default """ select *,last_value(state) over(partition by myday order by time) from ${tableName}; """
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 10/17: [fix](http) Hardening Recommendations Disable TRACE/TRAC methods (#9479)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 59daed7591cb4ac4d73231225353e9bc9f34fcfd
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Wed May 11 09:41:59 2022 +0800

    [fix](http) Hardening Recommendations Disable TRACE/TRAC methods (#9479)
---
 .../httpv2/interceptor/ServletTraceIterceptor.java | 61 ++++++++++++++++++++++
 1 file changed, 61 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/interceptor/ServletTraceIterceptor.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/interceptor/ServletTraceIterceptor.java
new file mode 100644
index 0000000000..4e223429e9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/interceptor/ServletTraceIterceptor.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.interceptor;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.annotation.WebFilter;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+@WebFilter (urlPatterns = "/*", filterName = "traceIterceptor")
+public class ServletTraceIterceptor implements Filter {
+
+    private static final Logger LOG = LogManager.getLogger(ServletTraceIterceptor.class);
+    @Override
+    public void init( FilterConfig filterConfig) throws ServletException {
+
+    }
+
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+        throws IOException, ServletException {
+        HttpServletRequest httpRequest = (HttpServletRequest) request;
+        HttpServletResponse httpResponse = (HttpServletResponse) response;
+        if ("TRACE".equalsIgnoreCase(httpRequest.getMethod())) {
+            httpResponse.setStatus(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
+            LOG.warn ("Trace method is not allowed to be called, has been intercepted, IP address:"
+                + request.getRemoteAddr());
+            return;
+        }
+        chain.doFilter(request, response);
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}
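
For completeness, one way to verify the new filter by hand is to send a raw TRACE request to the FE HTTP port and inspect the status line. The sketch below is not part of the patch; the host 127.0.0.1 and port 8030 are assumptions to adjust for your deployment, and any request path works because the filter is mapped to /*. With the filter in place the expected reply is 405 Method Not Allowed.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

int main() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) { perror("socket"); return 1; }

    sockaddr_in addr {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8030);                     // FE http_port (assumption)
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); // FE host (assumption)

    if (connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
        perror("connect");
        return 1;
    }

    // Minimal HTTP/1.1 TRACE request against an arbitrary path.
    const char* req = "TRACE / HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n";
    send(fd, req, strlen(req), 0);

    char buf[512];
    ssize_t n = recv(fd, buf, sizeof(buf) - 1, 0);
    if (n > 0) {
        buf[n] = '\0';
        // Print only the status line, e.g. "HTTP/1.1 405".
        char* eol = strstr(buf, "\r\n");
        if (eol != nullptr) *eol = '\0';
        std::printf("%s\n", buf);
    }
    close(fd);
    return 0;
}
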


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org