Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/26 05:24:06 UTC

[doris] branch branch-1.2-lts updated (3937270a49 -> 98c7779e91)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


    from 3937270a49 Revert "[fix](compile) fix compile error caused by `mysql_scan_node.cpp` not being found when enabling `WITH_MYSQL` (#15277)" (#15351)
     new 4b592efc41 [fix](information_schema) fix messy code of CHECK_TIME column of information_schema.tables (#14915)
     new 5cacef1547 [enhancement](memory) Add Memory GC when the available memory of the BE process is lacking (#14712)
     new a117c4d050 [enhancement](memory) Support query memory overcommit #14948
     new a9f47302aa [deps](fe)upgrade deps version (#15262)
     new 637c46ca03 [Improvement](multi-catalog) support hive external tables which store data on tencent chdfs (#15297)
     new 7dd3513d81 [fix](array-type) forbid implicit cast of array type during load (#15325)
     new 7fdc001add [fix](information-schema) fix bug that querying tables in information_schema db will return error #15336
     new 98c7779e91 [feature](planner) remove restriction on offset without order by (#15218)

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/build-extension.yml              |  19 ++-
 be/src/common/config.h                             |  13 ++
 be/src/common/daemon.cpp                           |   2 +
 be/src/olap/lru_cache.cpp                          |   4 +
 be/src/olap/lru_cache.h                            |   3 +
 be/src/olap/page_cache.cpp                         |   5 +
 be/src/olap/page_cache.h                           |   6 +
 be/src/runtime/fragment_mgr.cpp                    |  16 ++
 be/src/runtime/fragment_mgr.h                      |   3 +
 be/src/runtime/memory/chunk_allocator.cpp          |  20 +++
 be/src/runtime/memory/chunk_allocator.h            |   4 +
 be/src/runtime/memory/mem_tracker_limiter.cpp      | 164 ++++++++++++++++-----
 be/src/runtime/memory/mem_tracker_limiter.h        |  32 +++-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp   |  17 ++-
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |  10 +-
 be/src/runtime/plan_fragment_executor.cpp          |   2 +
 be/src/runtime/query_fragments_ctx.h               |   2 +
 be/src/runtime/thread_context.h                    |   2 +
 be/src/util/mem_info.cpp                           |  49 ++++++
 be/src/util/mem_info.h                             |  18 ++-
 be/src/vec/exec/vexchange_node.cpp                 |  13 +-
 be/src/vec/exec/vexchange_node.h                   |   1 +
 dist/LICENSE-dist.txt                              |  30 ++--
 .../docs/ecosystem/external-table/hive-of-doris.md |   1 +
 .../docs/ecosystem/external-table/multi-catalog.md |   1 +
 .../docs/ecosystem/external-table/hive-of-doris.md |   1 +
 .../docs/ecosystem/external-table/multi-catalog.md |   1 +
 fe/fe-core/pom.xml                                 | 127 +++++++++++++++-
 .../apache/doris/alter/SchemaChangeHandler.java    |   8 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   6 +-
 .../java/org/apache/doris/analysis/InsertStmt.java |   2 +-
 .../java/org/apache/doris/analysis/QueryStmt.java  |   9 +-
 .../java/org/apache/doris/catalog/BrokerMgr.java   |  21 +++
 .../main/java/org/apache/doris/catalog/Column.java |   8 +-
 .../java/org/apache/doris/catalog/DatabaseIf.java  |  42 ++++++
 .../java/org/apache/doris/catalog/OlapTable.java   |   1 +
 .../java/org/apache/doris/catalog/Partition.java   |   2 +
 .../java/org/apache/doris/catalog/TableIf.java     |  14 ++
 .../catalog/external/JdbcExternalDatabase.java     |   1 -
 .../java/org/apache/doris/common/FeNameFormat.java |   2 +-
 .../org/apache/doris/datasource/CatalogIf.java     |  14 ++
 .../apache/doris/datasource/ExternalCatalog.java   |  14 +-
 .../src/main/java/org/apache/doris/load/Load.java  |  19 ++-
 .../apache/doris/load/update/UpdatePlanner.java    |   4 +-
 .../org/apache/doris/planner/BrokerScanNode.java   |   9 +-
 .../apache/doris/planner/DistributedPlanner.java   |   5 +-
 .../org/apache/doris/planner/EmptySetNode.java     |   1 +
 .../org/apache/doris/planner/ExchangeNode.java     |  11 +-
 .../java/org/apache/doris/planner/PlanNode.java    |  24 +++
 .../apache/doris/planner/SingleNodePlanner.java    |   2 +-
 .../java/org/apache/doris/planner/SortNode.java    |   9 --
 .../doris/planner/external/HiveScanProvider.java   |   2 +
 .../doris/planner/external/QueryScanProvider.java  |  26 +++-
 .../apache/doris/service/FrontendServiceImpl.java  |  33 ++---
 .../org/apache/doris/task/CreateReplicaTask.java   |   4 +-
 .../apache/doris/planner/UpdatePlannerTest.java    |   2 +-
 fe/hive-udf/pom.xml                                |  38 +++++
 fe/java-udf/pom.xml                                |  19 +++
 fe/pom.xml                                         |   7 +-
 fe/spark-dpp/pom.xml                               |  17 +++
 fs_brokers/apache_hdfs_broker/pom.xml              |  32 +++-
 .../conditional_functions/test_query_limit.out     |  23 ++-
 .../load_p0/stream_load/test_stream_load.groovy    |  22 +++
 .../conditional_functions/test_query_limit.groovy  |  18 +++
 64 files changed, 888 insertions(+), 149 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 05/08: [Improvement](multi-catalog) support hive external tables which store data on tencent chdfs (#15297)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 637c46ca0356454082a3e288233e8244bc8114ad
Author: Yulei-Yang <yu...@gmail.com>
AuthorDate: Sun Dec 25 21:57:18 2022 +0800

    [Improvement](multi-catalog) support hive external tables which store data on tencent chdfs (#15297)
    
    * support reading hive tables which store data on tencent chdfs in multi-catalog
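    
    For illustration, a minimal sketch of the user-facing flow, assuming a
    reachable Hive MetaStore (the URI, catalog, and table names below are
    placeholders, not part of this patch):
    
        -- create a hive catalog; tables located on ofs:// (CHDFS) now work too
        CREATE CATALOG hive PROPERTIES (
            "type" = "hms",
            "hive.metastore.uris" = "thrift://127.0.0.1:9083"
        );
        -- queried like any common hive table; reads are routed through an alive broker
        SELECT * FROM hive.db1.tbl1 LIMIT 10;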
---
 .../docs/ecosystem/external-table/hive-of-doris.md |  1 +
 .../docs/ecosystem/external-table/multi-catalog.md |  1 +
 .../docs/ecosystem/external-table/hive-of-doris.md |  1 +
 .../docs/ecosystem/external-table/multi-catalog.md |  1 +
 .../java/org/apache/doris/catalog/BrokerMgr.java   | 21 +++++++++++++++++
 .../org/apache/doris/planner/BrokerScanNode.java   |  9 ++++++--
 .../doris/planner/external/HiveScanProvider.java   |  2 ++
 .../doris/planner/external/QueryScanProvider.java  | 26 ++++++++++++++++++++--
 8 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/docs/en/docs/ecosystem/external-table/hive-of-doris.md b/docs/en/docs/ecosystem/external-table/hive-of-doris.md
index ed7f4438d9..a68b6ce157 100644
--- a/docs/en/docs/ecosystem/external-table/hive-of-doris.md
+++ b/docs/en/docs/ecosystem/external-table/hive-of-doris.md
@@ -31,6 +31,7 @@ Hive External Table of Doris provides Doris with direct access to Hive external
  1. support for Hive data sources to access Doris
  2. Support joint queries between Doris and Hive data sources to perform more complex analysis operations
  3. Support access to kerberos-enabled Hive data sources
+ 4. Support access to hive tables whose data is stored on tencent chdfs
 
 This document introduces how to use this feature and the considerations.
 
diff --git a/docs/en/docs/ecosystem/external-table/multi-catalog.md b/docs/en/docs/ecosystem/external-table/multi-catalog.md
index 3be2f3bba0..2eff944c35 100644
--- a/docs/en/docs/ecosystem/external-table/multi-catalog.md
+++ b/docs/en/docs/ecosystem/external-table/multi-catalog.md
@@ -77,6 +77,7 @@ This function will be used as a supplement and enhancement to the previous exter
 > 1. hive supports version 2.3.7 and above.
 > 2. Iceberg currently only supports V1 version, V2 version will be supported soon.
 > 3. Hudi currently only supports Snapshot Query for Copy On Write tables and Read Optimized Query for Merge On Read tables. In the future, Incremental Query and Snapshot Query for Merge On Read tables will be supported soon.
+> 4. Support access to hive tables whose data is stored on tencent chdfs; usage is the same as for a common hive table.
 
 The following example is used to create a Catalog named hive to connect the specified Hive MetaStore, and provide the HDFS HA connection properties to access the corresponding files in HDFS.
 
diff --git a/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md b/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
index f348d97947..89e581a4a6 100644
--- a/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
+++ b/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
@@ -33,6 +33,7 @@ Hive External Table of Doris 提供了 Doris 直接访问 Hive 外部表的能
 1. 支持 Hive 数据源接入Doris
 2. 支持 Doris 与 Hive 数据源中的表联合查询,进行更加复杂的分析操作
 3. 支持 访问开启 kerberos 的 Hive 数据源
+4. 支持 访问数据存储在腾讯 CHDFS 上的 Hive 数据源
  
 本文档主要介绍该功能的使用方式和注意事项等。
 
diff --git a/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md b/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md
index 3627bb9221..e7ad61f766 100644
--- a/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md
+++ b/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md
@@ -77,6 +77,7 @@ under the License.
 > 1. hive 支持 2.3.7 以上版本。
 > 2. Iceberg 目前仅支持 V1 版本,V2 版本即将支持。
 > 3. Hudi 目前仅支持 Copy On Write 表的 Snapshot Query,以及 Merge On Read 表的 Read Optimized Query。后续将支持 Incremental Query 和 Merge On Read 表的 Snapshot Query。
+> 4. 支持数据存储在腾讯 CHDFS 上的 hive 表,用法和普通 hive 一样。
 
 以下示例,用于创建一个名为 hive 的 Catalog 连接指定的 Hive MetaStore,并提供了 HDFS HA 连接属性,用于访问对应的 HDFS 中的文件。
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
index 84adeea719..28a679b9a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Maps;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -128,6 +129,26 @@ public class BrokerMgr {
         }
     }
 
+    public FsBroker getAnyAliveBroker() {
+        lock.lock();
+        try {
+            List<FsBroker> allBrokers = new ArrayList<>();
+            for (List<FsBroker> list : brokerListMap.values()) {
+                allBrokers.addAll(list);
+            }
+
+            Collections.shuffle(allBrokers);
+            for (FsBroker fsBroker : allBrokers) {
+                if (fsBroker.isAlive) {
+                    return fsBroker;
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+        return null;
+    }
+
     public FsBroker getBroker(String brokerName, String host) throws AnalysisException {
         if (brokerName.equalsIgnoreCase(BrokerDesc.MULTI_LOAD_BROKER)) {
             return new FsBroker("127.0.0.1", 0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 41b0d1f770..38284b097e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -317,8 +317,7 @@ public class BrokerScanNode extends LoadScanNode {
         // Generate on broker scan range
         TBrokerScanRange brokerScanRange = new TBrokerScanRange();
         brokerScanRange.setParams(params);
-        if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER
-                || brokerDesc.getStorageType() == StorageBackend.StorageType.OFS) {
+        if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
             FsBroker broker = null;
             try {
                 broker = Env.getCurrentEnv().getBrokerMgr()
@@ -327,6 +326,12 @@ public class BrokerScanNode extends LoadScanNode {
                 throw new UserException(e.getMessage());
             }
             brokerScanRange.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
+        } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.OFS) {
+            FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+            if (broker == null) {
+                throw new UserException("No alive broker.");
+            }
+            brokerScanRange.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
         } else {
             brokerScanRange.setBrokerAddresses(new ArrayList<>());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index ace3693907..36bde10b7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -126,6 +126,8 @@ public class HiveScanProvider extends HMSTableScanProvider {
                 return TFileType.FILE_HDFS;
             } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
                 return TFileType.FILE_LOCAL;
+            }  else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
+                return TFileType.FILE_BROKER;
             }
         }
         throw new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index a7453394f6..b332819700 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.planner.external;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -38,6 +40,7 @@ import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Joiner;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.logging.log4j.LogManager;
@@ -75,7 +78,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
 
             // set hdfs params for hdfs file type.
             Map<String, String> locationProperties = getLocationProperties();
-            if (locationType == TFileType.FILE_HDFS) {
+            if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
                 String fsName = "";
                 if (this instanceof TVFScanProvider) {
                     fsName = ((TVFScanProvider) this).getFsName();
@@ -90,6 +93,14 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
                 THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
                 tHdfsParams.setFsName(fsName);
                 context.params.setHdfsParams(tHdfsParams);
+
+                if (locationType == TFileType.FILE_BROKER) {
+                    FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                    if (broker == null) {
+                        throw new UserException("No alive broker.");
+                    }
+                    context.params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
+                }
             } else if (locationType == TFileType.FILE_S3) {
                 context.params.setProperties(locationProperties);
             }
@@ -108,6 +119,13 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
                 if (split instanceof IcebergSplit) {
                     IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) split);
                 }
+
+                // the file size of orc files reported by FileSplit.getLength() is not correct,
+                // and the broker reader needs the correct file size
+                if (locationType == TFileType.FILE_BROKER && fileFormatType == TFileFormatType.FORMAT_ORC) {
+                    rangeDesc.setFileSize(((OrcSplit) fileSplit).getFileLength());
+                }
+
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
                         curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
@@ -172,12 +190,16 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         rangeDesc.setStartOffset(fileSplit.getStart());
         rangeDesc.setSize(fileSplit.getLength());
+        // fileSize is only used when the format is orc or parquet and TFileType is broker
+        // for any other TFileType it is not necessary
+        rangeDesc.setFileSize(fileSplit.getLength());
         rangeDesc.setColumnsFromPath(columnsFromPath);
         rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
 
         if (getLocationType() == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
-        } else if (getLocationType() == TFileType.FILE_S3) {
+        } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) {
+            // need full path
             rangeDesc.setPath(fileSplit.getPath().toString());
         }
         return rangeDesc;




[doris] 08/08: [feature](planner) remove restriction on offset without order by (#15218)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 98c7779e914c260bd5986bcdceccbf121aef493f
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Mon Dec 26 09:37:41 2022 +0800

    [feature](planner) remove restriction on offset without order by (#15218)
    
    Support SELECT * FROM tbl LIMIT 5, 3;
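    
    A hedged illustration (tbl is a placeholder; without an ORDER BY the rows
    that are skipped are not deterministic):
    
        -- skip the first 5 rows, return at most the next 3
        SELECT * FROM tbl LIMIT 5, 3;
    
    In a distributed plan the offset is now applied at the merge exchange node,
    as the VExchangeNode change below shows.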
---
 be/src/vec/exec/vexchange_node.cpp                 | 13 +++++++++++-
 be/src/vec/exec/vexchange_node.h                   |  1 +
 .../java/org/apache/doris/analysis/QueryStmt.java  |  9 ++++----
 .../apache/doris/planner/DistributedPlanner.java   |  5 +++--
 .../org/apache/doris/planner/EmptySetNode.java     |  1 +
 .../org/apache/doris/planner/ExchangeNode.java     | 11 ++++++++--
 .../java/org/apache/doris/planner/PlanNode.java    | 24 ++++++++++++++++++++++
 .../apache/doris/planner/SingleNodePlanner.java    |  2 +-
 .../java/org/apache/doris/planner/SortNode.java    |  9 --------
 .../conditional_functions/test_query_limit.out     | 23 +++++++++++++++++----
 .../conditional_functions/test_query_limit.groovy  | 18 ++++++++++++++++
 11 files changed, 93 insertions(+), 23 deletions(-)

diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index 8da3d2ead4..c28ca147f8 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -33,7 +33,8 @@ VExchangeNode::VExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const Des
                           std::vector<bool>(tnode.nullable_tuples.begin(),
                                             tnode.nullable_tuples.begin() +
                                                     tnode.exchange_node.input_row_tuples.size())),
-          _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0) {}
+          _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0),
+          _num_rows_skipped(0) {}
 
 Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
@@ -87,6 +88,16 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     auto status = _stream_recvr->get_next(block, eos);
     if (block != nullptr) {
+        if (!_is_merging) {
+            if (_num_rows_skipped + block->rows() < _offset) {
+                _num_rows_skipped += block->rows();
+                block->set_num_rows(0);
+            } else if (_num_rows_skipped < _offset) {
+                auto offset = _offset - _num_rows_skipped;
+                _num_rows_skipped = _offset;
+                block->set_num_rows(block->rows() - offset);
+            }
+        }
         if (_num_rows_returned + block->rows() < _limit) {
             _num_rows_returned += block->rows();
         } else {
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index b985097ca6..cdd526314f 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -51,6 +51,7 @@ private:
 
     // use in merge sort
     size_t _offset;
+    int64_t _num_rows_skipped;
     VSortExecExprs _vsort_exec_exprs;
     std::vector<bool> _is_asc_order;
     std::vector<bool> _nulls_first;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
index 9616c1cb47..5c0f24f2f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/QueryStmt.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.rewrite.ExprRewriter;
 
 import com.google.common.base.Preconditions;
@@ -187,8 +188,7 @@ public abstract class QueryStmt extends StatementBase implements Queriable {
     }
 
     private void analyzeLimit(Analyzer analyzer) throws AnalysisException {
-        // TODO chenhao
-        if (limitElement.getOffset() > 0 && !hasOrderByClause()) {
+        if (!VectorizedUtil.isVectorized() && limitElement.getOffset() > 0 && !hasOrderByClause()) {
             throw new AnalysisException("OFFSET requires an ORDER BY clause: "
                     + limitElement.toSql().trim());
         }
@@ -621,10 +621,11 @@ public abstract class QueryStmt extends StatementBase implements Queriable {
         return limitElement.getLimit();
     }
 
-    public void setLimit(long limit) throws AnalysisException {
+    public void setLimit(long limit) {
         Preconditions.checkState(limit >= 0);
         long newLimit = hasLimitClause() ? Math.min(limit, getLimit()) : limit;
-        limitElement = new LimitElement(newLimit);
+        long offset = hasLimitClause() ? getOffset() : 0;
+        limitElement = new LimitElement(offset, newLimit);
     }
 
     public void removeLimitElement() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 6c35b62006..19d153f1b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -236,7 +236,8 @@ public class DistributedPlanner {
         // move 'result' to end, it depends on all of its children
         fragments.remove(result);
         fragments.add(result);
-        if (!isPartitioned && result.isPartitioned() && result.getPlanRoot().getNumInstances() > 1) {
+        if ((!isPartitioned && result.isPartitioned() && result.getPlanRoot().getNumInstances() > 1)
+                || (!(root instanceof SortNode) && root.hasOffset())) {
             result = createMergeFragment(result);
             fragments.add(result);
         }
@@ -251,7 +252,7 @@ public class DistributedPlanner {
      */
     private PlanFragment createMergeFragment(PlanFragment inputFragment)
             throws UserException {
-        Preconditions.checkState(inputFragment.isPartitioned());
+        Preconditions.checkState(inputFragment.isPartitioned() || inputFragment.getPlanRoot().hasOffset());
 
         // exchange node clones the behavior of its input, aside from the conjuncts
         ExchangeNode mergePlan =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
index f56cb16a72..0497fd4838 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
@@ -43,6 +43,7 @@ public class EmptySetNode extends PlanNode {
     public EmptySetNode(PlanNodeId id, ArrayList<TupleId> tupleIds) {
         super(id, tupleIds, "EMPTYSET", StatisticalType.EMPTY_SET_NODE);
         cardinality = 0L;
+        offset = 0;
         Preconditions.checkArgument(tupleIds.size() > 0);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index fd4da60790..e3cd192f8c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsRecursiveDerive;
 import org.apache.doris.thrift.TExchangeNode;
+import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TSortInfo;
@@ -81,8 +82,9 @@ public class ExchangeNode extends PlanNode {
         }
         // Only apply the limit at the receiver if there are multiple senders.
         if (inputNode.getFragment().isPartitioned()) {
-            limit = inputNode.limit;
+            limit = inputNode.limit - inputNode.offset;
         }
+        offset = inputNode.offset;
         computeTupleIds();
 
     }
@@ -162,8 +164,8 @@ public class ExchangeNode extends PlanNode {
                     Expr.treesToThrift(mergeInfo.getOrderingExprs()),
                     mergeInfo.getIsAscOrder(), mergeInfo.getNullsFirst());
             msg.exchange_node.setSortInfo(sortInfo);
-            msg.exchange_node.setOffset(offset);
         }
+        msg.exchange_node.setOffset(offset);
     }
 
     @Override
@@ -179,4 +181,9 @@ public class ExchangeNode extends PlanNode {
         return numInstances;
     }
 
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        return prefix + "offset: " + offset + "\n";
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 36263f5795..cc1a1aa981 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -82,6 +82,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
     protected PlanNodeId id;  // unique w/in plan tree; assigned by planner
     protected PlanFragmentId fragmentId;  // assigned by planner after fragmentation step
     protected long limit; // max. # of rows to be returned; 0: no limit
+    protected long offset;
 
     // ids materialized by the tree rooted at this node
     protected ArrayList<TupleId> tupleIds;
@@ -151,6 +152,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
             StatisticalType statisticalType) {
         this.id = id;
         this.limit = -1;
+        this.offset = 0;
         // make a copy, just to be on the safe side
         this.tupleIds = Lists.newArrayList(tupleIds);
         this.tblRefIds = Lists.newArrayList(tupleIds);
@@ -177,6 +179,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
     protected PlanNode(PlanNodeId id, PlanNode node, String planNodeName, StatisticalType statisticalType) {
         this.id = id;
         this.limit = node.limit;
+        this.offset = node.offset;
         this.tupleIds = Lists.newArrayList(node.tupleIds);
         this.tblRefIds = Lists.newArrayList(node.tblRefIds);
         this.nullableTupleIds = Sets.newHashSet(node.nullableTupleIds);
@@ -258,6 +261,10 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         return limit;
     }
 
+    public long getOffset() {
+        return offset;
+    }
+
     /**
      * Set the limit to the given limit only if the limit hasn't been set, or the new limit
      * is lower.
@@ -270,10 +277,27 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         }
     }
 
+    public void setLimitAndOffset(long limit, long offset) {
+        if (this.limit == -1) {
+            this.limit = limit;
+        } else if (limit != -1) {
+            this.limit = Math.min(this.limit - offset, limit);
+        }
+        this.offset += offset;
+    }
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
     public boolean hasLimit() {
         return limit > -1;
     }
 
+    public boolean hasOffset() {
+        return offset != 0;
+    }
+
     public void setCardinality(long cardinality) {
         this.cardinality = cardinality;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index fd24403179..82bceeb92a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -305,7 +305,7 @@ public class SingleNodePlanner {
             // from SelectStmt outside
             root = addUnassignedConjuncts(analyzer, root);
         } else {
-            root.setLimit(stmt.getLimit());
+            root.setLimitAndOffset(stmt.getLimit(), stmt.getOffset());
             root.computeStats(analyzer);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index f65235794b..6075f1dcd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -64,7 +64,6 @@ public class SortNode extends PlanNode {
     private final boolean  useTopN;
 
     private boolean  isDefaultLimit;
-    private long offset;
     // if true, the output of this node feeds an AnalyticNode
     private boolean isAnalyticSort;
     private DataPartition inputPartition;
@@ -130,14 +129,6 @@ public class SortNode extends PlanNode {
         this.inputPartition = inputPartition;
     }
 
-    public long getOffset() {
-        return offset;
-    }
-
-    public void setOffset(long offset) {
-        this.offset = offset;
-    }
-
     public SortInfo getSortInfo() {
         return info;
     }
diff --git a/regression-test/data/query_p0/sql_functions/conditional_functions/test_query_limit.out b/regression-test/data/query_p0/sql_functions/conditional_functions/test_query_limit.out
index c0bd4ba317..c918eecfd4 100644
--- a/regression-test/data/query_p0/sql_functions/conditional_functions/test_query_limit.out
+++ b/regression-test/data/query_p0/sql_functions/conditional_functions/test_query_limit.out
@@ -1,7 +1,7 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !limit1 --
 false	1	1989	1001	11011902	123.123	true	1989-03-21	1989-03-21T13:00	wangjuoo4	0.1	6.333	string12345	170141183460469231731687303715884105727
-false	2	1986	1001	11011903	1243.5	false	1901-12-31	1989-03-21T13:00	wangynnsf	20.268	789.25	string12345	-170141183460469231731687303715884105727
+false	2	1986	1001	11011903	1243.500	false	1901-12-31	1989-03-21T13:00	wangynnsf	20.268	789.25	string12345	-170141183460469231731687303715884105727
 
 -- !limit2 --
 
@@ -9,7 +9,7 @@ false	2	1986	1001	11011903	1243.5	false	1901-12-31	1989-03-21T13:00	wangynnsf	20
 
 -- !limit4 --
 false	1	1989	1001	11011902	123.123	true	1989-03-21	1989-03-21T13:00	wangjuoo4	0.1	6.333	string12345	170141183460469231731687303715884105727
-false	2	1986	1001	11011903	1243.5	false	1901-12-31	1989-03-21T13:00	wangynnsf	20.268	789.25	string12345	-170141183460469231731687303715884105727
+false	2	1986	1001	11011903	1243.500	false	1901-12-31	1989-03-21T13:00	wangynnsf	20.268	789.25	string12345	-170141183460469231731687303715884105727
 false	3	1989	1002	11011905	24453.325	false	2012-03-14	2000-01-01T00:00	yunlj8@nk	78945.0	3654.0	string12345	0
 
 -- !limit5 --
@@ -20,13 +20,13 @@ false	3	1989	1002	11011905	24453.325	false	2012-03-14	2000-01-01T00:00	yunlj8@nk
 
 -- !limit7 --
 false	1	1989	1001	11011902	123.123	true	1989-03-21	1989-03-21T13:00	wangjuoo4	0.1	6.333	string12345	170141183460469231731687303715884105727
-false	2	1986	1001	11011903	1243.5	false	1901-12-31	1989-03-21T13:00	wangynnsf	20.268	789.25	string12345	-170141183460469231731687303715884105727
+false	2	1986	1001	11011903	1243.500	false	1901-12-31	1989-03-21T13:00	wangynnsf	20.268	789.25	string12345	-170141183460469231731687303715884105727
 
 -- !limit8 --
 
 -- !limit9 --
 false	1	1989	1001	11011902	123.123	true	1989-03-21	1989-03-21T13:00	wangjuoo4	0.1	6.333	string12345	170141183460469231731687303715884105727
-false	2	1986	1001	11011903	1243.5	false	1901-12-31	1989-03-21T13:00	wangynnsf	20.268	789.25	string12345	-170141183460469231731687303715884105727
+false	2	1986	1001	11011903	1243.500	false	1901-12-31	1989-03-21T13:00	wangynnsf	20.268	789.25	string12345	-170141183460469231731687303715884105727
 false	3	1989	1002	11011905	24453.325	false	2012-03-14	2000-01-01T00:00	yunlj8@nk	78945.0	3654.0	string12345	0
 
 -- !limit10 --
@@ -35,3 +35,18 @@ false	3	1989	1002	11011905	24453.325	false	2012-03-14	2000-01-01T00:00	yunlj8@nk
 
 -- !limit11 --
 
+-- !limit12 --
+false	3	1989	1002	11011905	24453.325	false	2012-03-14	2000-01-01T00:00	yunlj8@nk	78945.0	3654.0	string12345	0
+
+-- !limit13 --
+false	3	1989	1002	11011905	24453.325	false	2012-03-14	2000-01-01T00:00	yunlj8@nk	78945.0	3654.0	string12345	0
+
+-- !limit14 --
+false	3	1989	1002	11011905	24453.325	false	2012-03-14	2000-01-01T00:00	yunlj8@nk	78945.0	3654.0	string12345	0
+
+-- !limit15 --
+
+-- !limit16 --
+
+-- !limit17 --
+
diff --git a/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy b/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
index 8c9b205088..88816931dd 100644
--- a/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
+++ b/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
@@ -31,4 +31,22 @@ suite("test_query_limit", "query,p0") {
     qt_limit9 "select * from ${tableName} order by k1, k2, k3, k4 desc limit 100"
     qt_limit10 "select k3, sum(k9) from ${tableName} where k1<5 group by 1 order by 2 limit 3"
     qt_limit11 "select * from (select * from ${tableName} union all select * from ${tableName2}) b limit 0"
+    qt_limit12 "select * from (select * from ${tableName} order by k1, k2, k3, k4 limit 1, 2) a limit 1, 1"
+    qt_limit13 "select * from (select * from ${tableName} order by k1, k2, k3, k4 limit 1, 2) a limit 1, 2"
+    qt_limit14 "select * from (select * from ${tableName} order by k1, k2, k3, k4 limit 1, 2) a limit 1, 3"
+    qt_limit15 "select * from (select * from ${tableName} order by k1, k2, k3, k4 limit 1, 2) a limit 2, 1"
+    qt_limit16 "select * from (select * from ${tableName} order by k1, k2, k3, k4 limit 1, 2) a limit 2, 2"
+    qt_limit17 "select * from (select * from ${tableName} order by k1, k2, k3, k4 limit 1, 2) a limit 2, 3"
+    test {
+        sql "select * from ${tableName} limit 1, 10"
+        rowNum 2
+    }
+    test {
+        sql "select * from ${tableName} limit 2, 10"
+        rowNum 1
+    }
+    test {
+        sql "select * from ${tableName} limit 3, 10"
+        rowNum 0
+    }
 }




[doris] 01/08: [fix](information_schema) fix messy code of CHECK_TIME column of information_schema.tables (#14915)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4b592efc4184489f8303c6b79d352916d1cf8ab3
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Sat Dec 10 18:45:25 2022 +0800

    [fix](information_schema) fix messy code of CHECK_TIME column of information_schema.tables (#14915)
---
 .../java/org/apache/doris/service/FrontendServiceImpl.java   | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 5742042697..e0a622b55c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -30,7 +30,6 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.external.ExternalDatabase;
-import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.cluster.Cluster;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
@@ -314,21 +313,16 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                         if (matcher != null && !matcher.match(table.getName())) {
                             continue;
                         }
-                        long lastCheckTime = 0;
-                        if (table instanceof Table) {
-                            lastCheckTime = ((Table) table).getLastCheckTime();
-                        } else {
-                            lastCheckTime = ((ExternalTable) table).getLastCheckTime();
-                        }
+                        long lastCheckTime = table.getLastCheckTime() <= 0 ? 0 : table.getLastCheckTime();
                         TTableStatus status = new TTableStatus();
                         status.setName(table.getName());
                         status.setType(table.getMysqlType());
                         status.setEngine(table.getEngine());
                         status.setComment(table.getComment());
                         status.setCreateTime(table.getCreateTime());
-                        status.setLastCheckTime(lastCheckTime);
+                        status.setLastCheckTime(lastCheckTime / 1000);
                         status.setUpdateTime(table.getUpdateTime() / 1000);
-                        status.setCheckTime(lastCheckTime);
+                        status.setCheckTime(lastCheckTime / 1000);
                         status.setCollation("utf-8");
                         status.setRows(table.getRowCount());
                         status.setDataLength(table.getDataLength());




[doris] 02/08: [enhancement](memory) Add Memory GC when the available memory of the BE process is lacking (#14712)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5cacef1547d331ff6936f37634d107ace11c186a
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed Dec 7 15:28:52 2022 +0800

    [enhancement](memory) Add Memory GC when the available memory of the BE process is lacking (#14712)
    
    When the system MemAvailable is less than the warning water mark, or the memory used by the BE process exceeds the mem soft limit, run a minor GC and try to release caches.
    
    When the system MemAvailable is less than the low water mark, or the memory used by the BE process exceeds the mem limit, run a full GC: try to release caches, then cancel queries starting from the one with the largest memory usage until mem_limit * 20% of memory has been released.
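    
    The thresholds are tunable; a minimal be.conf sketch using the defaults
    introduced in be/src/common/config.h below (shown only for illustration):
    
        # share of the mem limit that each minor/full gc tries to release
        process_minor_gc_size = 10%
        process_full_gc_size = 20%
        # max time in ms a query thread waits for a full gc
        thread_wait_gc_max_milliseconds = 1000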
---
 be/src/common/config.h                           |   7 ++
 be/src/common/daemon.cpp                         |   2 +
 be/src/olap/lru_cache.cpp                        |   4 +
 be/src/olap/lru_cache.h                          |   3 +
 be/src/olap/page_cache.cpp                       |   5 ++
 be/src/olap/page_cache.h                         |   6 ++
 be/src/runtime/fragment_mgr.cpp                  |  16 ++++
 be/src/runtime/fragment_mgr.h                    |   3 +
 be/src/runtime/memory/chunk_allocator.cpp        |  20 +++++
 be/src/runtime/memory/chunk_allocator.h          |   4 +
 be/src/runtime/memory/mem_tracker_limiter.cpp    | 110 +++++++++++++++--------
 be/src/runtime/memory/mem_tracker_limiter.h      |  15 +++-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp |  17 +++-
 be/src/runtime/memory/thread_mem_tracker_mgr.h   |  10 ++-
 be/src/runtime/plan_fragment_executor.cpp        |   2 +
 be/src/runtime/query_fragments_ctx.h             |   2 +
 be/src/runtime/thread_context.h                  |   2 +
 be/src/util/mem_info.cpp                         |  45 ++++++++++
 be/src/util/mem_info.h                           |  18 +++-
 19 files changed, 244 insertions(+), 47 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 707222447b..86b278b76b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -68,6 +68,13 @@ CONF_Double(soft_mem_limit_frac, "0.9");
 // Turn down max. will use as much memory as possible.
 CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
 
+// The size of the memory that gc wants to release each time, as a percentage of the mem limit.
+CONF_mString(process_minor_gc_size, "10%");
+CONF_mString(process_full_gc_size, "20%");
+
+// The maximum time a thread waits for a full GC. Currently only queries will wait for a full GC.
+CONF_mInt32(thread_wait_gc_max_milliseconds, "1000");
+
 // the port heartbeat service used
 CONF_Int32(heartbeat_service_port, "9050");
 // the count of heart beat service
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index fb306c21a0..c0d3fee21f 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -228,11 +228,13 @@ void Daemon::memory_maintenance_thread() {
                     doris::MemInfo::sys_mem_available_low_water_mark() ||
             doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) {
             interval_milliseconds = 100;
+            doris::MemInfo::process_full_gc();
         } else if (doris::MemInfo::sys_mem_available() <
                            doris::MemInfo::sys_mem_available_warning_water_mark() ||
                    doris::MemInfo::proc_mem_no_allocator_cache() >=
                            doris::MemInfo::soft_mem_limit()) {
             interval_milliseconds = 200;
+            doris::MemInfo::process_minor_gc();
         } else {
             interval_milliseconds = config::memory_maintenance_sleep_time_ms;
         }
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index a6505d9bbe..484660e1bb 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -523,6 +523,10 @@ int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
     return num_prune;
 }
 
+int64_t ShardedLRUCache::mem_consumption() {
+    return _mem_tracker->consumption();
+}
+
 void ShardedLRUCache::update_cache_metrics() const {
     size_t total_capacity = 0;
     size_t total_usage = 0;
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 7ae79ef3ba..480a61c712 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -217,6 +217,8 @@ public:
     // may hold lock for a long time to execute predicate.
     virtual int64_t prune_if(CacheValuePredicate pred) { return 0; }
 
+    virtual int64_t mem_consumption() = 0;
+
 private:
     DISALLOW_COPY_AND_ASSIGN(Cache);
 };
@@ -370,6 +372,7 @@ public:
     virtual uint64_t new_id() override;
     virtual int64_t prune() override;
     virtual int64_t prune_if(CacheValuePredicate pred) override;
+    int64_t mem_consumption() override;
 
 private:
     void update_cache_metrics() const;
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index fa9f9010be..2813f85dd3 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -76,4 +76,9 @@ void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheH
     *handle = PageCacheHandle(cache, lru_handle);
 }
 
+void StoragePageCache::prune(segment_v2::PageTypePB page_type) {
+    auto cache = _get_page_cache(page_type);
+    cache->prune();
+}
+
 } // namespace doris
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index c1f0b48da3..6313e931e0 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -91,6 +91,12 @@ public:
         return _get_page_cache(page_type) != nullptr;
     }
 
+    void prune(segment_v2::PageTypePB page_type);
+
+    int64_t get_page_cache_mem_consumption(segment_v2::PageTypePB page_type) {
+        return _get_page_cache(page_type)->mem_consumption();
+    }
+
 private:
     StoragePageCache();
     static StoragePageCache* _s_instance;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9bf0a361cd..5e0d4f4645 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -680,6 +680,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             }
         }
     }
+    fragments_ctx->fragment_ids.push_back(fragment_instance_id);
 
     exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
                                            params.params.fragment_instance_id, params.backend_num,
@@ -792,6 +793,21 @@ Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCanc
     return Status::OK();
 }
 
+void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
+                               const std::string& msg) {
+    std::vector<TUniqueId> cancel_fragment_ids;
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto ctx = _fragments_ctx_map.find(query_id);
+        if (ctx != _fragments_ctx_map.end()) {
+            cancel_fragment_ids = ctx->second->fragment_ids;
+        }
+    }
+    for (auto it : cancel_fragment_ids) {
+        cancel(it, reason, msg);
+    }
+}
+
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 2246a42ac8..c5411eeed9 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -78,6 +78,9 @@ public:
     Status cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason,
                   const std::string& msg = "");
 
+    void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
+                      const std::string& msg = "");
+
     void cancel_worker();
 
     virtual void debug(std::stringstream& ss);
diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp
index 6ac8021648..a99b8b0873 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -120,6 +120,19 @@ public:
         _chunk_lists[idx].push_back(ptr);
     }
 
+    void clear() {
+        std::lock_guard<SpinLock> l(_lock);
+        for (int i = 0; i < 64; ++i) {
+            if (_chunk_lists[i].empty()) {
+                continue;
+            }
+            for (auto ptr : _chunk_lists[i]) {
+                ::free(ptr);
+            }
+            std::vector<uint8_t*>().swap(_chunk_lists[i]);
+        }
+    }
+
 private:
     SpinLock _lock;
     std::vector<std::vector<uint8_t*>> _chunk_lists;
@@ -256,4 +269,11 @@ void ChunkAllocator::free(uint8_t* data, size_t size) {
     free(chunk);
 }
 
+void ChunkAllocator::clear() {
+    for (int i = 0; i < _arenas.size(); ++i) {
+        _arenas[i]->clear();
+    }
+    THREAD_MEM_TRACKER_TRANSFER_FROM(_mem_tracker->consumption(), _mem_tracker.get());
+}
+
 } // namespace doris
diff --git a/be/src/runtime/memory/chunk_allocator.h b/be/src/runtime/memory/chunk_allocator.h
index de9ff70487..8602815a57 100644
--- a/be/src/runtime/memory/chunk_allocator.h
+++ b/be/src/runtime/memory/chunk_allocator.h
@@ -72,6 +72,10 @@ public:
     // otherwise the capacity of chunk allocator will be wrong.
     void free(uint8_t* data, size_t size);
 
+    void clear();
+
+    int64_t mem_consumption() { return _reserved_bytes; }
+
 private:
     ChunkAllocator(size_t reserve_limit);
 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 767595d4cf..a228dcf5a7 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -20,8 +20,10 @@
 #include <fmt/format.h>
 
 #include <boost/stacktrace.hpp>
+#include <queue>
 
 #include "gutil/once.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/pretty_printer.h"
@@ -238,42 +240,78 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
     return Status::MemoryLimitExceeded(failed_msg);
 }
 
-// TODO(zxy) More observable methods
-// /// Logs the usage of 'limit' number of queries based on maximum total memory
-// /// consumption.
-// std::string MemTracker::LogTopNQueries(int limit) {
-//     if (limit == 0) return "";
-//     priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
-//                    std::greater<pair<int64_t, string>>>
-//             min_pq;
-//     GetTopNQueries(min_pq, limit);
-//     std::vector<string> usage_strings(min_pq.size());
-//     while (!min_pq.empty()) {
-//         usage_strings.push_back(min_pq.top().second);
-//         min_pq.pop();
-//     }
-//     std::reverse(usage_strings.begin(), usage_strings.end());
-//     return join(usage_strings, "\n");
-// }
+int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
+    std::priority_queue<std::pair<int64_t, std::string>,
+                        std::vector<std::pair<int64_t, std::string>>,
+                        std::greater<std::pair<int64_t, std::string>>>
+            min_pq;
+    // Once prepare_free_mem exceeds min_free_mem, it is no longer modified.
+    int64_t prepare_free_mem = 0;
 
-// /// Helper function for LogTopNQueries that iterates through the MemTracker hierarchy
-// /// and populates 'min_pq' with 'limit' number of elements (that contain state related
-// /// to query MemTrackers) based on maximum total memory consumption.
-// void MemTracker::GetTopNQueries(
-//         priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
-//                        greater<pair<int64_t, string>>>& min_pq,
-//         int limit) {
-//     list<weak_ptr<MemTracker>> children;
-//     {
-//         lock_guard<SpinLock> l(child_trackers_lock_);
-//         children = child_trackers_;
-//     }
-//     for (const auto& child_weak : children) {
-//         shared_ptr<MemTracker> child = child_weak.lock();
-//         if (child) {
-//             child->GetTopNQueries(min_pq, limit);
-//         }
-//     }
-// }
+    auto label_to_queryid = [&](const std::string& label) -> TUniqueId {
+        auto queryid = split(label, "#Id=")[1];
+        TUniqueId querytid;
+        parse_id(queryid, &querytid);
+        return querytid;
+    };
+
+    auto cancel_top_query = [&](auto min_pq, auto label_to_queryid) -> int64_t {
+        std::vector<std::string> usage_strings;
+        bool had_cancel = false;
+        int64_t freed_mem = 0;
+        while (!min_pq.empty()) {
+            TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+                    cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+                    fmt::format("Process has no memory available, cancel top memory usage query: "
+                                "query memory tracker <{}> consumption {}, backend {} "
+                                "process memory used {} exceed limit {} or sys mem available {} "
+                                "less than low water mark {}. Execute again after enough memory, "
+                                "details see be.INFO.",
+                                min_pq.top().second, print_bytes(min_pq.top().first),
+                                BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
+                                MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(),
+                                print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+
+            freed_mem += min_pq.top().first;
+            usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
+                                                min_pq.top().first));
+            had_cancel = true;
+            min_pq.pop();
+        }
+        if (had_cancel) {
+            LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ",");
+        }
+        return freed_mem;
+    };
+
+    for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
+        std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
+        for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+            if (tracker->type() == Type::QUERY) {
+                if (tracker->consumption() > min_free_mem) {
+                    std::priority_queue<std::pair<int64_t, std::string>,
+                                        std::vector<std::pair<int64_t, std::string>>,
+                                        std::greater<std::pair<int64_t, std::string>>>
+                            min_pq_null;
+                    std::swap(min_pq, min_pq_null);
+                    min_pq.push(
+                            pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
+                    return cancel_top_query(min_pq, label_to_queryid);
+                } else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
+                    min_pq.push(
+                            pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
+                    prepare_free_mem += tracker->consumption();
+                } else if (tracker->consumption() > min_pq.top().first) {
+                    // No need to modify prepare_free_mem, prepare_free_mem will always be greater than min_free_mem.
+                    min_pq.push(
+                            pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
+                    min_pq.pop();
+                }
+            }
+        }
+    }
+    return cancel_top_query(min_pq, label_to_queryid);
+}
 
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index dfba608144..6415510315 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -144,14 +144,18 @@ public:
     Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
                                        int64_t failed_allocation_size = 0);
 
+    // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is released.
+    static int64_t free_top_query(int64_t min_free_mem);
+
     static std::string process_mem_log_str() {
         return fmt::format(
                 "physical memory {}, process memory used {} limit {}, sys mem available {} low "
-                "water mark {}",
+                "water mark {}, refresh interval memory growth {} B",
                 PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
                 PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
                 MemInfo::sys_mem_available_str(),
-                PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
+                PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+                MemInfo::refresh_interval_memory_growth);
     }
 
     std::string debug_string() {
@@ -169,7 +173,7 @@ private:
     // Increases consumption of this tracker by 'bytes' only if will not exceeding limit.
     // Returns true if the consumption was successfully updated.
     WARN_UNUSED_RESULT
-    bool try_consume(int64_t bytes, std::string& failed_msg);
+    bool try_consume(int64_t bytes, std::string& failed_msg, bool& is_process_exceed);
 
     // When the accumulated untracked memory value exceeds the upper limit,
     // the current value is returned and set to 0.
@@ -232,7 +236,8 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
     consume(consume_bytes);
 }
 
-inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) {
+inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg,
+                                           bool& is_process_exceed) {
     if (bytes <= 0) {
         release(-bytes);
         failed_msg = std::string();
@@ -245,6 +250,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
 
     if (sys_mem_exceed_limit_check(bytes)) {
         failed_msg = process_limit_exceeded_errmsg_str(bytes);
+        is_process_exceed = true;
         return false;
     }
 
@@ -253,6 +259,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
     } else {
         if (!_consumption->try_add(bytes, _limit)) {
             failed_msg = tracker_limit_exceeded_errmsg_str(bytes, this);
+            is_process_exceed = false;
             return false;
         }
     }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index f197cad79a..d45d6b8cb5 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -17,6 +17,9 @@
 
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 
+#include <chrono>
+#include <thread>
+
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "service/backend_options.h"
@@ -49,14 +52,24 @@ void ThreadMemTrackerMgr::cancel_fragment() {
     _check_limit = false; // Make sure it will only be canceled once
 }
 
-void ThreadMemTrackerMgr::exceeded() {
+void ThreadMemTrackerMgr::exceeded(int64_t size) {
     if (_cb_func != nullptr) {
         _cb_func();
     }
     _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg);
 
     if (is_attach_query()) {
-        // TODO wait gc
+        if (_is_process_exceed && _wait_gc) {
+            int64_t wait_milliseconds = config::thread_wait_gc_max_milliseconds;
+            while (wait_milliseconds > 0) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Check every 100 ms.
+                if (!MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
+                    MemInfo::refresh_interval_memory_growth += size;
+                    return; // Process memory is sufficient; do not cancel the query.
+                }
+                wait_milliseconds -= 100;
+            }
+        }
         cancel_fragment();
     }
 }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 86a5c51d6e..151f79cfcb 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -104,6 +104,7 @@ public:
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
     std::string exceed_mem_limit_msg() { return _exceed_mem_limit_msg; }
     void clear_exceed_mem_limit_msg() { _exceed_mem_limit_msg = ""; }
+    void disable_wait_gc() { _wait_gc = false; }
 
     std::string print_debug_string() {
         fmt::memory_buffer consumer_tracker_buf;
@@ -119,7 +120,7 @@ public:
 
 private:
     void cancel_fragment();
-    void exceeded();
+    void exceeded(int64_t size);
 
     void save_exceed_mem_limit_msg() {
         _exceed_mem_limit_msg = _limiter_tracker_raw->mem_limit_exceeded(
@@ -138,6 +139,8 @@ private:
 
     std::string _failed_consume_msg = std::string();
     std::string _exceed_mem_limit_msg = std::string();
+    bool _is_process_exceed = false;
+    bool _wait_gc = true;
 
     std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
     MemTrackerLimiter* _limiter_tracker_raw = nullptr;
@@ -216,10 +219,11 @@ inline bool ThreadMemTrackerMgr::flush_untracked_mem() {
     old_untracked_mem = _untracked_mem;
     if (_count_scope_mem) _scope_mem += _untracked_mem;
     if (CheckLimit) {
-        if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _failed_consume_msg)) {
+        if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _failed_consume_msg,
+                                               _is_process_exceed)) {
             if (Force) _limiter_tracker_raw->consume(old_untracked_mem);
             save_exceed_mem_limit_msg();
-            exceeded();
+            exceeded(old_untracked_mem);
             if (!Force) return false;
         }
     } else {
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 3a9206f1fa..bc46027bec 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -276,6 +276,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         SCOPED_CPU_TIMER(_fragment_cpu_timer);
         SCOPED_TIMER(profile()->total_time_counter());
         RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
+        RETURN_IF_CANCELLED(_runtime_state);
     }
     if (_sink == nullptr) {
         return Status::OK();
@@ -289,6 +290,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         auto sink_send_span_guard = Defer {[this]() { this->_sink->end_send_span(); }};
         while (true) {
             doris::vectorized::Block* block;
+            RETURN_IF_CANCELLED(_runtime_state);
 
             {
                 SCOPED_CPU_TIMER(_fragment_cpu_timer);
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index fe327d0e7c..f3565b9f30 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -128,6 +128,8 @@ public:
     // MemTracker that is shared by all fragment instances running on this host.
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
 
+    std::vector<TUniqueId> fragment_ids;
+
 private:
     ExecEnv* _exec_env;
     DateTimeValue _start_time;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index cdc8d483f2..8b7116aabc 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -205,6 +205,8 @@ static void pthread_attach_bthread() {
         // 2. A pthread switch occurs. Because a pthread switch cannot be accurately identified at the moment,
         // the tracker is reset to 0 here, as if the bthread-local storage were being reused.
         bthread_context = new ThreadContext;
+        // The brpc server should respond as quickly as possible.
+        bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
         // set the data so that next time bthread_getspecific in the thread returns the data.
         CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
     }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 6a2f57cc4b..453c1cd2bd 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -34,6 +34,7 @@
 
 #include "common/config.h"
 #include "gutil/strings/split.h"
+#include "olap/page_cache.h"
 #include "util/cgroup_util.h"
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
@@ -51,12 +52,15 @@ int64_t MemInfo::_s_allocator_cache_mem = 0;
 std::string MemInfo::_s_allocator_cache_mem_str = "";
 int64_t MemInfo::_s_virtual_memory_used = 0;
 int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
+std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
 
 static std::unordered_map<std::string, int64_t> _mem_info_bytes;
 int64_t MemInfo::_s_sys_mem_available = 0;
 std::string MemInfo::_s_sys_mem_available_str = "";
 int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0;
 int64_t MemInfo::_s_sys_mem_available_warning_water_mark = 0;
+int64_t MemInfo::_s_process_minor_gc_size = -1;
+int64_t MemInfo::_s_process_full_gc_size = -1;
 
 void MemInfo::refresh_allocator_mem() {
 #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
@@ -85,6 +89,42 @@ void MemInfo::refresh_allocator_mem() {
 #endif
 }
 
+void MemInfo::process_minor_gc() {
+    // TODO, free more cache, and should free a certain percentage of capacity, not all.
+    int64_t freed_mem = 0;
+    Defer defer {[&]() {
+        LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", freed_mem);
+    }};
+
+    freed_mem += ChunkAllocator::instance()->mem_consumption();
+    ChunkAllocator::instance()->clear();
+    if (freed_mem > _s_process_minor_gc_size) {
+        return;
+    }
+    freed_mem +=
+            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+}
+
+void MemInfo::process_full_gc() {
+    int64_t freed_mem = 0;
+    Defer defer {
+            [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }};
+
+    freed_mem +=
+            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+    if (freed_mem > _s_process_full_gc_size) {
+        return;
+    }
+    freed_mem += ChunkAllocator::instance()->mem_consumption();
+    ChunkAllocator::instance()->clear();
+    if (freed_mem > _s_process_full_gc_size) {
+        return;
+    }
+    freed_mem += MemTrackerLimiter::free_top_query(_s_process_full_gc_size - freed_mem);
+}
+
 #ifndef __APPLE__
 void MemInfo::refresh_proc_meminfo() {
     std::ifstream meminfo("/proc/meminfo", std::ios::in);
@@ -143,6 +183,11 @@ void MemInfo::init() {
     _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
     _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac;
 
+    _s_process_minor_gc_size =
+            ParseUtil::parse_mem_spec(config::process_minor_gc_size, -1, _s_mem_limit, &is_percent);
+    _s_process_full_gc_size =
+            ParseUtil::parse_mem_spec(config::process_full_gc_size, -1, _s_mem_limit, &is_percent);
+
     std::string line;
     int64_t _s_vm_min_free_kbytes = 0;
     std::ifstream vminfo("/proc/sys/vm/min_free_kbytes", std::ios::in);
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 52281f508e..bd76c6124c 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -53,7 +53,9 @@ public:
 
     static void refresh_proc_meminfo();
 
-    static inline int64_t sys_mem_available() { return _s_sys_mem_available; }
+    static inline int64_t sys_mem_available() {
+        return _s_sys_mem_available - refresh_interval_memory_growth;
+    }
     static inline std::string sys_mem_available_str() { return _s_sys_mem_available_str; }
     static inline int64_t sys_mem_available_low_water_mark() {
         return _s_sys_mem_available_low_water_mark;
@@ -83,7 +85,9 @@ public:
     static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; }
     static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; }
     static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; }
-    static inline int64_t proc_mem_no_allocator_cache() { return _s_proc_mem_no_allocator_cache; }
+    static inline int64_t proc_mem_no_allocator_cache() {
+        return _s_proc_mem_no_allocator_cache + refresh_interval_memory_growth;
+    }
 
     // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
     // obtained by the process malloc, not the physical memory actually used by the process in the OS.
@@ -92,6 +96,7 @@ public:
     static inline void refresh_proc_mem_no_allocator_cache() {
         _s_proc_mem_no_allocator_cache =
                 PerfCounters::get_vm_rss() - static_cast<int64_t>(_s_allocator_cache_mem);
+        refresh_interval_memory_growth = 0;
     }
 
     static inline int64_t mem_limit() {
@@ -109,6 +114,13 @@ public:
 
     static std::string debug_string();
 
+    static void process_minor_gc();
+    static void process_full_gc();
+
+    // Only used after the memory limit is exceeded. When multiple threads are waiting for process memory
+    // to become available, this prevents them all from resuming at once and causing OOM.
+    static std::atomic<int64_t> refresh_interval_memory_growth;
+
 private:
     static bool _s_initialized;
     static int64_t _s_physical_mem;
@@ -125,6 +137,8 @@ private:
     static std::string _s_sys_mem_available_str;
     static int64_t _s_sys_mem_available_low_water_mark;
     static int64_t _s_sys_mem_available_warning_water_mark;
+    static int64_t _s_process_minor_gc_size;
+    static int64_t _s_process_full_gc_size;
 };
 
 } // namespace doris

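The GC added above is a cascade: each step frees one reclaimable source (chunk allocator, then data page cache, then query cancellation via free_top_query) and returns early as soon as the accumulated total crosses the configured gc size, so queries are only cancelled when pruning the caches was not enough. Between /proc/meminfo refreshes, refresh_interval_memory_growth keeps sys_mem_available() pessimistic so concurrent threads do not all see the same stale headroom, and a thread that hits the limit sleeps in 100 ms slices (up to thread_wait_gc_max_milliseconds) waiting for this cascade before cancelling its own query. A minimal standalone sketch of the cascade pattern, using a hypothetical FreeSource record and made-up sizes rather than the real Doris caches:

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <string>
    #include <vector>

    // One reclaimable memory source: reports how many bytes it holds, then frees them.
    // Hypothetical type for illustration; not a Doris class.
    struct FreeSource {
        std::string name;
        std::function<int64_t()> consumption;
        std::function<void()> clear;
    };

    // Free sources in order, stopping once 'target' bytes have been reclaimed.
    // Mirrors the early-return structure of process_minor_gc()/process_full_gc().
    int64_t cascade_gc(const std::vector<FreeSource>& sources, int64_t target) {
        int64_t freed = 0;
        for (const auto& s : sources) {
            freed += s.consumption();
            s.clear();
            std::cout << "freed " << s.name << ", total " << freed << " bytes\n";
            if (freed > target) break; // enough memory reclaimed, stop early
        }
        return freed;
    }

    int main() {
        int64_t chunk_cache = 64 << 20;  // pretend the chunk allocator holds 64 MB
        int64_t page_cache = 128 << 20;  // pretend the data page cache holds 128 MB
        std::vector<FreeSource> sources = {
                {"chunk allocator", [&] { return chunk_cache; }, [&] { chunk_cache = 0; }},
                {"data page cache", [&] { return page_cache; }, [&] { page_cache = 0; }},
        };
        cascade_gc(sources, /*target=*/100 << 20); // stops after the second source
        return 0;
    }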

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 03/08: [enhancement](memory) Support query memroy overcommit #14948

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a117c4d05097f90df2022cd5ae9d33e58aed3a97
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Dec 9 14:09:05 2022 +0800

    [enhancement](memory) Support query memroy overcommit #14948
    
    Add conf enable_query_memroy_overcommit
    
    If true, query memory is not limited while the process stays below the soft mem limit; when the process memory exceeds the soft mem limit, the query with the largest ratio of currently used memory to exec_mem_limit will be canceled.
    
    If false, a query is canceled when its memory usage exceeds exec_mem_limit, same as before.
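    
    As a rough illustration of that policy (hypothetical QueryTracker records, not the real MemTrackerLimiter; the actual free_top_overcommit_query in the diff below scales the ratio by 10000 to rank trackers in an integer heap and skips queries whose ratio rounds to zero):
    
        #include <cstdint>
        #include <iostream>
        #include <string>
        #include <vector>
    
        // Hypothetical stand-in for a query-level memory tracker.
        struct QueryTracker {
            std::string label;
            int64_t consumption; // bytes currently used by the query
            int64_t limit;       // the query's exec_mem_limit
        };
    
        // Pick the query with the largest consumption/exec_mem_limit ratio,
        // i.e. the one overcommitting the most relative to its own limit.
        const QueryTracker* pick_most_overcommitted(const std::vector<QueryTracker>& qs) {
            const QueryTracker* worst = nullptr;
            double worst_ratio = 0.0;
            for (const auto& q : qs) {
                double ratio = static_cast<double>(q.consumption) / q.limit;
                if (ratio > worst_ratio) {
                    worst_ratio = ratio;
                    worst = &q;
                }
            }
            return worst; // nullptr if every ratio is zero (small queries are skipped)
        }
    
        int main() {
            std::vector<QueryTracker> queries = {
                    {"query_a", 6L << 30, 8L << 30}, // at 0.75 of its limit
                    {"query_b", 3L << 30, 2L << 30}, // at 1.5x its limit -> first to cancel
            };
            std::cout << "cancel first: " << pick_most_overcommitted(queries)->label << "\n";
            return 0;
        }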
---
 be/src/common/config.h                        |  6 ++
 be/src/runtime/memory/mem_tracker_limiter.cpp | 80 ++++++++++++++++++++++-----
 be/src/runtime/memory/mem_tracker_limiter.h   | 21 +++++--
 be/src/util/mem_info.cpp                      |  6 +-
 4 files changed, 93 insertions(+), 20 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 86b278b76b..3bc843e422 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -72,6 +72,12 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
 CONF_mString(process_minor_gc_size, "10%");
 CONF_mString(process_full_gc_size, "20%");
 
+// If true, when the process does not exceed the soft mem limit, the query memory will not be limited;
+// when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently
+// used memory and the exec_mem_limit will be canceled.
+// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
+CONF_mBool(enable_query_memroy_overcommit, "true");
+
 // The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
 CONF_mInt32(thread_wait_gc_max_milliseconds, "1000");
 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index a228dcf5a7..a37a0be0f9 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -28,7 +28,6 @@
 #include "runtime/thread_context.h"
 #include "util/pretty_printer.h"
 #include "util/stack_util.h"
-#include "util/string_util.h"
 
 namespace doris {
 
@@ -240,7 +239,7 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
     return Status::MemoryLimitExceeded(failed_msg);
 }
 
-int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
+int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) {
     std::priority_queue<std::pair<int64_t, std::string>,
                         std::vector<std::pair<int64_t, std::string>>,
                         std::greater<std::pair<int64_t, std::string>>>
@@ -248,16 +247,8 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
     // After greater than min_free_mem, will not be modified.
     int64_t prepare_free_mem = 0;
 
-    auto label_to_queryid = [&](const std::string& label) -> TUniqueId {
-        auto queryid = split(label, "#Id=")[1];
-        TUniqueId querytid;
-        parse_id(queryid, &querytid);
-        return querytid;
-    };
-
-    auto cancel_top_query = [&](auto min_pq, auto label_to_queryid) -> int64_t {
+    auto cancel_top_query = [&](auto min_pq) -> int64_t {
         std::vector<std::string> usage_strings;
-        bool had_cancel = false;
         int64_t freed_mem = 0;
         while (!min_pq.empty()) {
             TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
@@ -276,10 +267,9 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
             freed_mem += min_pq.top().first;
             usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
                                                 min_pq.top().first));
-            had_cancel = true;
             min_pq.pop();
         }
-        if (had_cancel) {
+        if (!usage_strings.empty()) {
             LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ",");
         }
         return freed_mem;
@@ -297,7 +287,7 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
                     std::swap(min_pq, min_pq_null);
                     min_pq.push(
                             pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
-                    return cancel_top_query(min_pq, label_to_queryid);
+                    return cancel_top_query(min_pq);
                 } else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
                     min_pq.push(
                             pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
@@ -311,7 +301,67 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
             }
         }
     }
-    return cancel_top_query(min_pq, label_to_queryid);
+    return cancel_top_query(min_pq);
+}
+
+int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
+    std::priority_queue<std::pair<int64_t, std::string>,
+                        std::vector<std::pair<int64_t, std::string>>,
+                        std::greater<std::pair<int64_t, std::string>>>
+            min_pq;
+    std::unordered_map<std::string, int64_t> query_consumption;
+
+    for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
+        std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
+        for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+            if (tracker->type() == Type::QUERY) {
+                int64_t overcommit_ratio =
+                        (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000;
+                if (overcommit_ratio == 0) { // Small query does not cancel
+                    continue;
+                }
+                min_pq.push(pair<int64_t, std::string>(overcommit_ratio, tracker->label()));
+                query_consumption[tracker->label()] = tracker->consumption();
+            }
+        }
+    }
+
+    std::priority_queue<std::pair<int64_t, std::string>> max_pq;
+    // Min-heap to Max-heap.
+    while (!min_pq.empty()) {
+        max_pq.push(min_pq.top());
+        min_pq.pop();
+    }
+
+    std::vector<std::string> usage_strings;
+    int64_t freed_mem = 0;
+    while (!max_pq.empty()) {
+        TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second);
+        int64_t query_mem = query_consumption[max_pq.top().second];
+        ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+                cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+                fmt::format("Process has no memory available, cancel top memory usage query: "
+                            "query memory tracker <{}> consumption {}, backend {} "
+                            "process memory used {} exceed limit {} or sys mem available {} "
+                            "less than low water mark {}. Execute again after enough memory, "
+                            "details see be.INFO.",
+                            max_pq.top().second, print_bytes(query_mem),
+                            BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
+                            MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(),
+                            print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+
+        usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
+                                            max_pq.top().second, query_mem, max_pq.top().first));
+        freed_mem += query_mem;
+        if (freed_mem > min_free_mem) {
+            break;
+        }
+        max_pq.pop();
+    }
+    if (!usage_strings.empty()) {
+        LOG(INFO) << "Process GC Free Top Memory Overcommit Query: " << join(usage_strings, ",");
+    }
+    return freed_mem;
 }
 
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 6415510315..617a7ffdee 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -25,6 +25,7 @@
 #include "service/backend_options.h"
 #include "util/mem_info.h"
 #include "util/perf_counters.h"
+#include "util/string_util.h"
 
 namespace doris {
 
@@ -144,8 +145,18 @@ public:
     Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
                                        int64_t failed_allocation_size = 0);
 
-    // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is released.
-    static int64_t free_top_query(int64_t min_free_mem);
+    // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
+    static int64_t free_top_memory_query(int64_t min_free_mem);
+    // Start canceling from the query with the largest memory overcommit ratio until the memory
+    // of min_free_mem size is freed.
+    static int64_t free_top_overcommit_query(int64_t min_free_mem);
+    // only for Type::QUERY or Type::LOAD.
+    static TUniqueId label_to_queryid(const std::string& label) {
+        auto queryid = split(label, "#Id=")[1];
+        TUniqueId querytid;
+        parse_id(queryid, &querytid);
+        return querytid;
+    };
 
     static std::string process_mem_log_str() {
         return fmt::format(
@@ -254,7 +265,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
         return false;
     }
 
-    if (_limit < 0) {
+    if (_limit < 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
         _consumption->add(bytes); // No limit at this tracker.
     } else {
         if (!_consumption->try_add(bytes, _limit)) {
@@ -271,7 +282,9 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
     if (sys_mem_exceed_limit_check(bytes)) {
         return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes));
     }
-    if (bytes <= 0) return Status::OK();
+    if (bytes <= 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
+        return Status::OK();
+    }
     if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
         return Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this));
     }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 453c1cd2bd..55500feea7 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -104,6 +104,10 @@ void MemInfo::process_minor_gc() {
     freed_mem +=
             StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
     StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+    if (config::enable_query_memroy_overcommit) {
+        freed_mem +=
+                MemTrackerLimiter::free_top_overcommit_query(_s_process_full_gc_size - freed_mem);
+    }
 }
 
 void MemInfo::process_full_gc() {
@@ -122,7 +126,7 @@ void MemInfo::process_full_gc() {
     if (freed_mem > _s_process_full_gc_size) {
         return;
     }
-    freed_mem += MemTrackerLimiter::free_top_query(_s_process_full_gc_size - freed_mem);
+    freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem);
 }
 
 #ifndef __APPLE__

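The try_consume change above amounts to a two-gate admission: the process-wide check always applies (and reports is_process_exceed so the caller can decide to wait for GC), while the tracker's own limit is skipped for QUERY trackers when enable_query_memroy_overcommit is on. A minimal sketch of that shape, with hypothetical globals standing in for the real process accounting (the real code checks sys_mem_exceed_limit_check against sys mem available; here that is collapsed into a single counter):

    #include <atomic>
    #include <cstdint>
    #include <iostream>

    enum class TrackerType { QUERY, LOAD, GLOBAL };

    // Hypothetical process-wide state standing in for MemInfo/PerfCounters.
    std::atomic<int64_t> g_process_used{0};
    constexpr int64_t kProcessLimit = 10LL << 30; // pretend 10 GB process budget
    bool g_enable_query_memory_overcommit = true;

    struct Tracker {
        TrackerType type = TrackerType::QUERY;
        std::atomic<int64_t> consumption{0};
        int64_t limit = -1; // the tracker's own limit, e.g. exec_mem_limit

        // Two gates, mirroring the shape of MemTrackerLimiter::try_consume():
        // 1) the process-wide limit always applies;
        // 2) the tracker's own limit is skipped for queries under overcommit.
        bool try_consume(int64_t bytes, bool& is_process_exceed) {
            if (g_process_used.load() + bytes > kProcessLimit) {
                is_process_exceed = true; // caller may wait for GC before giving up
                return false;
            }
            bool skip_own_limit =
                    type == TrackerType::QUERY && g_enable_query_memory_overcommit;
            if (limit >= 0 && !skip_own_limit && consumption.load() + bytes > limit) {
                is_process_exceed = false; // the query itself is over exec_mem_limit
                return false;
            }
            consumption += bytes;
            g_process_used += bytes;
            return true;
        }
    };

    int main() {
        Tracker query;
        query.limit = 1LL << 30; // 1 GB exec_mem_limit
        bool process_exceed = false;
        // A 2 GB request exceeds the query's own limit, but overcommit admits it
        // because the process is still far below its budget.
        std::cout << query.try_consume(2LL << 30, process_exceed) << "\n"; // prints 1
        return 0;
    }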

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 07/08: [fix](information-schema) fix bug that query tables in information_schema db will return error #15336

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7fdc001add8870a412172631009995e56dbc7a83
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Sun Dec 25 10:09:40 2022 +0800

    [fix](information-schema) fix bug that query tables in information_schema db will return error #15336
---
 .../apache/doris/alter/SchemaChangeHandler.java    |  8 ++---
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  6 ++--
 .../java/org/apache/doris/analysis/InsertStmt.java |  2 +-
 .../main/java/org/apache/doris/catalog/Column.java |  8 ++---
 .../java/org/apache/doris/catalog/DatabaseIf.java  | 42 ++++++++++++++++++++++
 .../java/org/apache/doris/catalog/OlapTable.java   |  1 +
 .../java/org/apache/doris/catalog/Partition.java   |  2 ++
 .../java/org/apache/doris/catalog/TableIf.java     | 14 ++++++++
 .../catalog/external/JdbcExternalDatabase.java     |  1 -
 .../java/org/apache/doris/common/FeNameFormat.java |  2 +-
 .../org/apache/doris/datasource/CatalogIf.java     | 14 ++++++++
 .../apache/doris/datasource/ExternalCatalog.java   | 14 ++++++--
 .../src/main/java/org/apache/doris/load/Load.java  |  8 ++---
 .../apache/doris/load/update/UpdatePlanner.java    |  4 +--
 .../apache/doris/service/FrontendServiceImpl.java  | 21 ++++++-----
 .../org/apache/doris/task/CreateReplicaTask.java   |  4 +--
 .../apache/doris/planner/UpdatePlannerTest.java    |  2 +-
 17 files changed, 120 insertions(+), 33 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index a14b03aa31..1a0489ea58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -111,7 +111,7 @@ public class SchemaChangeHandler extends AlterHandler {
     private static final Logger LOG = LogManager.getLogger(SchemaChangeHandler.class);
 
     // all shadow indexes should have this prefix in name
-    public static final String SHADOW_NAME_PRFIX = "__doris_shadow_";
+    public static final String SHADOW_NAME_PREFIX = "__doris_shadow_";
 
     public static final int MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE = 10;
 
@@ -681,7 +681,7 @@ public class SchemaChangeHandler extends AlterHandler {
             * And if the column type is not changed, the same column name still maps to the same column type,
              * so no need to add prefix.
              */
-            modColumn.setName(SHADOW_NAME_PRFIX + modColumn.getName());
+            modColumn.setName(SHADOW_NAME_PREFIX + modColumn.getName());
         }
     }
 
@@ -1470,7 +1470,7 @@ public class SchemaChangeHandler extends AlterHandler {
             while (currentSchemaHash == newSchemaHash) {
                 newSchemaHash = Util.generateSchemaHash();
             }
-            String newIndexName = SHADOW_NAME_PRFIX + olapTable.getIndexNameById(originIndexId);
+            String newIndexName = SHADOW_NAME_PREFIX + olapTable.getIndexNameById(originIndexId);
             short newShortKeyColumnCount = indexIdToShortKeyColumnCount.get(originIndexId);
             long shadowIndexId = idGeneratorBuffer.getNextId();
 
@@ -2252,7 +2252,7 @@ public class SchemaChangeHandler extends AlterHandler {
 
         for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
             long originIndexId = entry.getKey();
-            String newIndexName = SHADOW_NAME_PRFIX + olapTable.getIndexNameById(originIndexId);
+            String newIndexName = SHADOW_NAME_PREFIX + olapTable.getIndexNameById(originIndexId);
             MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(originIndexId);
             // 1. get new schema version/schema version hash, short key column count
             int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 72b9457a3f..51321cac8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -424,9 +424,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                         destSlotDesc.setColumn(column);
                         destSlotDesc.setIsNullable(column.isAllowNull());
 
-                        if (indexColumnMap.containsKey(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName())) {
+                        if (indexColumnMap.containsKey(SchemaChangeHandler.SHADOW_NAME_PREFIX + column.getName())) {
                             Column newColumn = indexColumnMap
-                                    .get(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName());
+                                    .get(SchemaChangeHandler.SHADOW_NAME_PREFIX + column.getName());
                             if (newColumn.getType() != column.getType()) {
                                 try {
                                     SlotRef slot = new SlotRef(destSlotDesc);
@@ -860,7 +860,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             info.add(TimeUtils.longToTimeStringWithms(createTimeMs));
             info.add(TimeUtils.longToTimeStringWithms(finishedTimeMs));
             // only show the origin index name
-            info.add(indexIdToName.get(shadowIndexId).substring(SchemaChangeHandler.SHADOW_NAME_PRFIX.length()));
+            info.add(indexIdToName.get(shadowIndexId).substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
             info.add(shadowIndexId);
             info.add(entry.getValue());
             info.add(indexSchemaVersionAndHashMap.get(shadowIndexId).toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 0a3ede083c..44140b24e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -444,7 +444,7 @@ public class InsertStmt extends DdlStmt {
          */
         List<Pair<Integer, Column>> origColIdxsForExtendCols = Lists.newArrayList();
         for (Column column : targetTable.getFullSchema()) {
-            if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
+            if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
                 String origName = Column.removeNamePrefix(column.getName());
                 for (int i = 0; i < targetColumns.size(); i++) {
                     if (targetColumns.get(i).nameEquals(origName, false)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 4432db3335..cd7cde3cbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -495,8 +495,8 @@ public class Column implements Writable, GsonPostProcessable {
     }
 
     public static String removeNamePrefix(String colName) {
-        if (colName.startsWith(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
-            return colName.substring(SchemaChangeHandler.SHADOW_NAME_PRFIX.length());
+        if (colName.startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
+            return colName.substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length());
         }
         return colName;
     }
@@ -505,11 +505,11 @@ public class Column implements Writable, GsonPostProcessable {
         if (isShadowColumn(colName)) {
             return colName;
         }
-        return SchemaChangeHandler.SHADOW_NAME_PRFIX + colName;
+        return SchemaChangeHandler.SHADOW_NAME_PREFIX + colName;
     }
 
     public static boolean isShadowColumn(String colName) {
-        return colName.startsWith(SchemaChangeHandler.SHADOW_NAME_PRFIX);
+        return colName.startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX);
     }
 
     public Expr getDefineExpr() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
index 9fad070fe9..80a7f61cc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
@@ -22,6 +22,11 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.MetaNotFoundException;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -35,6 +40,7 @@ import java.util.function.Function;
  * Maybe changed later.
  */
 public interface DatabaseIf<T extends TableIf> {
+    Logger LOG = LogManager.getLogger(DatabaseIf.class);
 
     void readLock();
 
@@ -64,18 +70,54 @@ public interface DatabaseIf<T extends TableIf> {
 
     List<T> getTables();
 
+    default List<T> getTablesOrEmpty() {
+        try {
+            return getTables();
+        } catch (Exception e) {
+            LOG.warn("failed to get tables for db {}", getFullName(), e);
+            return Lists.newArrayList();
+        }
+    }
+
     List<T> getTablesOnIdOrder();
 
     List<T> getViews();
 
+    default List<T> getViewsOrEmpty() {
+        try {
+            return getViews();
+        } catch (Exception e) {
+            LOG.warn("failed to get views for db {}", getFullName(), e);
+            return Lists.newArrayList();
+        }
+    }
+
     List<T> getTablesOnIdOrderIfExist(List<Long> tableIdList);
 
     List<T> getTablesOnIdOrderOrThrowException(List<Long> tableIdList) throws MetaNotFoundException;
 
     Set<String> getTableNamesWithLock();
 
+    default Set<String> getTableNamesOrEmptyWithLock() {
+        try {
+            return getTableNamesWithLock();
+        } catch (Exception e) {
+            LOG.warn("failed to get table names for db {}", getFullName(), e);
+            return Sets.newHashSet();
+        }
+    }
+
     T getTableNullable(String tableName);
 
+    default T getTableNullableIfException(String tableName) {
+        try {
+            return getTableNullable(tableName);
+        } catch (Exception e) {
+            LOG.warn("failed to get table {} in database {}", tableName, getFullName(), e);
+            return null;
+        }
+    }
+
     T getTableNullable(long tableId);
 
     default Optional<T> getTable(String tableName) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 80d08f4858..87ba47bb27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -353,6 +353,7 @@ public class OlapTable extends Table {
         if (indexId != baseIndexId) {
             rebuildFullSchema();
         }
+        LOG.info("delete index info {} in table {}-{}", indexName, id, name);
         return true;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 16a298f69f..f74e1a35fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -197,8 +197,10 @@ public class Partition extends MetaObject implements Writable {
 
     public MaterializedIndex deleteRollupIndex(long indexId) {
         if (this.idToVisibleRollupIndex.containsKey(indexId)) {
+            LOG.info("delete visible rollup index {} in partition {}-{}", indexId, id, name);
             return idToVisibleRollupIndex.remove(indexId);
         } else {
+            LOG.info("delete shadow rollup index {} in partition {}-{}", indexId, id, name);
             return idToShadowIndex.remove(indexId);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index f4b88af123..957c4d4d75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -25,12 +25,17 @@ import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
 
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 public interface TableIf {
+    Logger LOG = LogManager.getLogger(TableIf.class);
 
     void readLock();
 
@@ -72,6 +77,15 @@ public interface TableIf {
 
     List<Column> getBaseSchema();
 
+    default List<Column> getBaseSchemaOrEmpty() {
+        try {
+            return getBaseSchema();
+        } catch (Exception e) {
+            LOG.warn("failed to get base schema for table {}", getName(), e);
+            return Lists.newArrayList();
+        }
+    }
+
     List<Column> getBaseSchema(boolean full);
 
     void setNewFullSchema(List<Column> newSchema);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java
index 45d6b1a647..f0772d8f26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java
@@ -128,7 +128,6 @@ public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> im
         return Lists.newArrayList(idToTbl.values());
     }
 
-    // TODO(ftw): drew
     @Override
     public JdbcExternalTable getTableNullable(String tableName) {
         makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java
index 4048d9f647..7afbff8f37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java
@@ -72,7 +72,7 @@ public class FeNameFormat {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
                     columnName, FeNameFormat.COLUMN_NAME_REGEX);
         }
-        if (columnName.startsWith(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
+        if (columnName.startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
                     columnName, FeNameFormat.COLUMN_NAME_REGEX);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index a22d72a361..a60dc8430e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -23,6 +23,10 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.MetaNotFoundException;
 
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -33,6 +37,7 @@ import javax.annotation.Nullable;
  * The interface of Catalog
  */
 public interface CatalogIf<T extends DatabaseIf> {
+    Logger LOG = LogManager.getLogger(CatalogIf.class);
 
     // Type of this catalog
     String getType();
@@ -44,6 +49,15 @@ public interface CatalogIf<T extends DatabaseIf> {
 
     List<String> getDbNames();
 
+    default List<String> getDbNamesOrEmpty() {
+        try {
+            return getDbNames();
+        } catch (Exception e) {
+            LOG.warn("failed to get db names in catalog {}", getName(), e);
+            return Lists.newArrayList();
+        }
+    }
+
     List<Long> getDbIds();
 
     @Nullable
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index cc0c2f0b2d..31a16850ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -171,7 +171,12 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
     @Nullable
     @Override
     public ExternalDatabase getDbNullable(String dbName) {
-        makeSureInitialized();
+        try {
+            makeSureInitialized();
+        } catch (Exception e) {
+            LOG.warn("failed to get db {} in catalog {}", dbName, name, e);
+            return null;
+        }
         String realDbName = ClusterNamespace.getNameFromFullName(dbName);
         if (!dbNameToId.containsKey(realDbName)) {
             return null;
@@ -182,7 +187,12 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
     @Nullable
     @Override
     public ExternalDatabase getDbNullable(long dbId) {
-        makeSureInitialized();
+        try {
+            makeSureInitialized();
+        } catch (Exception e) {
+            LOG.warn("failed to get db {} in catalog {}", dbId, name, e);
+            return null;
+        }
         return idToDb.get(dbId);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index a779028388..5cc878301e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -585,8 +585,8 @@ public class Load {
             // so there will be a shadow column: '__doris_shadow_B'
             // So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B));
             for (Column column : table.getFullSchema()) {
-                if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
-                    String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX);
+                if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
+                    String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX);
                     if (parsedColumnExprMap.containsKey(originCol)) {
                         Expr mappingExpr = parsedColumnExprMap.get(originCol);
                         if (mappingExpr != null) {
@@ -749,11 +749,11 @@ public class Load {
     public static List<ImportColumnDesc> getSchemaChangeShadowColumnDesc(Table tbl, Map<String, Expr> columnExprMap) {
         List<ImportColumnDesc> shadowColumnDescs = Lists.newArrayList();
         for (Column column : tbl.getFullSchema()) {
-            if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
+            if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
                 continue;
             }
 
-            String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX);
+            String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX);
             if (columnExprMap.containsKey(originCol)) {
                 Expr mappingExpr = columnExprMap.get(originCol);
                 if (mappingExpr != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
index a0f187277c..35dae8caf5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
@@ -159,8 +159,8 @@ public class UpdatePlanner extends OriginalPlanner {
         for (int i = 0; i < targetTable.getFullSchema().size(); i++) {
             Column column = targetTable.getFullSchema().get(i);
             // pay attention to case ignore of column name
-            String originColumnName = (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PRFIX)
-                    ? column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PRFIX.length()) : column.getName())
+            String originColumnName = (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)
+                    ? column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()) : column.getName())
                     .toLowerCase();
             Expr setExpr = columnNameToSetExpr.get(originColumnName);
             SlotDescriptor srcSlotDesc = columnNameToSrcSlotDesc.get(originColumnName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e0a622b55c..71784eae21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -139,6 +139,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -180,7 +181,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     .getCatalogOrException(params.catalog, catalog -> new TException("Unknown catalog " + catalog)));
         }
         for (CatalogIf catalog : catalogIfs) {
-            List<String> dbNames = catalog.getDbNames();
+            List<String> dbNames = catalog.getDbNamesOrEmpty();
             LOG.debug("get db names: {}, in catalog: {}", dbNames, catalog.getName());
 
             UserIdentity currentUser = null;
@@ -234,11 +235,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         String catalogName = Strings.isNullOrEmpty(params.catalog) ? InternalCatalog.INTERNAL_CATALOG_NAME
                 : params.catalog;
+
         DatabaseIf<TableIf> db = Env.getCurrentEnv().getCatalogMgr()
                 .getCatalogOrException(catalogName, catalog -> new TException("Unknown catalog " + catalog))
                 .getDbNullable(params.db);
+
         if (db != null) {
-            for (String tableName : db.getTableNamesWithLock()) {
+            Set<String> tableNames = db.getTableNamesOrEmptyWithLock();
+            for (String tableName : tableNames) {
                 LOG.debug("get table: {}, wait to check", tableName);
                 if (!Env.getCurrentEnv().getAuth()
                         .checkTblPriv(currentUser, params.db, tableName, PrivPredicate.SHOW)) {
@@ -286,16 +290,16 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (catalog != null) {
             DatabaseIf db = catalog.getDbNullable(params.db);
             if (db != null) {
-                List<TableIf> tables = null;
+                List<TableIf> tables = Lists.newArrayList();
                 if (!params.isSetType() || params.getType() == null || params.getType().isEmpty()) {
-                    tables = db.getTables();
+                    tables = db.getTablesOrEmpty();
                 } else {
                     switch (params.getType()) {
                         case "VIEW":
-                            tables = db.getViews();
+                            tables = db.getViewsOrEmpty();
                             break;
                         default:
-                            tables = db.getTables();
+                            tables = db.getTablesOrEmpty();
                     }
                 }
                 for (TableIf table : tables) {
@@ -421,11 +425,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 .getCatalogOrException(catalogName, catalog -> new TException("Unknown catalog " + catalog))
                 .getDbNullable(params.db);
         if (db != null) {
-            TableIf table = db.getTableNullable(params.getTableName());
+            TableIf table = db.getTableNullableIfException(params.getTableName());
             if (table != null) {
                 table.readLock();
                 try {
-                    for (Column column : table.getBaseSchema()) {
+                    List<Column> baseSchema = table.getBaseSchemaOrEmpty();
+                    for (Column column : baseSchema) {
                         final TColumnDesc desc = new TColumnDesc(column.getName(), column.getDataType().toThrift());
                         final Integer precision = column.getOriginType().getPrecision();
                         if (precision != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 2da8f6dda3..d416c75796 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -202,8 +202,8 @@ public class CreateReplicaTask extends AgentTask {
             }
             // when doing schema change, some modified column has a prefix in name.
             // this prefix is only used in FE, not visible to BE, so we should remove this prefix.
-            if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
-                tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PRFIX.length()));
+            if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
+                tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
             }
             tColumn.setVisible(column.isVisible());
             tColumns.add(tColumn);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java
index 3573614198..253ce82930 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java
@@ -96,7 +96,7 @@ public class UpdatePlannerTest {
                 v1.getName();
                 result = "v1";
                 shadowV1.getName();
-                result = SchemaChangeHandler.SHADOW_NAME_PRFIX + "v1";
+                result = SchemaChangeHandler.SHADOW_NAME_PREFIX + "v1";
             }
         };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 04/08: [deps](fe)upgrade deps version (#15262)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a9f47302aa737a5e04dd4674d47cdbd21f95ae34
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Sat Dec 24 22:18:10 2022 +0800

    [deps](fe)upgrade deps version (#15262)
    
    upgrade hadoop version to 2.10.2
    jackson-databind to 2.14.1
---
 .github/workflows/build-extension.yml |  19 +++--
 dist/LICENSE-dist.txt                 |  30 ++++----
 fe/fe-core/pom.xml                    | 127 +++++++++++++++++++++++++++++++---
 fe/hive-udf/pom.xml                   |  38 ++++++++++
 fe/java-udf/pom.xml                   |  19 +++++
 fe/pom.xml                            |   7 +-
 fe/spark-dpp/pom.xml                  |  17 +++++
 fs_brokers/apache_hdfs_broker/pom.xml |  32 ++++++++-
 8 files changed, 260 insertions(+), 29 deletions(-)

diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml
index 7f91eda546..b78f69400c 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -65,11 +65,22 @@ jobs:
 
       - name: Setup thrift
         run: |
-          sudo apt-get update
-          sudo DEBIAN_FRONTEND=noninteractive apt-get install -y thrift-compiler=0.13.0-2build2
+          pushd thirdparty
+          branch="${{ github.base_ref }}"
+          if [[ -z "${branch}" ]] || [[ "${branch}" == 'master' ]]; then
+            curl -L https://github.com/apache/doris-thirdparty/releases/download/automation/doris-thirdparty-prebuilt-linux-x86_64.tar.xz \
+              -o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          else
+            curl -L "https://github.com/apache/doris-thirdparty/releases/download/automation-${branch/branch-/}/doris-thirdparty-prebuilt-linux-x86_64.tar.xz" \
+              -o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          fi
+          tar -xvf doris-thirdparty-prebuilt-linux-x86_64.tar.xz
+          popd
+          export PATH="${DEFAULT_DIR}/ldb-toolchain/bin/:$(pwd)/thirdparty/installed/bin/:${PATH}"
+          
           thrift --version
-          mkdir -p thirdparty/installed/bin/
-          cd thirdparty/installed/bin/ && ln -s /usr/bin/thrift thrift
+          # mkdir -p thirdparty/installed/bin/
+          # cd thirdparty/installed/bin/ && ln -s /usr/bin/thrift thrift
       - name: Build broker
         run: |
           cd fs_brokers/apache_hdfs_broker/ && /bin/bash build.sh
diff --git a/dist/LICENSE-dist.txt b/dist/LICENSE-dist.txt
index e3b1ebe75a..4fe86da53f 100644
--- a/dist/LICENSE-dist.txt
+++ b/dist/LICENSE-dist.txt
@@ -683,22 +683,17 @@ The Apache Software License, Version 2.0
         - org.apache.hadoop:hadoop-aws:2.7.3 (no url defined)
         - org.apache.hadoop:hadoop-aws:2.8.0 (no url defined)
     * Apache Hadoop Annotations:
-        - org.apache.hadoop:hadoop-annotations:2.6.5 (no url defined)
-        - org.apache.hadoop:hadoop-annotations:2.7.3 (no url defined)
-        - org.apache.hadoop:hadoop-annotations:2.8.0 (no url defined)
+        - org.apache.hadoop:hadoop-annotations:2.10.2 (no url defined)
     * Apache Hadoop Auth:
-        - org.apache.hadoop:hadoop-auth:2.7.3 (no url defined)
-        - org.apache.hadoop:hadoop-auth:2.8.0 (no url defined)
+        - org.apache.hadoop:hadoop-auth:2.10.2 (no url defined)
     * Apache Hadoop Client:
-        - org.apache.hadoop:hadoop-client:2.6.5 (no url defined)
+        - org.apache.hadoop:hadoop-client:2.10.2 (no url defined)
     * Apache Hadoop Common:
-        - org.apache.hadoop:hadoop-common:2.7.3 (no url defined)
-        - org.apache.hadoop:hadoop-common:2.8.0 (no url defined)
+        - org.apache.hadoop:hadoop-common:2.10.2 (no url defined)
     * Apache Hadoop HDFS:
-        - org.apache.hadoop:hadoop-hdfs:2.7.3 (no url defined)
-        - org.apache.hadoop:hadoop-hdfs:2.8.0 (no url defined)
+        - org.apache.hadoop:hadoop-hdfs:2.10.2 (no url defined)
     * Apache Hadoop HDFS Client:
-        - org.apache.hadoop:hadoop-hdfs-client:2.8.0 (no url defined)
+        - org.apache.hadoop:hadoop-hdfs-client:2.10.2 (no url defined)
     * Apache HttpClient:
         - org.apache.httpcomponents:httpclient:4.5.13 (http://hc.apache.org/httpcomponents-client)
     * Apache HttpCore:
@@ -985,6 +980,14 @@ The Apache Software License, Version 2.0
         - org.lz4:lz4-java:1.4.0 (https://github.com/lz4/lz4-java)
     * Metrics Core:
         - io.dropwizard.metrics:metrics-core:4.0.2 (http://metrics.dropwizard.io/metrics-core)
+    * avatica:
+        - org.apache.calcite.avatica:avatica:1.22.0 (https://calcite.apache.org/avatica/)
+    * calcite-core:
+        - org.apache.calcite:calcite-core:1.32.0 (https://calcite.apache.org/)
+    * calcite-linq4j:
+        - org.apache.calcite:calcite-linq4j:1.32.0 (https://calcite.apache.org/)
+    * calcite-druid:
+        - org.apache.calcite:calcite-druid:1.32.0 (https://calcite.apache.org/)
     * Netty:
         - io.netty:netty:3.9.9.Final (http://netty.io/)
     * Netty Reactive Streams HTTP support:
@@ -992,8 +995,7 @@ The Apache Software License, Version 2.0
     * Netty Reactive Streams Implementation:
         - com.typesafe.netty:netty-reactive-streams:2.0.4 (https://github.com/playframework/netty-reactive-streams/netty-reactive-streams)
     * Netty/All-in-One:
-        - io.netty:netty-all:4.0.23.Final (http://netty.io/netty-all/)
-        - io.netty:netty-all:4.1.42.Final (https://netty.io/netty-all/)
+        - io.netty:netty-all:4.1.65.Final (https://netty.io/netty-all/)
     * Netty/Buffer:
         - io.netty:netty-buffer:4.1.72.Final (https://netty.io/netty-buffer/)
     * Netty/Codec:
@@ -1190,7 +1192,7 @@ The Apache Software License, Version 2.0
     * io.grpc:grpc-stub:
         - io.grpc:grpc-stub:1.30.0 (https://github.com/grpc/grpc-java)
     * jackson-databind:
-        - com.fasterxml.jackson.core:jackson-databind:2.12.1 (http://github.com/FasterXML/jackson)
+        - com.fasterxml.jackson.core:jackson-databind:2.14.1 (http://github.com/FasterXML/jackson)
     * jackson-module-scala:
         - com.fasterxml.jackson.module:jackson-module-scala_2.12:2.13.1 (http://wiki.fasterxml.com/JacksonModuleScala)
         - com.fasterxml.jackson.module:jackson-module-scala_2.12:2.6.7.1 (http://wiki.fasterxml.com/JacksonModuleScala)
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 27fbafa26c..b26e2c132e 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -85,7 +85,7 @@ under the License.
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-mapreduce-client</artifactId>
-                <version>2.7.4</version>
+                <version>${hadoop.version}</version>
                 <scope>compile</scope>
             </dependency>
         </dependencies>
@@ -100,6 +100,12 @@ under the License.
             <groupId>${project.groupId}</groupId>
             <artifactId>spark-dpp</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
@@ -430,6 +436,12 @@ under the License.
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.12</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-launcher_2.12 -->
         <dependency>
@@ -456,6 +468,10 @@ under the License.
                     <groupId>ch.qos.logback</groupId>
                     <artifactId>logback-classic</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol -->
@@ -472,6 +488,10 @@ under the License.
                     <groupId>ch.qos.logback</groupId>
                     <artifactId>logback-classic</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -599,6 +619,18 @@ under the License.
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-metastore</artifactId>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
@@ -610,20 +642,89 @@ under the License.
                     <groupId>org.apache.velocity</groupId>
                     <artifactId>velocity</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>calcite-core</artifactId>
+                    <groupId>org.apache.calcite</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>calcite-druid</artifactId>
+                    <groupId>org.apache.calcite</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>calcite-linq4j</artifactId>
+                    <groupId>org.apache.calcite</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>avatica</artifactId>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                </exclusion>
             </exclusions>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.velocity</groupId>
-        <artifactId>velocity-engine-core</artifactId>
-        <version>2.3</version>
-    </dependency>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.calcite.avatica/avatica -->
+        <dependency>
+            <groupId>org.apache.calcite.avatica</groupId>
+            <artifactId>avatica</artifactId>
+            <version>${avatica.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.calcite/calcite-core -->
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-core</artifactId>
+            <version>${calcite.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.calcite/calcite-linq4j -->
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-linq4j</artifactId>
+            <version>${calcite.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.calcite/calcite-druid -->
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-druid</artifactId>
+            <version>${calcite.version}</version>
+        </dependency>
+        <dependency>
+           <groupId>org.apache.httpcomponents</groupId>
+           <artifactId>httpclient</artifactId>
+           <version>4.5.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.velocity</groupId>
+            <artifactId>velocity-engine-core</artifactId>
+            <version>2.3</version>
+        </dependency>
         <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>jackson-databind</artifactId>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.14.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
         </dependency>
-
         <!-- https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-api -->
         <dependency>
             <groupId>io.opentelemetry</groupId>
@@ -674,6 +775,16 @@ under the License.
         <dependency>
             <groupId>org.apache.hudi</groupId>
             <artifactId>hudi-common</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-hadoop-mr -->
diff --git a/fe/hive-udf/pom.xml b/fe/hive-udf/pom.xml
index 564b399062..17994353e1 100644
--- a/fe/hive-udf/pom.xml
+++ b/fe/hive-udf/pom.xml
@@ -48,8 +48,46 @@ under the License.
                     <groupId>org.apache.velocity</groupId>
                     <artifactId>velocity</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>avatica</artifactId>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.calcite.avatica/avatica -->
+        <dependency>
+            <groupId>org.apache.calcite.avatica</groupId>
+            <artifactId>avatica</artifactId>
+            <version>${avatica.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.calcite/calcite-core -->
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-core</artifactId>
+            <version>${calcite.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.calcite/calcite-linq4j -->
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-linq4j</artifactId>
+            <version>${calcite.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.calcite/calcite-druid -->
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-druid</artifactId>
+            <version>${calcite.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.velocity</groupId>
             <artifactId>velocity-engine-core</artifactId>
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index d8fc0da715..f65234957f 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -44,6 +44,16 @@ under the License.
             <groupId>${project.groupId}</groupId>
             <artifactId>fe-core</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
@@ -58,6 +68,10 @@ under the License.
                     <groupId>org.apache.velocity</groupId>
                     <artifactId>velocity</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -65,6 +79,11 @@ under the License.
             <artifactId>velocity-engine-core</artifactId>
             <version>2.3</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
+        </dependency>
         <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-engine -->
         <dependency>
             <groupId>org.junit.jupiter</groupId>
diff --git a/fe/pom.xml b/fe/pom.xml
index e78ac13188..24d00e8744 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -220,6 +220,9 @@ under the License.
         <spark.version>2.4.6</spark.version>
         <hive.version>2.3.7</hive.version>
         <hadoop.version>2.8.0</hadoop.version>
+        <netty.version>4.1.65.Final</netty.version>
+        <calcite.version>1.32.0</calcite.version>
+        <avatica.version>1.22.0</avatica.version>
         <!-- ATTN: avro version must be consistent with Iceberg version -->
         <!-- Please modify iceberg.version and avro.version together,
          you can find avro version info in iceberg mvn repository -->
@@ -239,7 +242,7 @@ under the License.
         <aws-java-sdk-s3.version>1.11.95</aws-java-sdk-s3.version>
         <mariadb-java-client.version>3.0.4</mariadb-java-client.version>
         <dlf-metastore-client-hive2.version>0.2.14</dlf-metastore-client-hive2.version>
-
+        <hadoop.version>2.10.2</hadoop.version>
         <revision>1.0-SNAPSHOT</revision>
         <project.scm.id>github</project.scm.id>
     </properties>
@@ -308,7 +311,7 @@ under the License.
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-client</artifactId>
-                <version>2.8.0</version>
+                <version>${hadoop.version}</version>
                 <scope>compile</scope>
             </dependency>
             <dependency>
diff --git a/fe/spark-dpp/pom.xml b/fe/spark-dpp/pom.xml
index 1d7bff6aae..8e792980d7 100644
--- a/fe/spark-dpp/pom.xml
+++ b/fe/spark-dpp/pom.xml
@@ -87,7 +87,24 @@ under the License.
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.12</artifactId>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>netty</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
         <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.12 -->
         <dependency>
             <groupId>org.apache.spark</groupId>
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml b/fs_brokers/apache_hdfs_broker/pom.xml
index 6d1147b8c1..bea0443d56 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -69,7 +69,8 @@ under the License.
         <maven.compiler.target>1.8</maven.compiler.target>
         <log4j2.version>2.18.0</log4j2.version>
         <project.scm.id>github</project.scm.id>
-        <hadoop.version>2.9.1</hadoop.version>
+        <hadoop.version>2.10.2</hadoop.version>
+        <netty.version>4.1.65.Final</netty.version>
     </properties>
     <profiles>
         <!-- for custom internal repository -->
@@ -211,7 +212,29 @@ under the License.
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
             <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>netty-all</artifactId>
+                    <groupId>io.netty</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>jackson-databind</artifactId>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.14.1</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
         <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core -->
         <dependency>
             <groupId>org.apache.htrace</groupId>
@@ -285,6 +308,12 @@ under the License.
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-huaweicloud</artifactId>
             <version>2.8.3</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>jackson-databind</artifactId>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aliyun -->
         <dependency>
@@ -292,6 +321,7 @@ under the License.
             <artifactId>hadoop-aliyun</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
+
         <!-- https://mvnrepository.com/artifact/com.qcloud.cos/hadoop-cos -->
         <dependency>
             <groupId>com.qcloud.cos</groupId>

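Nearly every pom change in this patch follows the same pattern: exclude the transitive netty-all (and, for the broker, jackson-databind) that Hadoop, Spark, and Hive drag in, then pin a single copy through the ${netty.version} property and an explicit dependency. A quick way to confirm the exclusions actually took effect is to ask the runtime classpath. The Java snippet below is a minimal sketch, not part of this commit, that prints the Netty and jackson-databind versions the JVM loaded; after the upgrade it should report only 4.1.65.Final and 2.14.1.

import java.util.Map;

import com.fasterxml.jackson.databind.cfg.PackageVersion;
import io.netty.util.Version;

public class DepVersionCheck {
    public static void main(String[] args) {
        // netty-common records version metadata for every Netty artifact
        // it finds on the classpath; after the exclusions, every entry
        // should report the pinned 4.1.65.Final.
        Map<String, Version> netty = Version.identify();
        for (Version v : netty.values()) {
            System.out.println(v.artifactId() + " -> " + v.artifactVersion());
        }
        // jackson-databind embeds its own version constant; expect 2.14.1.
        System.out.println("jackson-databind -> " + PackageVersion.VERSION);
    }
}

Run it with the fe-core runtime classpath (for example via mvn exec:java); more than one reported Netty version would mean an exclusion was missed somewhere.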

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 06/08: [fix](array-type) forbid implicit cast of array type while load (#15325)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7dd3513d817f99e42006ff64f20a0c34b247006d
Author: camby <10...@qq.com>
AuthorDate: Mon Dec 26 09:24:44 2022 +0800

    [fix](array-type) forbid implicit cast of array type while load (#15325)
    
    * forbid array cast while load
    
    * add regression test
    
    Co-authored-by: cambyzju <zh...@baidu.com>
---
 .../src/main/java/org/apache/doris/load/Load.java  | 11 +++++++++++
 .../load_p0/stream_load/test_stream_load.groovy    | 22 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 2ac0161df3..a779028388 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -1061,6 +1061,17 @@ public class Load {
                     throw new AnalysisException("Don't support aggregation function in load expression");
                 }
             }
+
+            // Array types do not support casts for now
+            Type exprReturnType = expr.getType();
+            if (exprReturnType.isArrayType()) {
+                Type schemaType = tbl.getColumn(entry.getKey()).getType();
+                if (exprReturnType != schemaType) {
+                    throw new AnalysisException("Don't support load from type:" + exprReturnType + " to type:"
+                            + schemaType + " for column:" + entry.getKey());
+                }
+            }
+
             exprsByName.put(entry.getKey(), expr);
         }
 
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index ec129c0964..e93daf8e22 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -650,5 +650,27 @@ suite("test_stream_load", "p0") {
     order_qt_all102 "SELECT * from ${tableName8}" // 8
     sql """truncate table ${tableName8}"""
     sql """sync"""
+
+    // malformed data with a mismatched array type
+    streamLoad {
+        table "${tableName8}"
+
+        set 'column_separator', '|'
+        set 'columns', 'k1,k2,k3,k4,k5,k6,k7,k8,k9,b10,k11,k10=array_remove(cast(k5 as array<bigint>), 1)'
+
+        file 'array_normal.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertTrue(json.Message.contains('Don\'t support load from type'))
+        }
+    }
+    sql "sync"
 }
 

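The Load.java hunk above rejects any column-mapping expression whose return type is an array unless it matches the destination column's type exactly, instead of letting the planner insert an implicit cast. The standalone Java sketch below mirrors that decision with simplified stand-in types (plain strings instead of the real org.apache.doris.catalog.Type, and a plain Exception instead of AnalysisException), purely to illustrate the rule the regression test exercises.

public class ArrayCastGuardSketch {
    // Simplified stand-in for the check added to Load.java: array-typed
    // expressions must match the schema type exactly; no implicit cast.
    static void checkNoImplicitArrayCast(String column, String exprType,
            String schemaType) throws Exception {
        boolean exprIsArray = exprType.startsWith("array<");
        if (exprIsArray && !exprType.equals(schemaType)) {
            throw new Exception("Don't support load from type:" + exprType
                    + " to type:" + schemaType + " for column:" + column);
        }
    }

    public static void main(String[] args) throws Exception {
        // Exact match: accepted.
        checkNoImplicitArrayCast("k10", "array<bigint>", "array<bigint>");
        try {
            // Element type differs: rejected, as in the stream load test.
            checkNoImplicitArrayCast("k10", "array<bigint>", "array<int>");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}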

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org