Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/07 16:12:54 UTC

[doris] branch branch-2.0-beta updated (5c33dd7a2c -> f4f86b07d8)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a change to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git


    from 5c33dd7a2c [fix](log) publish version log is printed too frequently (#20507)
     new 41ec7010d6 [fix](cooldown) fix bug due to tablets info changed (#20465)
     new aff0907778 [improvement](sink) reuse rows buffer in msyql_result_writer (#20482)
     new 053c8c303f [feature](backup-restore) Add local backup/restore not upload/download by broker (#20492)
     new b2d47d98da [fix](regex) String with Chinese characters matching failed (#20493)
     new 317685ef79 [bug](table_function) fix table function node forget to call open function of expr (#20495)
     new e0e7e3e4fd [fix](stats) Make alter column stats no forward  (#20501)
     new 55698a240d [fix](nereids) filter and project node should be pushed down through cte (#20508)
     new 4654fe0a74 [Bug](memleak) Fix emptyoperator may cause node not close (#20525)
     new c6a2bc900d [Fix](inverted index) if range query exceeds CLucene limits, downgrade it from inverted index (#20528)
     new 8b5d439102 [fix](olapscanner) fix coredump caused by concurrent acccess of olap scan node _conjuncts (#20534)
     new 2e4ba8252a [fix](lazy_open) fix lazy open null point (#20540)
     new 23e760a0a9 [feature-wip](multi-catalog)(step2)support read max compute data by JNI (#19819)
     new f4f86b07d8 [Feature](multi-catalog)support paimon catalog (#19681)

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/agent/task_worker_pool.cpp                  |  28 ++-
 be/src/common/status.h                             |   4 +-
 .../rowset/segment_v2/inverted_index_reader.cpp    |  20 +-
 .../olap/rowset/segment_v2/inverted_index_reader.h |   1 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |   4 +-
 be/src/pipeline/exec/empty_source_operator.cpp     |   2 +-
 be/src/pipeline/exec/empty_source_operator.h       |  19 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   2 +-
 be/src/runtime/descriptors.cpp                     |  23 +-
 be/src/runtime/descriptors.h                       |  23 ++
 be/src/runtime/load_channel_mgr.cpp                |  14 +-
 be/src/runtime/snapshot_loader.cpp                 | 269 +++++++++++++++++++++
 be/src/runtime/snapshot_loader.h                   |   5 +
 be/src/service/backend_service.cpp                 |  14 +-
 be/src/util/mysql_row_buffer.cpp                   |  20 ++
 be/src/util/mysql_row_buffer.h                     |   2 +
 be/src/vec/CMakeLists.txt                          |   2 +
 be/src/vec/exec/jni_connector.cpp                  |  47 ++--
 be/src/vec/exec/jni_connector.h                    |   1 -
 be/src/vec/exec/scan/max_compute_jni_reader.cpp    | 100 ++++++++
 be/src/vec/exec/scan/max_compute_jni_reader.h      |  80 ++++++
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |   1 +
 be/src/vec/exec/scan/new_olap_scanner.cpp          |   5 +-
 be/src/vec/exec/scan/new_olap_scanner.h            |   2 +
 be/src/vec/exec/scan/paimon_reader.cpp             |  84 +++++++
 be/src/vec/exec/scan/paimon_reader.h               |  76 ++++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |  22 ++
 be/src/vec/exec/vtable_function_node.h             |   1 +
 be/src/vec/functions/like.cpp                      |   5 +-
 be/src/vec/sink/vmysql_result_writer.cpp           |  61 +++--
 be/src/vec/sink/vmysql_result_writer.h             |   3 +
 fe/fe-core/pom.xml                                 |  20 +-
 .../doris/analysis/AlterColumnStatsStmt.java       |   5 +
 .../org/apache/doris/analysis/RestoreStmt.java     |  32 ++-
 .../apache/doris/analysis/ShowSnapshotStmt.java    |  33 ++-
 .../org/apache/doris/backup/BackupHandler.java     | 132 +++++++---
 .../java/org/apache/doris/backup/BackupJob.java    |  25 +-
 .../org/apache/doris/backup/BackupJobInfo.java     |  93 ++++++-
 .../java/org/apache/doris/backup/BackupMeta.java   |  14 ++
 .../java/org/apache/doris/backup/Repository.java   |   2 +
 .../java/org/apache/doris/backup/RestoreJob.java   | 206 +++++++++++++++-
 .../java/org/apache/doris/backup/Snapshot.java     |  69 ++++++
 .../java/org/apache/doris/catalog/TableIf.java     |   4 +-
 .../catalog/external/MaxComputeExternalTable.java  |  25 +-
 .../catalog/external/PaimonExternalDatabase.java   |  72 ++++++
 .../catalog/external/PaimonExternalTable.java      | 132 ++++++++++
 .../apache/doris/datasource/CatalogFactory.java    |   4 +
 .../apache/doris/datasource/ExternalCatalog.java   |  18 +-
 .../apache/doris/datasource/InitCatalogLog.java    |   1 +
 .../apache/doris/datasource/InitDatabaseLog.java   |   1 +
 .../datasource/MaxComputeExternalCatalog.java      |  61 ++++-
 .../datasource/paimon/PaimonExternalCatalog.java   | 103 ++++++++
 .../paimon/PaimonHMSExternalCatalog.java           | 105 ++++++++
 .../property/constants/MCProperties.java           |   1 +
 .../property/constants/PaimonProperties.java       |  11 +-
 .../glue/translator/PhysicalPlanTranslator.java    |   4 +
 .../doris/nereids/jobs/batch/NereidsRewriter.java  |   6 -
 .../nereids/processor/post/PlanPostProcessors.java |   2 +-
 .../org/apache/doris/nereids/rules/RuleSet.java    |   6 +-
 .../org/apache/doris/nereids/rules/RuleType.java   |   4 +-
 ...TEAnchor.java => PushdownFilterThroughCTE.java} |  16 +-
 ...EAnchor.java => PushdownProjectThroughCTE.java} |  16 +-
 .../org/apache/doris/persist/gson/GsonUtils.java   |   8 +
 .../apache/doris/planner/SingleNodePlanner.java    |   4 +
 .../doris/planner/external/FileQueryScanNode.java  |  15 +-
 .../doris/planner/external/MaxComputeScanNode.java |  48 +++-
 .../doris/planner/external/TableFormatType.java    |   3 +-
 .../planner/external/paimon/PaimonScanNode.java    | 177 ++++++++++++++
 .../planner/external/paimon/PaimonSource.java      |  64 +++++
 .../doris/planner/external/paimon/PaimonSplit.java |  65 +++++
 .../apache/doris/service/FrontendServiceImpl.java  | 194 ++++++++++++++-
 .../org/apache/doris/statistics/DeriveFactory.java |   1 +
 .../apache/doris/statistics/StatisticalType.java   |   1 +
 .../java/org/apache/doris/task/DownloadTask.java   |  36 ++-
 fe/java-udf/pom.xml                                |  46 +++-
 .../org/apache/doris/hudi/HudiColumnValue.java     |   5 +
 .../main/java/org/apache/doris/jni/JniScanner.java |   5 +-
 .../org/apache/doris/jni/MaxComputeJniScanner.java | 251 +++++++++++++++++++
 .../java/org/apache/doris/jni/MockJniScanner.java  |   5 +
 .../org/apache/doris/jni/PaimonJniScanner.java     | 186 ++++++++++++++
 .../java/org/apache/doris/jni/vec/ColumnValue.java |  34 +--
 .../doris/jni/vec/MaxComputeColumnValue.java       | 185 ++++++++++++++
 .../vec/PaimonColumnValue.java}                    |  61 ++---
 .../org/apache/doris/jni/vec/ScanPredicate.java    |   5 +
 .../org/apache/doris/jni/vec/VectorColumn.java     |   2 +-
 fe/pom.xml                                         |  21 +-
 gensrc/thrift/AgentService.thrift                  |  11 +
 gensrc/thrift/Descriptors.thrift                   |   6 +-
 gensrc/thrift/FrontendService.thrift               |  48 ++++
 gensrc/thrift/PlanNodes.thrift                     |  25 +-
 gensrc/thrift/Status.thrift                        |   3 +
 gensrc/thrift/Types.thrift                         |   1 +
 regression-test/data/nereids_syntax_p0/cte.out     |  10 +
 .../test_string_function_regexp.out                |   6 +
 .../create_table_use_partition_policy.groovy       |   2 +-
 .../create_table_use_policy.groovy                 |   2 +-
 .../modify_replica_use_partition.groovy            |   2 +-
 .../table_modify_resouce_and_policy.groovy         |   2 +-
 .../suites/nereids_syntax_p0/cte.groovy            |  34 +++
 .../test_string_function_regexp.groovy             |   3 +
 100 files changed, 3479 insertions(+), 261 deletions(-)
 create mode 100644 be/src/vec/exec/scan/max_compute_jni_reader.cpp
 create mode 100644 be/src/vec/exec/scan/max_compute_jni_reader.h
 create mode 100644 be/src/vec/exec/scan/paimon_reader.cpp
 create mode 100644 be/src/vec/exec/scan/paimon_reader.h
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
 copy be/src/pipeline/exec/empty_source_operator.cpp => fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java (76%)
 rename fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/{PushdownFilterThroughCTEAnchor.java => PushdownFilterThroughCTE.java} (68%)
 rename fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/{PushdownProjectThroughCTEAnchor.java => PushdownProjectThroughCTE.java} (68%)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
 create mode 100644 fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java
 create mode 100644 fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java
 create mode 100644 fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java
 copy fe/java-udf/src/main/java/org/apache/doris/{hudi/HudiColumnValue.java => jni/vec/PaimonColumnValue.java} (61%)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 09/13: [Fix](inverted index) if range query exceeds CLucene limits, downgrade it from inverted index (#20528)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c6a2bc900d66350664ca3bf377d65ecfd3e5160e
Author: airborne12 <ai...@gmail.com>
AuthorDate: Wed Jun 7 20:07:48 2023 +0800

    [Fix](inverted index) if range query exceeds CLucene limits, downgrade it from inverted index (#20528)
    
    CLucene may throw CL_ERR_TooManyClauses when a range query hits too many terms.
    In this situation, we have to downgrade the query from the inverted index.
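
A minimal, standalone sketch of this downgrade pattern follows. The exception
type, error-code enum, and run_range_query function are simplified stand-ins
for CLucene's CL_ERR_TooManyClauses error and Doris's Status/ErrorCode
machinery; the actual change is in the diff below.

    #include <iostream>
    #include <stdexcept>

    // Stand-ins for the Doris error codes touched by this commit.
    enum class ErrorCode { OK, INVERTED_INDEX_BYPASS, INVERTED_INDEX_CLUCENE_ERROR };

    // Stand-in for a CLuceneError whose number() is CL_ERR_TooManyClauses.
    struct TooManyClausesError : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    // Pretend this runs the range query against the index; it throws when the
    // expanded term set exceeds the clause limit.
    ErrorCode run_range_query(bool too_many_terms) {
        try {
            if (too_many_terms) {
                throw TooManyClausesError("too many clauses");
            }
            return ErrorCode::OK;
        } catch (const TooManyClausesError&) {
            // Downgrade: tell the caller to bypass the inverted index and
            // evaluate the predicate on the raw column data instead.
            return ErrorCode::INVERTED_INDEX_BYPASS;
        } catch (const std::exception&) {
            return ErrorCode::INVERTED_INDEX_CLUCENE_ERROR;
        }
    }

    int main() {
        // Prints 1: an oversized range query is reported as a bypass, not an error.
        std::cout << (run_range_query(true) == ErrorCode::INVERTED_INDEX_BYPASS) << std::endl;
        return 0;
    }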
---
 be/src/common/status.h                               |  4 ++--
 .../olap/rowset/segment_v2/inverted_index_reader.cpp | 20 +++++++++++++++++---
 .../olap/rowset/segment_v2/inverted_index_reader.h   |  1 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp   |  4 ++--
 4 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index ba634df13c..146b4f34f7 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -262,7 +262,7 @@ E(INVERTED_INDEX_INVALID_PARAMETERS, -6000);
 E(INVERTED_INDEX_NOT_SUPPORTED, -6001);
 E(INVERTED_INDEX_CLUCENE_ERROR, -6002);
 E(INVERTED_INDEX_FILE_NOT_FOUND, -6003);
-E(INVERTED_INDEX_FILE_HIT_LIMIT, -6004);
+E(INVERTED_INDEX_BYPASS, -6004);
 E(INVERTED_INDEX_NO_TERMS, -6005);
 E(INVERTED_INDEX_RENAME_FILE_FAILED, -6006);
 E(INVERTED_INDEX_EVALUATE_SKIPPED, -6007);
@@ -293,7 +293,7 @@ constexpr bool capture_stacktrace() {
         && code != ErrorCode::INVERTED_INDEX_NOT_SUPPORTED
         && code != ErrorCode::INVERTED_INDEX_CLUCENE_ERROR
         && code != ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND
-        && code != ErrorCode::INVERTED_INDEX_FILE_HIT_LIMIT
+        && code != ErrorCode::INVERTED_INDEX_BYPASS
         && code != ErrorCode::INVERTED_INDEX_NO_TERMS
         && code != ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED
         && code != ErrorCode::META_KEY_NOT_FOUND
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index a97eae836f..ad0deeafb2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -73,6 +73,13 @@
 namespace doris {
 namespace segment_v2 {
 
+bool InvertedIndexReader::_is_range_query(InvertedIndexQueryType query_type) {
+    return (query_type == InvertedIndexQueryType::GREATER_THAN_QUERY ||
+            query_type == InvertedIndexQueryType::GREATER_EQUAL_QUERY ||
+            query_type == InvertedIndexQueryType::LESS_THAN_QUERY ||
+            query_type == InvertedIndexQueryType::LESS_EQUAL_QUERY);
+}
+
 bool InvertedIndexReader::_is_match_query(InvertedIndexQueryType query_type) {
     return (query_type == InvertedIndexQueryType::MATCH_ANY_QUERY ||
             query_type == InvertedIndexQueryType::MATCH_ALL_QUERY ||
@@ -491,8 +498,15 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
                                     result.add(docid);
                                 });
     } catch (const CLuceneError& e) {
-        LOG(WARNING) << "CLuceneError occured, error msg: " << e.what();
-        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>();
+        if (_is_range_query(query_type) && e.number() == CL_ERR_TooManyClauses) {
+            LOG(WARNING) << "range query term exceeds limits, try to downgrade from inverted index,"
+                         << "column name:" << column_name << " search_str:" << search_str;
+            return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>();
+        } else {
+            LOG(WARNING) << "CLuceneError occured, error msg: " << e.what()
+                         << "column name:" << column_name << " search_str:" << search_str;
+            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>();
+        }
     }
 
     // add to cache
@@ -875,7 +889,7 @@ Status InvertedIndexIterator::read_from_inverted_index(const std::string& column
         if (hit_count > segment_num_rows * query_bkd_limit_percent / 100) {
             LOG(INFO) << "hit count: " << hit_count << ", bkd inverted reached limit "
                       << query_bkd_limit_percent << "%, segment num rows: " << segment_num_rows;
-            return Status::Error<ErrorCode::INVERTED_INDEX_FILE_HIT_LIMIT>();
+            return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>();
         }
     }
 
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_reader.h
index d68939fd5c..f9272d16fa 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.h
@@ -109,6 +109,7 @@ public:
                                                         InvertedIndexCtx* inverted_index_ctx);
 
 protected:
+    bool _is_range_query(InvertedIndexQueryType query_type);
     bool _is_match_query(InvertedIndexQueryType query_type);
     friend class InvertedIndexIterator;
     io::FileSystemSPtr _fs;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 284ae71f1a..92717945b1 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -752,13 +752,13 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() {
 
 bool SegmentIterator::_downgrade_without_index(Status res, bool need_remaining) {
     if (res.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND ||
-        res.code() == ErrorCode::INVERTED_INDEX_FILE_HIT_LIMIT ||
+        res.code() == ErrorCode::INVERTED_INDEX_BYPASS ||
         res.code() == ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED ||
         (res.code() == ErrorCode::INVERTED_INDEX_NO_TERMS && need_remaining)) {
         // 1. INVERTED_INDEX_FILE_NOT_FOUND means index file has not been built,
         //    usually occurs when creating a new index, queries can be downgraded
         //    without index.
-        // 2. INVERTED_INDEX_FILE_HIT_LIMIT means the hit of condition by index
+        // 2. INVERTED_INDEX_BYPASS means the hit of condition by index
         //    has reached the optimal limit, downgrade without index query can
         //    improve query performance.
         // 3. INVERTED_INDEX_EVALUATE_SKIPPED means the inverted index is not


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 11/13: [fix](lazy_open) fix lazy open null point (#20540)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2e4ba8252a2eabd74160a5ec58269afea7a1094e
Author: HHoflittlefish777 <77...@users.noreply.github.com>
AuthorDate: Wed Jun 7 17:56:31 2023 +0800

    [fix](lazy_open) fix lazy open null point (#20540)
---
 be/src/runtime/load_channel_mgr.cpp | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 2453e0dcf6..3fe4c7986d 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -143,11 +143,15 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
 Status LoadChannelMgr::open_partition(const OpenPartitionRequest& params) {
     UniqueId load_id(params.id());
     std::shared_ptr<LoadChannel> channel;
-    auto it = _load_channels.find(load_id);
-    if (it != _load_channels.end()) {
-        channel = it->second;
-    } else {
-        return Status::InternalError("unknown load id, load id=" + load_id.to_string());
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _load_channels.find(load_id);
+        if (it != _load_channels.end()) {
+            channel = it->second;
+        } else {
+            DCHECK(false);
+            return Status::InternalError("unknown load id, load id=" + load_id.to_string());
+        }
     }
     RETURN_IF_ERROR(channel->open_partition(params));
     return Status::OK();
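
The fix above takes the manager lock around the _load_channels lookup and
copies the channel out before using it, so a concurrent modification of the
map cannot race with the lookup. Below is a standalone sketch of the same
lock-then-copy pattern; the map, mutex, and string payload are illustrative
stand-ins, not the real LoadChannelMgr types.

    #include <iostream>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    std::mutex g_lock;
    std::map<int, std::shared_ptr<std::string>> g_channels;

    bool open_partition(int load_id) {
        std::shared_ptr<std::string> channel;
        {
            std::lock_guard<std::mutex> l(g_lock);  // guard the shared map
            auto it = g_channels.find(load_id);
            if (it == g_channels.end()) {
                return false;                       // unknown load id, nothing to dereference
            }
            channel = it->second;                   // copy the shared_ptr under the lock
        }
        // Safe to use the channel outside the lock: the copy keeps it alive.
        std::cout << "open partition on " << *channel << std::endl;
        return true;
    }

    int main() {
        g_channels[1] = std::make_shared<std::string>("channel-1");
        open_partition(1);   // found
        open_partition(2);   // unknown load id, returns false
        return 0;
    }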


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 12/13: [feature-wip](multi-catalog)(step2)support read max compute data by JNI (#19819)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 23e760a0a9036ad3335dfc08278f409eeb71873e
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Mon Jun 5 22:10:08 2023 +0800

    [feature-wip](multi-catalog)(step2)support read max compute data by JNI (#19819)
    
    Issue Number: #19679
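
The MaxComputeJniReader added in this commit passes connection info and the
required column list to a Java-side scanner as a flat string map (see
be/src/vec/exec/scan/max_compute_jni_reader.cpp in the diff below). The
standalone sketch that follows shows only how the required_fields and
columns_types strings are assembled from (name, type) pairs; the column names
and types are made-up examples, and the actual JNI hand-off is omitted.

    #include <iostream>
    #include <map>
    #include <sstream>
    #include <string>
    #include <utility>
    #include <vector>

    int main() {
        // (column name, hive-style type) pairs; a real reader takes these from
        // the slot descriptors of the scanned table.
        std::vector<std::pair<std::string, std::string>> cols = {
                {"id", "bigint"}, {"name", "string"}};

        std::ostringstream required_fields;
        std::ostringstream columns_types;
        for (size_t i = 0; i < cols.size(); ++i) {
            if (i > 0) {
                required_fields << ",";  // fields are comma separated
                columns_types << "#";    // types are '#' separated
            }
            required_fields << cols[i].first;
            columns_types << cols[i].second;
        }

        // Parameters handed to the Java scanner through the JNI connector in
        // the real code (credentials, project, table, split range are omitted).
        std::map<std::string, std::string> params = {
                {"required_fields", required_fields.str()},
                {"columns_types", columns_types.str()}};
        for (const auto& kv : params) {
            std::cout << kv.first << " = " << kv.second << std::endl;
        }
        return 0;
    }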
---
 be/src/runtime/descriptors.cpp                     |  23 +-
 be/src/runtime/descriptors.h                       |  23 ++
 be/src/vec/CMakeLists.txt                          |   1 +
 be/src/vec/exec/jni_connector.cpp                  |  47 ++--
 be/src/vec/exec/jni_connector.h                    |   1 -
 be/src/vec/exec/scan/max_compute_jni_reader.cpp    | 100 ++++++++
 be/src/vec/exec/scan/max_compute_jni_reader.h      |  80 +++++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |  14 ++
 fe/fe-core/pom.xml                                 |   1 -
 .../catalog/external/MaxComputeExternalTable.java  |  25 +-
 .../apache/doris/datasource/ExternalCatalog.java   |  15 +-
 .../datasource/MaxComputeExternalCatalog.java      |  61 ++++-
 .../property/constants/MCProperties.java           |   1 +
 .../doris/planner/external/FileQueryScanNode.java  |  11 +-
 .../doris/planner/external/MaxComputeScanNode.java |  48 +++-
 fe/java-udf/pom.xml                                |  39 +++-
 .../org/apache/doris/hudi/HudiColumnValue.java     |   5 +
 .../main/java/org/apache/doris/jni/JniScanner.java |   5 +-
 .../org/apache/doris/jni/MaxComputeJniScanner.java | 251 +++++++++++++++++++++
 .../java/org/apache/doris/jni/MockJniScanner.java  |   5 +
 .../java/org/apache/doris/jni/vec/ColumnValue.java |  34 +--
 .../doris/jni/vec/MaxComputeColumnValue.java       | 185 +++++++++++++++
 .../org/apache/doris/jni/vec/ScanPredicate.java    |   5 +
 .../org/apache/doris/jni/vec/VectorColumn.java     |   2 +-
 fe/pom.xml                                         |  19 +-
 gensrc/thrift/Descriptors.thrift                   |   6 +-
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 gensrc/thrift/Types.thrift                         |   1 +
 28 files changed, 931 insertions(+), 78 deletions(-)

diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index f4cb3f6dff..a75d3d0e71 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -117,7 +117,8 @@ std::string SlotDescriptor::debug_string() const {
 }
 
 TableDescriptor::TableDescriptor(const TTableDescriptor& tdesc)
-        : _name(tdesc.tableName),
+        : _table_type(tdesc.tableType),
+          _name(tdesc.tableName),
           _database(tdesc.dbName),
           _table_id(tdesc.id),
           _num_cols(tdesc.numCols),
@@ -179,6 +180,23 @@ std::string IcebergTableDescriptor::debug_string() const {
     return out.str();
 }
 
+MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tdesc)
+        : TableDescriptor(tdesc),
+          _region(tdesc.mcTable.region),
+          _project(tdesc.mcTable.project),
+          _table(tdesc.mcTable.table),
+          _access_key(tdesc.mcTable.access_key),
+          _secret_key(tdesc.mcTable.secret_key),
+          _public_access(tdesc.mcTable.public_access) {}
+
+MaxComputeTableDescriptor::~MaxComputeTableDescriptor() {}
+
+std::string MaxComputeTableDescriptor::debug_string() const {
+    std::stringstream out;
+    out << "MaxComputeTable(" << TableDescriptor::debug_string() << ")";
+    return out.str();
+}
+
 EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {}
 
 EsTableDescriptor::~EsTableDescriptor() {}
@@ -573,6 +591,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
         case TTableType::JDBC_TABLE:
             desc = pool->add(new JdbcTableDescriptor(tdesc));
             break;
+        case TTableType::MAX_COMPUTE_TABLE:
+            desc = pool->add(new MaxComputeTableDescriptor(tdesc));
+            break;
         default:
             DCHECK(false) << "invalid table type: " << tdesc.tableType;
         }
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 48ea79d879..cdb20f0fb2 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -162,11 +162,13 @@ public:
         return slot_desc->col_pos() < _num_clustering_cols;
     }
 
+    ::doris::TTableType::type table_type() const { return _table_type; }
     const std::string& name() const { return _name; }
     const std::string& database() const { return _database; }
     int32_t table_id() const { return _table_id; }
 
 private:
+    ::doris::TTableType::type _table_type;
     std::string _name;
     std::string _database;
     int32_t _table_id;
@@ -218,6 +220,27 @@ public:
 private:
 };
 
+class MaxComputeTableDescriptor : public TableDescriptor {
+public:
+    MaxComputeTableDescriptor(const TTableDescriptor& tdesc);
+    ~MaxComputeTableDescriptor() override;
+    std::string debug_string() const override;
+    const std::string region() const { return _region; }
+    const std::string project() const { return _project; }
+    const std::string table() const { return _table; }
+    const std::string access_key() const { return _access_key; }
+    const std::string secret_key() const { return _secret_key; }
+    const std::string public_access() const { return _public_access; }
+
+private:
+    std::string _region;
+    std::string _project;
+    std::string _table;
+    std::string _access_key;
+    std::string _secret_key;
+    std::string _public_access;
+};
+
 class EsTableDescriptor : public TableDescriptor {
 public:
     EsTableDescriptor(const TTableDescriptor& tdesc);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 725c4c88bc..2bf3f245a1 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -352,6 +352,7 @@ set(VEC_FILES
   exec/format/parquet/bool_rle_decoder.cpp
   exec/jni_connector.cpp
   exec/scan/jni_reader.cpp
+  exec/scan/max_compute_jni_reader.cpp
   )
 
 if (WITH_MYSQL)
diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp
index fcddbad134..fd5cf52e2b 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -63,14 +63,17 @@ JniConnector::~JniConnector() {
 }
 
 Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
-    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&_env));
-    if (_env == nullptr) {
+    // cannot put the env into fields, because frames in an env object is limited
+    // to avoid limited frames in a thread, we should get local env in a method instead of in whole object.
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    if (env == nullptr) {
         return Status::InternalError("Failed to get/create JVM");
     }
-    RETURN_IF_ERROR(_init_jni_scanner(_env, state->batch_size()));
+    RETURN_IF_ERROR(_init_jni_scanner(env, state->batch_size()));
     // Call org.apache.doris.jni.JniScanner#open
-    _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
-    RETURN_ERROR_IF_EXC(_env);
+    env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
+    RETURN_ERROR_IF_EXC(env);
     return Status::OK();
 }
 
@@ -87,12 +90,12 @@ Status JniConnector::init(
 }
 
 Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(_env));
     // Call org.apache.doris.jni.JniScanner#getNextBatchMeta
     // return the address of meta information
-    long meta_address = _env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch);
-    RETURN_ERROR_IF_EXC(_env);
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    long meta_address = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch);
+    RETURN_ERROR_IF_EXC(env);
     if (meta_address == 0) {
         // Address == 0 when there's no data in scanner
         *read_rows = 0;
@@ -109,25 +112,27 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
     RETURN_IF_ERROR(_fill_block(block, num_rows));
     *read_rows = num_rows;
     *eof = false;
-    _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
-    RETURN_ERROR_IF_EXC(_env);
+    env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
+    RETURN_ERROR_IF_EXC(env);
     _has_read += num_rows;
     return Status::OK();
 }
 
 Status JniConnector::close() {
     if (!_closed) {
+        JNIEnv* env = nullptr;
+        RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
         // _fill_block may be failed and returned, we should release table in close.
         // org.apache.doris.jni.JniScanner#releaseTable is idempotent
-        _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
-        _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
-        _env->DeleteLocalRef(_jni_scanner_obj);
-        _env->DeleteLocalRef(_jni_scanner_cls);
+        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
+        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
+        env->DeleteGlobalRef(_jni_scanner_obj);
+        env->DeleteGlobalRef(_jni_scanner_cls);
         _closed = true;
-        jthrowable exc = (_env)->ExceptionOccurred();
+        jthrowable exc = (env)->ExceptionOccurred();
         if (exc != nullptr) {
             LOG(FATAL) << "Failed to release jni resource: "
-                       << JniUtil::GetJniExceptionMsg(_env).to_string();
+                       << JniUtil::GetJniExceptionMsg(env).to_string();
         }
     }
     return Status::OK();
@@ -170,7 +175,7 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {
     RETURN_ERROR_IF_EXC(env);
     _jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseTable", "()V");
     RETURN_ERROR_IF_EXC(env);
-
+    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _jni_scanner_obj, &_jni_scanner_obj));
     return Status::OK();
 }
 
@@ -180,9 +185,11 @@ Status JniConnector::_fill_block(Block* block, size_t num_rows) {
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
         RETURN_IF_ERROR(_fill_column(column_ptr, column_type, num_rows));
+        JNIEnv* env = nullptr;
+        RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
         // Column is not released when _fill_column failed. It will be released when releasing table.
-        _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_column, i);
-        RETURN_ERROR_IF_EXC(_env);
+        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_column, i);
+        RETURN_ERROR_IF_EXC(env);
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 14915ac533..90d5fbcd0a 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -227,7 +227,6 @@ private:
 
     long* _meta_ptr;
     int _meta_index;
-    JNIEnv* _env = nullptr;
 
     int _predicates_length = 0;
     std::unique_ptr<char[]> _predicates = nullptr;
diff --git a/be/src/vec/exec/scan/max_compute_jni_reader.cpp b/be/src/vec/exec/scan/max_compute_jni_reader.cpp
new file mode 100644
index 0000000000..f5182931d1
--- /dev/null
+++ b/be/src/vec/exec/scan/max_compute_jni_reader.cpp
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "max_compute_jni_reader.h"
+
+#include <glog/logging.h>
+
+#include <map>
+#include <ostream>
+
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+
+MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc,
+                                         const std::vector<SlotDescriptor*>& file_slot_descs,
+                                         const TFileRangeDesc& range, RuntimeState* state,
+                                         RuntimeProfile* profile)
+        : _file_slot_descs(file_slot_descs), _range(range), _state(state), _profile(profile) {
+    _table_desc = mc_desc;
+    std::ostringstream required_fields;
+    std::ostringstream columns_types;
+    std::vector<std::string> column_names;
+    int index = 0;
+    for (auto& desc : _file_slot_descs) {
+        std::string field = desc->col_name();
+        std::string type = JniConnector::get_hive_type(desc->type());
+        column_names.emplace_back(field);
+        if (index == 0) {
+            required_fields << field;
+            columns_types << type;
+        } else {
+            required_fields << "," << field;
+            columns_types << "#" << type;
+        }
+        index++;
+    }
+    std::map<String, String> params = {{"region", _table_desc->region()},
+                                       {"access_key", _table_desc->access_key()},
+                                       {"secret_key", _table_desc->secret_key()},
+                                       {"project", _table_desc->project()},
+                                       {"table", _table_desc->table()},
+                                       {"public_access", _table_desc->public_access()},
+                                       {"start_offset", std::to_string(_range.start_offset)},
+                                       {"split_size", std::to_string(_range.size)},
+                                       {"required_fields", required_fields.str()},
+                                       {"columns_types", columns_types.str()}};
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/jni/MaxComputeJniScanner",
+                                                    params, column_names);
+}
+
+Status MaxComputeJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
+    if (*eof) {
+        RETURN_IF_ERROR(_jni_connector->close());
+    }
+    return Status::OK();
+}
+
+Status MaxComputeJniReader::get_columns(
+        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+        std::unordered_set<std::string>* missing_cols) {
+    for (auto& desc : _file_slot_descs) {
+        name_to_type->emplace(desc->col_name(), desc->type());
+    }
+    return Status::OK();
+}
+
+Status MaxComputeJniReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
+    return _jni_connector->open(_state, _profile);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/max_compute_jni_reader.h b/be/src/vec/exec/scan/max_compute_jni_reader.h
new file mode 100644
index 0000000000..0b3c809c50
--- /dev/null
+++ b/be/src/vec/exec/scan/max_compute_jni_reader.h
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "runtime/descriptors.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+/**
+ * The demo usage of JniReader, showing how to read data from java scanner.
+ * The java side is also a mock reader that provide values for each type.
+ * This class will only be retained during the functional testing phase to verify that
+ * the communication and data exchange with the jvm are correct.
+ */
+class MaxComputeJniReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(MaxComputeJniReader);
+
+public:
+    MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc,
+                        const std::vector<SlotDescriptor*>& file_slot_descs,
+                        const TFileRangeDesc& range, RuntimeState* state, RuntimeProfile* profile);
+
+    ~MaxComputeJniReader() override = default;
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+    const MaxComputeTableDescriptor* _table_desc;
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    const TFileRangeDesc& _range;
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    std::unique_ptr<JniConnector> _jni_connector;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a1a15e61e2..b7f8119553 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -60,6 +60,7 @@
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/table/iceberg_reader.h"
+#include "vec/exec/scan/max_compute_jni_reader.h"
 #include "vec/exec/scan/new_file_scan_node.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exprs/vexpr.h"
@@ -588,6 +589,19 @@ Status VFileScanner::_get_next_reader() {
         Status init_status;
         // TODO: use data lake type
         switch (_params.format_type) {
+        case TFileFormatType::FORMAT_JNI: {
+            if (_real_tuple_desc->table_desc()->table_type() ==
+                ::doris::TTableType::type::MAX_COMPUTE_TABLE) {
+                const MaxComputeTableDescriptor* mc_desc =
+                        static_cast<const MaxComputeTableDescriptor*>(
+                                _real_tuple_desc->table_desc());
+                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
+                        mc_desc, _file_slot_descs, range, _state, _profile);
+                init_status = mc_reader->init_reader(_colname_to_value_range);
+                _cur_reader = std::move(mc_reader);
+            }
+            break;
+        }
         case TFileFormatType::FORMAT_PARQUET: {
             std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                     _profile, _params, range, _state->query_options().batch_size,
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 63f6b82143..4bce0193b4 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -410,7 +410,6 @@ under the License.
         <dependency>
             <groupId>com.aliyun.odps</groupId>
             <artifactId>odps-sdk-core</artifactId>
-            <version>0.43.3-public</version>
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
         <dependency>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
index 5de781a686..012693bccd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
@@ -32,10 +32,12 @@ import org.apache.doris.thrift.TTableType;
 import com.aliyun.odps.OdpsType;
 import com.aliyun.odps.Table;
 import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.CharTypeInfo;
 import com.aliyun.odps.type.DecimalTypeInfo;
 import com.aliyun.odps.type.MapTypeInfo;
 import com.aliyun.odps.type.StructTypeInfo;
 import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.VarcharTypeInfo;
 import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
@@ -95,13 +97,15 @@ public class MaxComputeExternalTable extends ExternalTable {
                 return Type.BIGINT;
             }
             case CHAR: {
-                return Type.CHAR;
-            }
-            case VARCHAR: {
-                return Type.VARCHAR;
+                CharTypeInfo charType = (CharTypeInfo) typeInfo;
+                return ScalarType.createChar(charType.getLength());
             }
             case STRING: {
-                return Type.STRING;
+                return ScalarType.createStringType();
+            }
+            case VARCHAR: {
+                VarcharTypeInfo varcharType = (VarcharTypeInfo) typeInfo;
+                return ScalarType.createVarchar(varcharType.getLength());
             }
             case JSON: {
                 return Type.UNSUPPORTED;
@@ -158,7 +162,11 @@ public class MaxComputeExternalTable extends ExternalTable {
     public TTableDescriptor toThrift() {
         List<Column> schema = getFullSchema();
         TMCTable tMcTable = new TMCTable();
-        tMcTable.setTunnelUrl(((MaxComputeExternalCatalog) catalog).getTunnelUrl());
+        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) catalog;
+        tMcTable.setRegion(mcCatalog.getRegion());
+        tMcTable.setAccessKey(mcCatalog.getAccessKey());
+        tMcTable.setSecretKey(mcCatalog.getSecretKey());
+        tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
         // use mc project as dbName
         tMcTable.setProject(dbName);
         tMcTable.setTable(name);
@@ -168,9 +176,14 @@ public class MaxComputeExternalTable extends ExternalTable {
         return tTableDescriptor;
     }
 
+    public Table getOdpsTable() {
+        return odpsTable;
+    }
+
     @Override
     public String getMysqlType() {
         return "BASE TABLE";
     }
+
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index ae070bf507..a23842c9bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -107,15 +107,6 @@ public abstract class ExternalCatalog
         // set some default properties when creating catalog
     }
 
-    /**
-     * @return names of database in this catalog.
-     */
-    // public abstract List<String> listDatabaseNames(SessionContext ctx);
-    public List<String> listDatabaseNames(SessionContext ctx) {
-        makeSureInitialized();
-        return new ArrayList<>(dbNameToId.keySet());
-    }
-
     /**
      * @param dbName
      * @return names of tables in specified database
@@ -315,9 +306,13 @@ public abstract class ExternalCatalog
         this.comment = comment;
     }
 
+    /**
+     * @return names of database in this catalog.
+     */
     @Override
     public List<String> getDbNames() {
-        return listDatabaseNames(null);
+        makeSureInitialized();
+        return new ArrayList<>(dbNameToId.keySet());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
index d3f77a985f..c62abf9ada 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
@@ -24,7 +24,10 @@ import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsException;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
 import com.google.common.base.Strings;
+import com.google.gson.annotations.SerializedName;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -32,9 +35,16 @@ import java.util.Map;
 
 public class MaxComputeExternalCatalog extends ExternalCatalog {
     private Odps odps;
-    private String tunnelUrl;
+    @SerializedName(value = "region")
+    private String region;
+    @SerializedName(value = "accessKey")
+    private String accessKey;
+    @SerializedName(value = "secretKey")
+    private String secretKey;
+    @SerializedName(value = "publicAccess")
+    private boolean enablePublicAccess;
     private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
-    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun.com";
+    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
 
     public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
             String comment) {
@@ -57,12 +67,30 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
             // may use oss-cn-beijing, ensure compatible
             region = region.replace("oss-", "");
         }
-        this.tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+        this.region = region;
         CloudCredential credential = MCProperties.getCredential(props);
-        Account account = new AliyunAccount(credential.getAccessKey(), credential.getSecretKey());
+        if (!credential.isWhole()) {
+            throw new IllegalArgumentException("Max-Compute credential properties '"
+                    + MCProperties.ACCESS_KEY + "' and  '" + MCProperties.SECRET_KEY + "' are required.");
+        }
+        accessKey = credential.getAccessKey();
+        secretKey = credential.getSecretKey();
+        Account account = new AliyunAccount(accessKey, secretKey);
         this.odps = new Odps(account);
         odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
         odps.setDefaultProject(defaultProject);
+        enablePublicAccess = Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, "false"));
+    }
+
+    public long getTotalRows(String project, String table) throws TunnelException {
+        makeSureInitialized();
+        TableTunnel tunnel = new TableTunnel(odps);
+        String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+        if (enablePublicAccess) {
+            tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
+        }
+        tunnel.setEndpoint(tunnelUrl);
+        return tunnel.createDownloadSession(project, table).getRecordCount();
     }
 
     public Odps getClient() {
@@ -73,6 +101,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
     protected List<String> listDatabaseNames() {
         List<String> result = new ArrayList<>();
         try {
+            // TODO: How to get all privileged project from max compute as databases?
+            // Now only have permission to show default project.
             result.add(odps.projects().get(odps.getDefaultProject()).getName());
         } catch (OdpsException e) {
             throw new RuntimeException(e);
@@ -99,11 +129,26 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
     }
 
     /**
-     * data tunnel url
-     * @return tunnelUrl, required by jni scanner.
+     * use region to create data tunnel url
+     * @return region, required by jni scanner.
      */
-    public String getTunnelUrl() {
+    public String getRegion() {
+        makeSureInitialized();
+        return region;
+    }
+
+    public String getAccessKey() {
+        makeSureInitialized();
+        return accessKey;
+    }
+
+    public String getSecretKey() {
+        makeSureInitialized();
+        return secretKey;
+    }
+
+    public boolean enablePublicAccess() {
         makeSureInitialized();
-        return tunnelUrl;
+        return enablePublicAccess;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
index 0fb4274049..32c8534ace 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
@@ -30,6 +30,7 @@ public class MCProperties extends BaseProperties {
     public static final String ACCESS_KEY = "mc.access_key";
     public static final String SECRET_KEY = "mc.secret_key";
     public static final String SESSION_TOKEN = "mc.session_token";
+    public static final String PUBLIC_ACCESS = "mc.public_access";
 
     public static CloudCredential getCredential(Map<String, String> props) {
         return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index b07a48da2b..508b174fb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -194,7 +194,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
             throws UserException {
         TableIf tbl = getTargetTable();
         List<Integer> columnIdxs = Lists.newArrayList();
-
+        // avoid null pointer, it maybe has no slots when two tables are joined
+        if (params.getRequiredSlots() == null) {
+            params.setColumnIdxs(columnIdxs);
+            return;
+        }
         for (TFileScanSlotInfo slot : params.getRequiredSlots()) {
             if (!slot.isIsFileSlot()) {
                 continue;
@@ -273,6 +277,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
             // external data lake table
             if (fileSplit instanceof IcebergSplit) {
+                // TODO: extract all data lake split to factory
                 IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
             }
 
@@ -329,7 +334,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
 
         if (getLocationType() == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
-        } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) {
+        } else if (getLocationType() == TFileType.FILE_S3
+                || getLocationType() == TFileType.FILE_BROKER
+                || getLocationType() == TFileType.FILE_NET) {
             // need full path
             rangeDesc.setPath(fileSplit.getPath().toString());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index 367576ba6c..102292e4c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -20,13 +20,16 @@ package org.apache.doris.planner.external;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.MaxComputeExternalTable;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.MaxComputeExternalCatalog;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 
+import com.aliyun.odps.tunnel.TunnelException;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
@@ -38,22 +41,24 @@ import java.util.Map;
 public class MaxComputeScanNode extends FileQueryScanNode {
 
     private final MaxComputeExternalTable table;
+    private final MaxComputeExternalCatalog catalog;
+    public static final int MIN_SPLIT_SIZE = 4096;
 
     public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
                               StatisticalType statisticalType, boolean needCheckColumnPriv) {
         super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
         table = (MaxComputeExternalTable) desc.getTable();
+        catalog = (MaxComputeExternalCatalog) table.getCatalog();
     }
 
     @Override
     protected TFileType getLocationType() throws UserException {
-        return TFileType.FILE_STREAM;
+        return TFileType.FILE_NET;
     }
 
     @Override
     public TFileFormatType getFileFormatType() {
-        // TODO: use max compute format
-        return TFileFormatType.FORMAT_PARQUET;
+        return TFileFormatType.FORMAT_JNI;
     }
 
     @Override
@@ -74,7 +79,42 @@ public class MaxComputeScanNode extends FileQueryScanNode {
     @Override
     protected List<Split> getSplits() throws UserException {
         List<Split> result = new ArrayList<>();
-        result.add(new FileSplit(new Path("/"), 0, -1, -1, 0L, new String[0], Collections.emptyList()));
+        // String splitPath = catalog.getTunnelUrl();
+        // TODO: use single max compute scan node rather than file scan node
+        com.aliyun.odps.Table odpsTable = table.getOdpsTable();
+        if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
+            return result;
+        }
+        try {
+            List<Pair<Long, Long>> sliceRange = new ArrayList<>();
+            long totalRows = catalog.getTotalRows(table.getDbName(), table.getName());
+            long fileNum = odpsTable.getFileNum();
+            long start = 0;
+            long splitSize = (long) Math.ceil((double) totalRows / fileNum);
+            if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
+                // use whole split
+                sliceRange.add(Pair.of(start, totalRows));
+            } else {
+                for (int i = 0; i < fileNum; i++) {
+                    if (start > totalRows) {
+                        break;
+                    }
+                    sliceRange.add(Pair.of(start, splitSize));
+                    start += splitSize;
+                }
+            }
+            long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
+            if (!sliceRange.isEmpty()) {
+                for (int i = 0; i < sliceRange.size(); i++) {
+                    Pair<Long, Long> range = sliceRange.get(i);
+                    result.add(new FileSplit(new Path("/virtual_slice_" + i), range.first, range.second,
+                            totalRows, modificationTime, null, Collections.emptyList()));
+                }
+            }
+        } catch (TunnelException e) {
+            throw new UserException("Max Compute tunnel SDK exception.", e);
+
+        }
         return result;
     }
 }
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 0242bc2370..66d0123795 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -50,10 +50,6 @@ under the License.
             <artifactId>fe-common</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>hive-catalog-shade</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
@@ -81,6 +77,36 @@ under the License.
             <artifactId>clickhouse-jdbc</artifactId>
             <classifier>all</classifier>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-mapper-asl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-vector</artifactId>
+            <version>9.0.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-unsafe</artifactId>
+            <version>9.0.0</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.hudi</groupId>
             <artifactId>hudi-presto-bundle</artifactId>
@@ -136,7 +162,10 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>hive-catalog-shade</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
index 3a5d4059b2..11fbd0558c 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
@@ -41,6 +41,11 @@ public class HudiColumnValue implements ColumnValue {
         return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
     }
 
+    @Override
+    public boolean isNull() {
+        return false;
+    }
+
     @Override
     public boolean getBoolean() {
         return (boolean) inspectObject();
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java
index 3f1f1df462..3d0223ba75 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java
@@ -72,6 +72,7 @@ public abstract class JniScanner {
             throw e;
         }
         if (numRows == 0) {
+            releaseTable();
             return 0;
         }
         return getMetaAddress(numRows);
@@ -83,7 +84,9 @@ public abstract class JniScanner {
     }
 
     protected void resetTable() {
-        vectorTable.reset();
+        if (vectorTable != null) {
+            vectorTable.reset();
+        }
     }
 
     protected void releaseColumn(int fieldId) {
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java
new file mode 100644
index 0000000000..65aa2ccded
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.MaxComputeColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.ArrowRecordReader;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+import com.google.common.base.Strings;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * MaxCompute JniScanner. BE will read data from the scanner object.
+ */
+public class MaxComputeJniScanner extends JniScanner {
+    private Odps odps;
+    private TableTunnel tunnel;
+
+    private static final Logger LOG = Logger.getLogger(MaxComputeJniScanner.class);
+    private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
+    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
+    private static final String REGION = "region";
+    private static final String PROJECT = "project";
+    private static final String TABLE = "table";
+    private static final String ACCESS_KEY = "access_key";
+    private static final String SECRET_KEY = "secret_key";
+    private static final String START_OFFSET = "start_offset";
+    private static final String SPLIT_SIZE = "split_size";
+    private static final String PUBLIC_ACCESS = "public_access";
+    private final String project;
+    private final String table;
+    private MaxComputeColumnValue columnValue;
+    private long remainBatchRows = 0;
+    private long totalRows = 0;
+    private TableTunnel.DownloadSession session;
+    private ArrowRecordReader curReader;
+    private List<Column> columns;
+    private Map<String, Integer> readColumnsId;
+    private long startOffset = -1L;
+    private long splitSize = -1L;
+
+    public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
+        String region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'.");
+        project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'.");
+        table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
+        if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
+                && !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
+            startOffset = Long.parseLong(params.get(START_OFFSET));
+            splitSize = Long.parseLong(params.get(SPLIT_SIZE));
+        }
+        String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
+        String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
+        odps = new Odps(new AliyunAccount(accessKey, secretKey));
+        odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
+        odps.setDefaultProject(project);
+        tunnel = new TableTunnel(odps);
+        String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+        boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false"));
+        if (enablePublicAccess) {
+            tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
+        }
+        tunnel.setEndpoint(tunnelUrl);
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split("#");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("MaxComputeJniScanner gets pushed-down predicates:  " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
+                                 int batchSize) {
+        super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize);
+        columns = new ArrayList<>();
+        readColumnsId = new HashMap<>();
+        for (int i = 0; i < fields.length; i++) {
+            if (!Strings.isNullOrEmpty(fields[i])) {
+                columns.add(createOdpsColumn(i, types[i]));
+                readColumnsId.put(fields[i], i);
+            }
+        }
+        // reorder columns
+        List<Column> columnList = odps.tables().get(table).getSchema().getColumns();
+        Map<String, Integer> columnRank = new HashMap<>();
+        for (int i = 0; i < columnList.size(); i++) {
+            columnRank.put(columnList.get(i).getName(), i);
+        }
+        // Downloading columns data from Max compute only supports the order of table metadata.
+        // We might get an error message if no sort here: Column reorder is not supported in legacy arrow mode.
+        columns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
+    }
+
+    @Override
+    public void open() throws IOException {
+        if (columns.isEmpty()) {
+            return;
+        }
+        try {
+            session = tunnel.createDownloadSession(project, table);
+            if (splitSize > 0) {
+                totalRows = Math.min(splitSize, session.getRecordCount());
+            } else {
+                totalRows = session.getRecordCount();
+            }
+            long start = startOffset == -1L ? 0 : startOffset;
+            curReader = session.openArrowRecordReader(start, totalRows, columns);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        remainBatchRows = totalRows;
+    }
+
+    private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
+        TypeInfo odpsType;
+        switch (dorisType.getType()) {
+            case BOOLEAN:
+                odpsType = TypeInfoFactory.BOOLEAN;
+                break;
+            case TINYINT:
+                odpsType = TypeInfoFactory.TINYINT;
+                break;
+            case SMALLINT:
+                odpsType = TypeInfoFactory.SMALLINT;
+                break;
+            case INT:
+                odpsType = TypeInfoFactory.INT;
+                break;
+            case BIGINT:
+                odpsType = TypeInfoFactory.BIGINT;
+                break;
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+            case DECIMALV2:
+                odpsType = TypeInfoFactory.getDecimalTypeInfo(dorisType.getPrecision(), dorisType.getScale());
+                break;
+            case FLOAT:
+                odpsType = TypeInfoFactory.FLOAT;
+                break;
+            case DOUBLE:
+                odpsType = TypeInfoFactory.DOUBLE;
+                break;
+            case DATETIMEV2:
+                odpsType = TypeInfoFactory.DATETIME;
+                break;
+            case DATEV2:
+                odpsType = TypeInfoFactory.DATE;
+                break;
+            case CHAR:
+                odpsType = TypeInfoFactory.getCharTypeInfo(dorisType.getLength());
+                break;
+            case VARCHAR:
+                odpsType = TypeInfoFactory.getVarcharTypeInfo(dorisType.getLength());
+                break;
+            case STRING:
+                odpsType = TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
+                break;
+            default:
+                throw new RuntimeException("Unsupported transform for column type: " + dorisType.getType());
+        }
+        return new Column(fields[colIdx], odpsType);
+    }
+
+    @Override
+    public void close() throws IOException {
+        remainBatchRows = 0;
+        totalRows = 0;
+        startOffset = -1;
+        splitSize = -1;
+        if (curReader != null) {
+            curReader.close();
+        }
+    }
+
+    @Override
+    protected int getNext() throws IOException {
+        if (curReader == null) {
+            return 0;
+        }
+        columnValue = new MaxComputeColumnValue();
+        int expectedRows = (int) Math.min(batchSize, remainBatchRows);
+        int realRows = readVectors(expectedRows);
+        if (remainBatchRows <= 0) {
+            return 0;
+        }
+        remainBatchRows -= realRows;
+        return realRows;
+    }
+
+    private int readVectors(int expectedRows) throws IOException {
+        VectorSchemaRoot batch;
+        int curReadRows = 0;
+        while (curReadRows < expectedRows && (batch = curReader.read()) != null) {
+            List<FieldVector> fieldVectors = batch.getFieldVectors();
+            int batchRows = 0;
+            for (FieldVector column : fieldVectors) {
+                columnValue.reset(column);
+                // LOG.warn("MCJNI read getClass: " + column.getClass());
+                batchRows = column.getValueCount();
+                for (int j = 0; j < batchRows; j++) {
+                    appendData(readColumnsId.get(column.getName()), columnValue);
+                }
+            }
+            curReadRows += batchRows;
+        }
+        return curReadRows;
+    }
+}
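
To make the new scanner easier to follow, here is a hedged usage sketch. The property keys mirror the constants defined above, but every value is a placeholder, the "int#string" type strings are an assumption about what ColumnType.parseType accepts, and both the constructor and open() contact the MaxCompute service, so this will not authenticate as written:

    import org.apache.doris.jni.MaxComputeJniScanner;

    import java.util.HashMap;
    import java.util.Map;

    public class MaxComputeJniScannerSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> params = new HashMap<>();
            params.put("region", "cn-beijing");          // placeholder region
            params.put("project", "demo_project");       // placeholder project name
            params.put("table", "demo_table");           // placeholder table name
            params.put("access_key", "<access-key>");    // placeholder credential
            params.put("secret_key", "<secret-key>");    // placeholder credential
            params.put("public_access", "true");         // use the public tunnel endpoint
            params.put("start_offset", "0");             // slice produced by getSplits()
            params.put("split_size", "4096");
            params.put("required_fields", "id,name");    // columns to read
            params.put("columns_types", "int#string");   // '#'-separated, same order as required_fields
            // The constructor resolves the column order from the table metadata and the
            // tunnel endpoint; open() creates a download session and an Arrow record reader.
            MaxComputeJniScanner scanner = new MaxComputeJniScanner(4096, params);
            scanner.open();
            scanner.close();   // releases the Arrow reader and resets offsets
        }
    }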
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
index afc957ec17..c4c4c5f80b 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
@@ -49,6 +49,11 @@ public class MockJniScanner extends JniScanner {
             this.j = j;
         }
 
+        @Override
+        public boolean isNull() {
+            return false;
+        }
+
         @Override
         public boolean getBoolean() {
             return (i + j) % 2 == 0;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
index da76b9cf33..8d190aa212 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
@@ -27,38 +27,40 @@ import java.util.List;
  * Column value in vector column
  */
 public interface ColumnValue {
-    public boolean getBoolean();
+    boolean isNull();
+
+    boolean getBoolean();
 
     // tinyint
-    public byte getByte();
+    byte getByte();
 
     // smallint
-    public short getShort();
+    short getShort();
 
-    public int getInt();
+    int getInt();
 
-    public float getFloat();
+    float getFloat();
 
     // bigint
-    public long getLong();
+    long getLong();
 
-    public double getDouble();
+    double getDouble();
 
-    public BigInteger getBigInteger();
+    BigInteger getBigInteger();
 
-    public BigDecimal getDecimal();
+    BigDecimal getDecimal();
 
-    public String getString();
+    String getString();
 
-    public LocalDate getDate();
+    LocalDate getDate();
 
-    public LocalDateTime getDateTime();
+    LocalDateTime getDateTime();
 
-    public byte[] getBytes();
+    byte[] getBytes();
 
-    public void unpackArray(List<ColumnValue> values);
+    void unpackArray(List<ColumnValue> values);
 
-    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values);
+    void unpackMap(List<ColumnValue> keys, List<ColumnValue> values);
 
-    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values);
+    void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values);
 }
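
As a hedged illustration of the new contract (not part of the patch): callers are expected to consult isNull() before touching any getter, as the VectorColumn.appendValue() change later in this patch shows, so an implementation that only ever represents NULL can legitimately refuse every other accessor. The class below is hypothetical:

    import org.apache.doris.jni.vec.ColumnValue;

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.util.List;

    // Hypothetical example implementation: a value that is always NULL.
    public class NullColumnValue implements ColumnValue {
        @Override public boolean isNull() { return true; }
        @Override public boolean getBoolean() { throw new UnsupportedOperationException("null value"); }
        @Override public byte getByte() { throw new UnsupportedOperationException("null value"); }
        @Override public short getShort() { throw new UnsupportedOperationException("null value"); }
        @Override public int getInt() { throw new UnsupportedOperationException("null value"); }
        @Override public float getFloat() { throw new UnsupportedOperationException("null value"); }
        @Override public long getLong() { throw new UnsupportedOperationException("null value"); }
        @Override public double getDouble() { throw new UnsupportedOperationException("null value"); }
        @Override public BigInteger getBigInteger() { throw new UnsupportedOperationException("null value"); }
        @Override public BigDecimal getDecimal() { throw new UnsupportedOperationException("null value"); }
        @Override public String getString() { throw new UnsupportedOperationException("null value"); }
        @Override public LocalDate getDate() { throw new UnsupportedOperationException("null value"); }
        @Override public LocalDateTime getDateTime() { throw new UnsupportedOperationException("null value"); }
        @Override public byte[] getBytes() { throw new UnsupportedOperationException("null value"); }
        @Override public void unpackArray(List<ColumnValue> values) { }
        @Override public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) { }
        @Override public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) { }
    }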
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java
new file mode 100644
index 0000000000..0945f1f326
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni.vec;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * MaxCompute Column value in vector column
+ */
+public class MaxComputeColumnValue implements ColumnValue {
+    private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class);
+    private int idx;
+    private FieldVector column;
+
+    public MaxComputeColumnValue() {
+        idx = 0;
+    }
+
+    public void reset(FieldVector column) {
+        this.column = column;
+        this.idx = 0;
+    }
+
+    @Override
+    public boolean isNull() {
+        return column.isNull(idx);
+    }
+
+    private void skippedIfNull() {
+        // null has been processed by appendValue() via isNull()
+        try {
+            if (column.isNull(idx)) {
+                idx++;
+            }
+        } catch (IndexOutOfBoundsException e) {
+            // skip left rows
+            idx++;
+        }
+    }
+
+    @Override
+    public boolean getBoolean() {
+        skippedIfNull();
+        TinyIntVector tinyIntCol = (TinyIntVector) column;
+        return tinyIntCol.get(idx++) > 0;
+    }
+
+    @Override
+    public byte getByte() {
+        skippedIfNull();
+        TinyIntVector tinyIntCol = (TinyIntVector) column;
+        return tinyIntCol.get(idx++);
+    }
+
+    @Override
+    public short getShort() {
+        skippedIfNull();
+        SmallIntVector smallIntCol = (SmallIntVector) column;
+        return smallIntCol.get(idx++);
+    }
+
+    @Override
+    public int getInt() {
+        skippedIfNull();
+        IntVector intCol = (IntVector) column;
+        return intCol.get(idx++);
+    }
+
+    @Override
+    public float getFloat() {
+        skippedIfNull();
+        Float4Vector floatCol = (Float4Vector) column;
+        return floatCol.get(idx++);
+    }
+
+    @Override
+    public long getLong() {
+        skippedIfNull();
+        BigIntVector longCol = (BigIntVector) column;
+        return longCol.get(idx++);
+    }
+
+    @Override
+    public double getDouble() {
+        skippedIfNull();
+        Float8Vector doubleCol = (Float8Vector) column;
+        return doubleCol.get(idx++);
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        skippedIfNull();
+        BigIntVector longCol = (BigIntVector) column;
+        return BigInteger.valueOf(longCol.get(idx++));
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        skippedIfNull();
+        DecimalVector decimalCol = (DecimalVector) column;
+        return DecimalUtility.getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++,
+                    decimalCol.getScale(), DecimalVector.TYPE_WIDTH);
+    }
+
+    @Override
+    public String getString() {
+        skippedIfNull();
+        VarCharVector varcharCol = (VarCharVector) column;
+        String v = varcharCol.getObject(idx++).toString();
+        return v == null ? new String(new byte[0]) : v;
+    }
+
+    @Override
+    public LocalDate getDate() {
+        skippedIfNull();
+        DateDayVector dateCol = (DateDayVector) column;
+        Integer intVal = dateCol.getObject(idx++);
+        return LocalDate.ofEpochDay(intVal == null ? 0 : intVal);
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        skippedIfNull();
+        DateMilliVector datetimeCol = (DateMilliVector) column;
+        LocalDateTime v = datetimeCol.getObject(idx++);
+        return v == null ? LocalDateTime.MIN : v;
+    }
+
+    @Override
+    public byte[] getBytes() {
+        skippedIfNull();
+        VarBinaryVector binaryCol = (VarBinaryVector) column;
+        byte[] v = binaryCol.getObject(idx++);
+        return v == null ? new byte[0] : v;
+    }
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+
+    }
+}
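
A hedged sketch of the cursor pattern above, using a tiny in-process Arrow IntVector; the allocator, vector name, and data are made up and are not part of the patch:

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;
    import org.apache.doris.jni.vec.MaxComputeColumnValue;

    public class MaxComputeColumnValueSketch {
        public static void main(String[] args) {
            try (RootAllocator allocator = new RootAllocator();
                    IntVector vector = new IntVector("c_int", allocator)) {
                vector.allocateNew(3);
                vector.set(0, 7);
                vector.setNull(1);
                vector.set(2, 9);
                vector.setValueCount(3);

                MaxComputeColumnValue value = new MaxComputeColumnValue();
                value.reset(vector);                  // point the cursor at this column
                System.out.println(value.getInt());   // 7; cursor moves to row 1
                System.out.println(value.isNull());   // true: row 1 is null, appendValue() would write a null
                System.out.println(value.getInt());   // skippedIfNull() skips row 1 and returns 9
            }
        }
    }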
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
index a44de9062a..e02107f143 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
@@ -122,6 +122,11 @@ public class ScanPredicate {
             return inspectObject().toString();
         }
 
+        @Override
+        public boolean isNull() {
+            return false;
+        }
+
         @Override
         public boolean getBoolean() {
             return (boolean) inspectObject();
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
index 8ce684ea95..4996191776 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
@@ -551,7 +551,7 @@ public class VectorColumn {
 
     public void appendValue(ColumnValue o) {
         ColumnType.Type typeValue = columnType.getType();
-        if (o == null) {
+        if (o == null || o.isNull()) {
             appendNull(typeValue);
             return;
         }
diff --git a/fe/pom.xml b/fe/pom.xml
index 9124594427..e2da4103d3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -269,6 +269,7 @@ under the License.
         <!-- Please modify iceberg.version and avro.version together,
          you can find avro version info in iceberg mvn repository -->
         <iceberg.version>1.1.0</iceberg.version>
+        <maxcompute.version>0.43.3-public</maxcompute.version>
         <avro.version>1.11.1</avro.version>
         <!-- hudi -->
         <hudi.version>0.13.0</hudi.version>
@@ -1010,6 +1011,12 @@ under the License.
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-sql_2.12</artifactId>
                 <version>${spark.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.arrow</groupId>
+                        <artifactId>arrow-vector</artifactId>
+                    </exclusion>
+                </exclusions>
                 <scope>provided</scope>
             </dependency>
 
@@ -1102,7 +1109,17 @@ under the License.
                 <artifactId>iceberg-aws</artifactId>
                 <version>${iceberg.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>com.aliyun.odps</groupId>
+                <artifactId>odps-sdk-core</artifactId>
+                <version>${maxcompute.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.arrow</groupId>
+                        <artifactId>arrow-vector</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <!-- For Iceberg, must be consistent with Iceberg version -->
             <dependency>
                 <groupId>org.apache.avro</groupId>
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 631132f934..b848b847a4 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -307,12 +307,16 @@ struct TJdbcTable {
   6: optional string jdbc_resource_name
   7: optional string jdbc_driver_class
   8: optional string jdbc_driver_checksum
+  
 }
 
 struct TMCTable {
-  1: optional string tunnel_url
+  1: optional string region
   2: optional string project
   3: optional string table
+  4: optional string access_key
+  5: optional string secret_key
+  6: optional string public_access
 }
 
 // "Union" of all table types.
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 484b34c3d4..9c98cd4d28 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -114,6 +114,7 @@ enum TFileFormatType {
     FORMAT_ORC,
     FORMAT_JSON,
     FORMAT_PROTO,
+    FORMAT_JNI,
 }
 
 // In previous versions, the data compression format and file format were stored together, as TFileFormatType,
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index ee62f59aa5..5806483cba 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -645,6 +645,7 @@ enum TFileType {
     FILE_STREAM,    // file content is streaming in the buffer
     FILE_S3,
     FILE_HDFS,
+    FILE_NET,       // read file by network, such as http
 }
 
 struct TTabletCommitInfo {




[doris] 01/13: [fix](cooldown) fix bug due to tablets info changed (#20465)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 41ec7010d6703335b1de44a7fb54cfac6e11316c
Author: AlexYue <yj...@gmail.com>
AuthorDate: Tue Jun 6 22:15:17 2023 +0800

    [fix](cooldown) fix bug due to tablets info changed (#20465)
---
 .../cold_heat_separation_p2/create_table_use_partition_policy.groovy    | 2 +-
 .../suites/cold_heat_separation_p2/create_table_use_policy.groovy       | 2 +-
 .../suites/cold_heat_separation_p2/modify_replica_use_partition.groovy  | 2 +-
 .../cold_heat_separation_p2/table_modify_resouce_and_policy.groovy      | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/regression-test/suites/cold_heat_separation_p2/create_table_use_partition_policy.groovy b/regression-test/suites/cold_heat_separation_p2/create_table_use_partition_policy.groovy
index 122508918b..8af02ce0af 100644
--- a/regression-test/suites/cold_heat_separation_p2/create_table_use_partition_policy.groovy
+++ b/regression-test/suites/cold_heat_separation_p2/create_table_use_partition_policy.groovy
@@ -34,7 +34,7 @@ suite("create_table_use_partition_policy") {
     // data_sizes is one arrayList<Long>, t is tablet
     def fetchDataSize = { data_sizes, t ->
         def tabletId = t[0]
-        String meta_url = t[16]
+        String meta_url = t[17]
         def clos = {  respCode, body ->
             logger.info("test ttl expired resp Code {}", "${respCode}".toString())
             assertEquals("${respCode}".toString(), "200")
diff --git a/regression-test/suites/cold_heat_separation_p2/create_table_use_policy.groovy b/regression-test/suites/cold_heat_separation_p2/create_table_use_policy.groovy
index 7c9d5d794d..b8395d5b07 100644
--- a/regression-test/suites/cold_heat_separation_p2/create_table_use_policy.groovy
+++ b/regression-test/suites/cold_heat_separation_p2/create_table_use_policy.groovy
@@ -34,7 +34,7 @@ suite("create_table_use_policy") {
     // data_sizes is one arrayList<Long>, t is tablet
     def fetchDataSize = { data_sizes, t ->
         def tabletId = t[0]
-        String meta_url = t[16]
+        String meta_url = t[17]
         def clos = {  respCode, body ->
             logger.info("test ttl expired resp Code {}", "${respCode}".toString())
             assertEquals("${respCode}".toString(), "200")
diff --git a/regression-test/suites/cold_heat_separation_p2/modify_replica_use_partition.groovy b/regression-test/suites/cold_heat_separation_p2/modify_replica_use_partition.groovy
index 1373ef5685..bd22c3f7ef 100644
--- a/regression-test/suites/cold_heat_separation_p2/modify_replica_use_partition.groovy
+++ b/regression-test/suites/cold_heat_separation_p2/modify_replica_use_partition.groovy
@@ -34,7 +34,7 @@ suite("modify_replica_use_partition") {
     // data_sizes is one arrayList<Long>, t is tablet
     def fetchDataSize = { data_sizes, t ->
         def tabletId = t[0]
-        String meta_url = t[16]
+        String meta_url = t[17]
         def clos = {  respCode, body ->
             logger.info("test ttl expired resp Code {}", "${respCode}".toString())
             assertEquals("${respCode}".toString(), "200")
diff --git a/regression-test/suites/cold_heat_separation_p2/table_modify_resouce_and_policy.groovy b/regression-test/suites/cold_heat_separation_p2/table_modify_resouce_and_policy.groovy
index 5632a778e7..37f0bd1439 100644
--- a/regression-test/suites/cold_heat_separation_p2/table_modify_resouce_and_policy.groovy
+++ b/regression-test/suites/cold_heat_separation_p2/table_modify_resouce_and_policy.groovy
@@ -34,7 +34,7 @@ suite("table_modify_resouce") {
     // data_sizes is one arrayList<Long>, t is tablet
     def fetchDataSize = { data_sizes, t ->
         def tabletId = t[0]
-        String meta_url = t[16]
+        String meta_url = t[17]
         def clos = {  respCode, body ->
             logger.info("test ttl expired resp Code {}", "${respCode}".toString())
             assertEquals("${respCode}".toString(), "200")




[doris] 02/13: [improvement](sink) reuse rows buffer in mysql_result_writer (#20482)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit aff0907778ae9a5e0b627f48327d93d50921c56f
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Wed Jun 7 10:09:32 2023 +0800

    [improvement](sink) reuse rows buffer in mysql_result_writer (#20482)
    
    Creating a new rows buffer for each block has a non-negligible performance cost,
    so the rows buffer should be reused across blocks.
    
    In a test with a total of 1.7M rows, AppendBatchTime dropped from 500ms to 280ms.
---
 be/src/util/mysql_row_buffer.cpp         | 20 +++++++++++
 be/src/util/mysql_row_buffer.h           |  2 ++
 be/src/vec/sink/vmysql_result_writer.cpp | 61 +++++++++++++++++++-------------
 be/src/vec/sink/vmysql_result_writer.h   |  3 ++
 4 files changed, 61 insertions(+), 25 deletions(-)
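
Conceptually (the real change is in the C++ writer below), the reuse pattern is roughly the following hedged Java sketch; all names in it are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    // Illustration only: keep one list of per-row buffers alive across blocks and
    // reset each buffer, instead of allocating a fresh vector of buffers per block,
    // which is what _rows_buffer plus MysqlRowBuffer::reset() do in the patch below.
    public class RowBufferReuseSketch {
        private final List<StringBuilder> rowsBuffer = new ArrayList<>();

        void appendBlock(List<String> rows) {
            while (rowsBuffer.size() < rows.size()) {
                rowsBuffer.add(new StringBuilder());   // grow once, then reuse
            }
            for (int i = 0; i < rows.size(); i++) {
                StringBuilder buf = rowsBuffer.get(i);
                buf.setLength(0);                      // analogous to reset()
                buf.append(rows.get(i));
            }
        }
    }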

diff --git a/be/src/util/mysql_row_buffer.cpp b/be/src/util/mysql_row_buffer.cpp
index f408cb31a1..f346c2d43c 100644
--- a/be/src/util/mysql_row_buffer.cpp
+++ b/be/src/util/mysql_row_buffer.cpp
@@ -80,6 +80,25 @@ MysqlRowBuffer<is_binary_format>::MysqlRowBuffer()
           _dynamic_mode(0),
           _len_pos(0) {}
 
+template <bool is_binary_format>
+MysqlRowBuffer<is_binary_format>::MysqlRowBuffer(MysqlRowBuffer<is_binary_format>&& other) {
+    if (other._buf == other._default_buf) {
+        auto other_length = other.length();
+        memcpy(_default_buf, other._buf, other_length);
+        _buf = _default_buf;
+        _pos = _buf + other_length;
+    } else {
+        _buf = other._buf;
+        other._buf = other._default_buf;
+        _pos = other._pos;
+    }
+    _buf_size = other._buf_size;
+    _dynamic_mode = other._dynamic_mode;
+    _field_count = other._field_count;
+    _field_pos = other._field_pos;
+    _len_pos = other._len_pos;
+}
+
 template <bool is_binary_format>
 void MysqlRowBuffer<is_binary_format>::start_binary_row(uint32_t num_cols) {
     assert(is_binary_format);
@@ -94,6 +113,7 @@ template <bool is_binary_format>
 MysqlRowBuffer<is_binary_format>::~MysqlRowBuffer() {
     if (_buf != _default_buf) {
         delete[] _buf;
+        _buf = _default_buf;
     }
 }
 
diff --git a/be/src/util/mysql_row_buffer.h b/be/src/util/mysql_row_buffer.h
index 2df739450f..d0e91e766d 100644
--- a/be/src/util/mysql_row_buffer.h
+++ b/be/src/util/mysql_row_buffer.h
@@ -56,6 +56,8 @@ public:
     MysqlRowBuffer();
     ~MysqlRowBuffer();
 
+    MysqlRowBuffer(MysqlRowBuffer&& other);
+
     void reset() { _pos = _buf; }
 
     // Prepare for binary row buffer
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index 6e3e34bcc6..019c0556c4 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -98,6 +98,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
     _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
     _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
     _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
+    _copy_buffer_timer = ADD_CHILD_TIMER(_parent_profile, "CopyBufferTime", "AppendBatchTime");
     _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
     _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
 }
@@ -605,43 +606,53 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
     Block block;
     RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
                                                                        input_block, &block));
-    auto num_rows = block.rows();
-    std::vector<MysqlRowBuffer<is_binary_format>> rows_buffer;
-    rows_buffer.resize(num_rows);
-    if constexpr (is_binary_format) {
-        for (MysqlRowBuffer<is_binary_format>& buf : rows_buffer) {
-            buf.start_binary_row(_output_vexpr_ctxs.size());
-        }
-    }
 
     // convert one batch
     auto result = std::make_unique<TFetchDataResult>();
-    for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
-        const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
-        auto type_ptr = block.get_by_position(i).type;
+    auto num_rows = block.rows();
 
-        DCHECK(num_rows == block.get_by_position(i).column->size())
-                << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i,
-                               block.get_by_position(i).column->size());
+    {
+        SCOPED_TIMER(_convert_tuple_timer);
+        if (_rows_buffer.size() < num_rows) {
+            _rows_buffer.resize(num_rows);
+        }
 
-        RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql(
-                *column_ptr, output_object_data(), rows_buffer, 0, 0, num_rows, col_const));
+        for (size_t i = 0; i != num_rows; ++i) {
+            _rows_buffer[i].reset();
+            if constexpr (is_binary_format) {
+                _rows_buffer[i].start_binary_row(_output_vexpr_ctxs.size());
+            }
+        }
 
-        if (!status) {
-            LOG(WARNING) << "convert row to mysql result failed. block_struct="
-                         << block.dump_structure();
-            break;
+        for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
+            const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
+            auto type_ptr = block.get_by_position(i).type;
+
+            DCHECK(num_rows == block.get_by_position(i).column->size())
+                    << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i,
+                                   block.get_by_position(i).column->size());
+
+            RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql(
+                    *column_ptr, output_object_data(), _rows_buffer, 0, 0, num_rows, col_const));
+
+            if (!status) {
+                LOG(WARNING) << "convert row to mysql result failed. block_struct="
+                             << block.dump_structure();
+                break;
+            }
         }
     }
 
     uint64_t bytes_sent = 0;
     // copy MysqlRowBuffer to Thrift
-    result->result_batch.rows.resize(num_rows);
-    for (int i = 0; i < num_rows; ++i) {
-        result->result_batch.rows[i].append(rows_buffer[i].buf(), rows_buffer[i].length());
-        bytes_sent += rows_buffer[i].length();
+    {
+        SCOPED_TIMER(_copy_buffer_timer);
+        result->result_batch.rows.resize(num_rows);
+        for (int i = 0; i < num_rows; ++i) {
+            result->result_batch.rows[i].append(_rows_buffer[i].buf(), _rows_buffer[i].length());
+            bytes_sent += _rows_buffer[i].length();
+        }
     }
-
     if (status) {
         SCOPED_TIMER(_result_send_timer);
         // If this is a dry run task, no need to send data block
diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h
index 0e0b4d9313..3b6b8579d7 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -69,6 +69,7 @@ private:
     BufferControlBlock* _sinker;
 
     const VExprContextSPtrs& _output_vexpr_ctxs;
+    std::vector<MysqlRowBuffer<is_binary_format>> _rows_buffer;
 
     RuntimeProfile* _parent_profile; // parent profile from result sink. not owned
     // total time cost on append batch operation
@@ -77,6 +78,8 @@ private:
     RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
     // file write timer, child timer of _append_row_batch_timer
     RuntimeProfile::Counter* _result_send_timer = nullptr;
+    // timer of copying buffer to thrift
+    RuntimeProfile::Counter* _copy_buffer_timer = nullptr;
     // number of sent rows
     RuntimeProfile::Counter* _sent_rows_counter = nullptr;
     // size of sent data




[doris] 06/13: [fix](stats) Make alter column stats no forward (#20501)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e0e7e3e4fde4c30141a897c51948f8c5e0caa112
Author: AKIRA <33...@users.noreply.github.com>
AuthorDate: Wed Jun 7 11:14:44 2023 +0900

    [fix](stats) Make alter column stats no forward  (#20501)
    
    For testing convenience, since daily regression test queries may be sent to any FE rather than only the master.
---
 .../main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java    | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
index bcb1aa1596..93edefcab1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
@@ -175,4 +175,9 @@ public class AlterColumnStatsStmt extends DdlStmt {
     public String getValue(StatsType statsType) {
         return statsTypeToValue.get(statsType);
     }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.NO_FORWARD;
+    }
 }




[doris] 07/13: [fix](nereids) filter and project node should be pushed down through cte (#20508)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 55698a240dd9382dabbe72f3259134dd7d926acd
Author: starocean999 <40...@users.noreply.github.com>
AuthorDate: Wed Jun 7 10:36:32 2023 +0800

    [fix](nereids) filter and project node should be pushed down through cte (#20508)
    
    1. Move PushdownFilterThroughCTEAnchor and PushdownProjectThroughCTEAnchor into the PUSH_DOWN_FILTERS rule set.
    2. Move PushdownFilterThroughProject before MergeProjectPostProcessor.
---
 .../doris/nereids/jobs/batch/NereidsRewriter.java  |  6 ----
 .../nereids/processor/post/PlanPostProcessors.java |  2 +-
 .../org/apache/doris/nereids/rules/RuleSet.java    |  6 ++--
 .../org/apache/doris/nereids/rules/RuleType.java   |  4 +--
 ...TEAnchor.java => PushdownFilterThroughCTE.java} | 16 +++++-----
 ...EAnchor.java => PushdownProjectThroughCTE.java} | 16 +++++-----
 regression-test/data/nereids_syntax_p0/cte.out     | 10 +++++++
 .../suites/nereids_syntax_p0/cte.groovy            | 34 ++++++++++++++++++++++
 8 files changed, 67 insertions(+), 27 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
index 8122f2669a..35689b9e5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
@@ -72,11 +72,9 @@ import org.apache.doris.nereids.rules.rewrite.logical.PruneFileScanPartition;
 import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanPartition;
 import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet;
 import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
-import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughCTEAnchor;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughProject;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughWindow;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit;
-import org.apache.doris.nereids.rules.rewrite.logical.PushdownProjectThroughCTEAnchor;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownTopNThroughWindow;
 import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin;
 import org.apache.doris.nereids.rules.rewrite.logical.SemiJoinCommute;
@@ -127,10 +125,6 @@ public class NereidsRewriter extends BatchRewriteJob {
                     )
             ),
 
-            topic("Rewrite CTE", topDown(
-                    new PushdownFilterThroughCTEAnchor(),
-                    new PushdownProjectThroughCTEAnchor())),
-
             topic("Subquery unnesting",
                     custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION, AggScalarSubQueryToWindowFunction::new),
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index 4e72b8738d..c7fe4309e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -58,8 +58,8 @@ public class PlanPostProcessors {
     public List<PlanPostProcessor> getProcessors() {
         // add processor if we need
         Builder<PlanPostProcessor> builder = ImmutableList.builder();
-        builder.add(new MergeProjectPostProcessor());
         builder.add(new PushdownFilterThroughProject());
+        builder.add(new MergeProjectPostProcessor());
         builder.add(new FragmentProcessor());
         if (!cascadesContext.getConnectContext().getSessionVariable().getRuntimeFilterMode()
                         .toUpperCase().equals(TRuntimeFilterMode.OFF.name())) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index 049680ee3f..2763c9ce9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -75,7 +75,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.MergeProjects;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownAliasThroughJoin;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownExpressionsInHashCondition;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughAggregation;
-import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughCTEAnchor;
+import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughCTE;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughJoin;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughProject;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughRepeat;
@@ -83,6 +83,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughSetOp
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughSort;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughWindow;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownJoinOtherCondition;
+import org.apache.doris.nereids.rules.rewrite.logical.PushdownProjectThroughCTE;
 import org.apache.doris.nereids.rules.rewrite.logical.PushdownProjectThroughLimit;
 
 import com.google.common.collect.ImmutableList;
@@ -129,7 +130,8 @@ public class RuleSet {
             new MergeFilters(),
             new MergeGenerates(),
             new MergeLimits(),
-            new PushdownFilterThroughCTEAnchor());
+            new PushdownFilterThroughCTE(),
+            new PushdownProjectThroughCTE());
 
     public static final List<Rule> IMPLEMENTATION_RULES = planRuleFactories()
             .add(new LogicalCTEProduceToPhysicalCTEProduce())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 6bce357c7a..fbe7deaae4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -141,7 +141,7 @@ public enum RuleType {
     PUSHDOWN_FILTER_THROUGH_SET_OPERATION(RuleTypeClass.REWRITE),
     PUSHDOWN_FILTER_THROUGH_SORT(RuleTypeClass.REWRITE),
 
-    PUSHDOWN_FILTER_THROUGH_CTE_ANCHOR(RuleTypeClass.REWRITE),
+    PUSHDOWN_FILTER_THROUGH_CTE(RuleTypeClass.REWRITE),
 
     COLUMN_PRUNING(RuleTypeClass.REWRITE),
 
@@ -230,7 +230,7 @@ public enum RuleType {
 
     COLLECT_PROJECT_ABOVE_FILTER_CONSUMER(RuleTypeClass.REWRITE),
     CTE_PRODUCER_REWRITE(RuleTypeClass.REWRITE),
-    PUSH_DOWN_PROJECT_THROUGH_CTE_ANCHOR(RuleTypeClass.REWRITE),
+    PUSH_DOWN_PROJECT_THROUGH_CTE(RuleTypeClass.REWRITE),
     INLINE_CTE(RuleTypeClass.REWRITE),
     REWRITE_SENTINEL(RuleTypeClass.REWRITE),
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughCTEAnchor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughCTE.java
similarity index 68%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughCTEAnchor.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughCTE.java
index 25adf9a576..09f9c32db7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughCTEAnchor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughCTE.java
@@ -21,20 +21,20 @@ import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 
 /**
- * Push filter through CTEAnchor.
+ * Push filter through CTE.
  */
-public class PushdownFilterThroughCTEAnchor extends OneRewriteRuleFactory {
+public class PushdownFilterThroughCTE extends OneRewriteRuleFactory {
 
     @Override
     public Rule build() {
-        return logicalFilter(logicalCTEAnchor()).thenApply(ctx -> {
-            LogicalFilter<LogicalCTEAnchor<Plan, Plan>> filter = ctx.root;
-            LogicalCTEAnchor<Plan, Plan> anchor = filter.child();
-            return anchor.withChildren(anchor.left(), filter.withChildren((Plan) anchor.right()));
-        }).toRule(RuleType.PUSHDOWN_FILTER_THROUGH_CTE_ANCHOR);
+        return logicalFilter(logicalCTE()).thenApply(ctx -> {
+            LogicalFilter<LogicalCTE<Plan>> filter = ctx.root;
+            LogicalCTE<Plan> anchor = filter.child();
+            return anchor.withChildren(filter.withChildren(anchor.child()));
+        }).toRule(RuleType.PUSHDOWN_FILTER_THROUGH_CTE);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughCTEAnchor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughCTE.java
similarity index 68%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughCTEAnchor.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughCTE.java
index 1d6d64529a..8ff5df5c5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughCTEAnchor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughCTE.java
@@ -21,20 +21,20 @@ import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 
 /**
- * Push project through CTEAnchor.
+ * Push project through CTE.
  */
-public class PushdownProjectThroughCTEAnchor extends OneRewriteRuleFactory {
+public class PushdownProjectThroughCTE extends OneRewriteRuleFactory {
 
     @Override
     public Rule build() {
-        return logicalProject(logicalCTEAnchor()).thenApply(ctx -> {
-            LogicalProject<LogicalCTEAnchor<Plan, Plan>> project = ctx.root;
-            LogicalCTEAnchor<Plan, Plan> anchor = project.child();
-            return anchor.withChildren(anchor.child(0), project.withChildren(anchor.child(1)));
-        }).toRule(RuleType.PUSH_DOWN_PROJECT_THROUGH_CTE_ANCHOR);
+        return logicalProject(logicalCTE()).thenApply(ctx -> {
+            LogicalProject<LogicalCTE<Plan>> project = ctx.root;
+            LogicalCTE<Plan> anchor = project.child();
+            return anchor.withChildren(project.withChildren(anchor.child()));
+        }).toRule(RuleType.PUSH_DOWN_PROJECT_THROUGH_CTE);
     }
 }
diff --git a/regression-test/data/nereids_syntax_p0/cte.out b/regression-test/data/nereids_syntax_p0/cte.out
index 360e8fb37c..414dd7f9c1 100644
--- a/regression-test/data/nereids_syntax_p0/cte.out
+++ b/regression-test/data/nereids_syntax_p0/cte.out
@@ -72,3 +72,13 @@ ASIA	1
 29
 9
 
+-- !cte13 --
+9
+15
+29
+
+-- !cte14 --
+9
+15
+29
+
diff --git a/regression-test/suites/nereids_syntax_p0/cte.groovy b/regression-test/suites/nereids_syntax_p0/cte.groovy
index 9d847fffd0..d2b6eefa0d 100644
--- a/regression-test/suites/nereids_syntax_p0/cte.groovy
+++ b/regression-test/suites/nereids_syntax_p0/cte.groovy
@@ -237,6 +237,40 @@ suite("cte") {
      
     """
 
+    qt_cte13 """
+            SELECT abs(dd.s_suppkey)
+            FROM (
+            WITH part AS 
+                (SELECT s_suppkey
+                FROM supplier
+                WHERE s_suppkey < 30 )
+                    SELECT p1.s_suppkey
+                    FROM part p1
+                    JOIN part p2
+                        ON p1.s_suppkey = p2.s_suppkey
+                    WHERE p1.s_suppkey > 0 ) dd
+                WHERE dd.s_suppkey > 0
+                ORDER BY dd.s_suppkey;
+    """
+
+    sql "set experimental_enable_pipeline_engine=true"
+
+    qt_cte14 """
+            SELECT abs(dd.s_suppkey)
+            FROM (
+            WITH part AS 
+                (SELECT s_suppkey
+                FROM supplier
+                WHERE s_suppkey < 30 )
+                    SELECT p1.s_suppkey
+                    FROM part p1
+                    JOIN part p2
+                        ON p1.s_suppkey = p2.s_suppkey
+                    WHERE p1.s_suppkey > 0 ) dd
+                WHERE dd.s_suppkey > 0
+                ORDER BY dd.s_suppkey;
+    """
+
     test {
         sql = "WITH cte1 (a1, A1) AS (SELECT * FROM supplier) SELECT * FROM cte1"
 




[doris] 03/13: [feature](backup-restore) Add local backup/restore not upload/download by broker (#20492)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 053c8c303f7047e67677eae1c0cc138f54913fb9
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Wed Jun 7 21:35:15 2023 +0800

    [feature](backup-restore) Add local backup/restore not upload/download by broker (#20492)
---
 be/src/agent/task_worker_pool.cpp                  |  28 ++-
 be/src/runtime/snapshot_loader.cpp                 | 269 +++++++++++++++++++++
 be/src/runtime/snapshot_loader.h                   |   5 +
 be/src/service/backend_service.cpp                 |  14 +-
 .../org/apache/doris/analysis/RestoreStmt.java     |  32 ++-
 .../apache/doris/analysis/ShowSnapshotStmt.java    |  33 ++-
 .../org/apache/doris/backup/BackupHandler.java     | 132 +++++++---
 .../java/org/apache/doris/backup/BackupJob.java    |  25 +-
 .../org/apache/doris/backup/BackupJobInfo.java     |  93 ++++++-
 .../java/org/apache/doris/backup/BackupMeta.java   |  14 ++
 .../java/org/apache/doris/backup/Repository.java   |   2 +
 .../java/org/apache/doris/backup/RestoreJob.java   | 206 +++++++++++++++-
 .../java/org/apache/doris/backup/Snapshot.java     |  69 ++++++
 .../apache/doris/service/FrontendServiceImpl.java  | 194 ++++++++++++++-
 .../java/org/apache/doris/task/DownloadTask.java   |  36 ++-
 gensrc/thrift/AgentService.thrift                  |  11 +
 gensrc/thrift/FrontendService.thrift               |  48 ++++
 gensrc/thrift/Status.thrift                        |   3 +
 18 files changed, 1147 insertions(+), 67 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index f596603c35..d830d592e2 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -761,20 +761,28 @@ void TaskWorkerPool::_download_worker_thread_callback() {
             _tasks.pop_front();
         }
         LOG(INFO) << "get download task. signature=" << agent_task_req.signature
-                  << ", job_id=" << download_request.job_id;
+                  << ", job_id=" << download_request.job_id
+                  << ", task detail: " << apache::thrift::ThriftDebugString(download_request);
 
         // TODO: download
         std::vector<int64_t> downloaded_tablet_ids;
 
-        std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
-                _env, download_request.job_id, agent_task_req.signature,
-                download_request.broker_addr, download_request.broker_prop);
-        Status status = loader->init(
-                download_request.__isset.storage_backend ? download_request.storage_backend
-                                                         : TStorageBackendType::type::BROKER,
-                download_request.__isset.location ? download_request.location : "");
-        if (status.ok()) {
-            status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
+        auto status = Status::OK();
+        if (download_request.__isset.remote_tablet_snapshots) {
+            SnapshotLoader loader(_env, download_request.job_id, agent_task_req.signature);
+            loader.remote_http_download(download_request.remote_tablet_snapshots,
+                                        &downloaded_tablet_ids);
+        } else {
+            std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
+                    _env, download_request.job_id, agent_task_req.signature,
+                    download_request.broker_addr, download_request.broker_prop);
+            status = loader->init(
+                    download_request.__isset.storage_backend ? download_request.storage_backend
+                                                             : TStorageBackendType::type::BROKER,
+                    download_request.__isset.location ? download_request.location : "");
+            if (status.ok()) {
+                status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
+            }
         }
 
         if (!status.ok()) {
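
The hunk above chooses between two download paths: when the task carries remote_tablet_snapshots, the BE pulls the snapshot files straight from the source BE over HTTP with no broker involved; otherwise it keeps the original broker/storage-backend loader. A condensed sketch of that decision, assuming a hypothetical TDownloadRequest stand-in for the thrift request type and ExecEnv* for the environment handle:

    // Sketch only; field names mirror the patch, the surrounding types are assumptions.
    Status dispatch_download(const TDownloadRequest& req, ExecEnv* env, int64_t signature,
                             std::vector<int64_t>* downloaded_tablet_ids) {
        if (req.__isset.remote_tablet_snapshots) {
            // New local backup/restore path: direct BE-to-BE HTTP download.
            SnapshotLoader loader(env, req.job_id, signature);
            return loader.remote_http_download(req.remote_tablet_snapshots, downloaded_tablet_ids);
        }
        // Legacy path: download through the broker or another storage backend.
        auto loader = std::make_unique<SnapshotLoader>(env, req.job_id, signature, req.broker_addr,
                                                       req.broker_prop);
        RETURN_IF_ERROR(loader->init(req.__isset.storage_backend ? req.storage_backend
                                                                 : TStorageBackendType::type::BROKER,
                                     req.__isset.location ? req.location : ""));
        return loader->download(req.src_dest_map, downloaded_tablet_ids);
    }
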
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 4db803b10e..f1b58fa454 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -34,9 +34,12 @@
 #include <cstring>
 #include <filesystem>
 #include <istream>
+#include <unordered_map>
 #include <utility>
 
 #include "common/logging.h"
+#include "gutil/strings/split.h"
+#include "http/http_client.h"
 #include "io/fs/broker_file_system.h"
 #include "io/fs/file_system.h"
 #include "io/fs/hdfs_file_system.h"
@@ -370,6 +373,272 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
     return status;
 }
 
+Status SnapshotLoader::remote_http_download(
+        const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
+        std::vector<int64_t>* downloaded_tablet_ids) {
+    LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id,
+                             _task_id);
+    constexpr uint32_t kListRemoteFileTimeout = 15;
+    constexpr uint32_t kDownloadFileMaxRetry = 3;
+    constexpr uint32_t kGetLengthTimeout = 10;
+
+    // check if job has already been cancelled
+    int tmp_counter = 1;
+    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
+    Status status = Status::OK();
+
+    // Before downloading, validate all snapshot paths and collect local/remote file info
+
+    // Step 1: Validate local tablet snapshot paths
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        auto& path = remote_tablet_snapshot.local_snapshot_path;
+        bool res = true;
+        RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res));
+        if (!res) {
+            std::stringstream ss;
+            auto err_msg =
+                    fmt::format("snapshot path is not directory or does not exist: {}", path);
+            LOG(WARNING) << err_msg;
+            return Status::RuntimeError(err_msg);
+        }
+    }
+
+    // Step 2: get all local files
+    struct LocalFileStat {
+        uint64_t size;
+        // TODO(Drogon): add md5sum
+    };
+    std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map;
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+        std::vector<std::string> local_files;
+        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));
+
+        auto& local_filestat = local_files_map[local_path];
+        for (auto& local_file : local_files) {
+            // add file size
+            std::string local_file_path = local_path + "/" + local_file;
+            std::error_code ec;
+            uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download file error: " << ec.message();
+                return Status::IOError("can't retrieve file_size of {}, due to {}", local_file_path,
+                                       ec.message());
+            }
+            local_filestat[local_file] = {local_file_size};
+        }
+    }
+
+    // Step 3: Validate remote tablet snapshot paths && build remote files map
+    // TODO(Drogon): Add md5sum check
+    // key is the remote snapshot path, value is its file list,
+    // all of which are fetched through the http download action, e.g.:
+    // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
+    int report_counter = 0;
+    int total_num = remote_tablet_snapshots.size();
+    int finished_num = 0;
+    struct RemoteFileStat {
+        // TODO(Drogon): Add md5sum
+        std::string url;
+        uint64_t size;
+    };
+    std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>>
+            remote_files_map;
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+        auto& remote_files = remote_files_map[remote_path];
+        const auto& token = remote_tablet_snapshot.remote_token;
+        const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
+
+        // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
+        std::string remote_url_prefix =
+                fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}",
+                            remote_be_addr.hostname, remote_be_addr.port, token, remote_path);
+
+        string file_list_str;
+        auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) {
+            RETURN_IF_ERROR(client->init(remote_url_prefix));
+            client->set_timeout_ms(kListRemoteFileTimeout * 1000);
+            return client->execute(&file_list_str);
+        };
+        RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb));
+        std::vector<string> filename_list =
+                strings::Split(file_list_str, "\n", strings::SkipWhitespace());
+
+        for (const auto& filename : filename_list) {
+            std::string remote_file_url = fmt::format(
+                    "http://{}:{}/api/_tablet/_download?token={}&file={}/{}",
+                    remote_tablet_snapshot.remote_be_addr.hostname,
+                    remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token,
+                    remote_tablet_snapshot.remote_snapshot_path, filename);
+
+            // get file length
+            uint64_t file_size = 0;
+            auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* client) {
+                RETURN_IF_ERROR(client->init(remote_file_url));
+                client->set_timeout_ms(kGetLengthTimeout * 1000);
+                RETURN_IF_ERROR(client->head());
+                RETURN_IF_ERROR(client->get_content_length(&file_size));
+                return Status::OK();
+            };
+            RETURN_IF_ERROR(
+                    HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_size_cb));
+
+            remote_files[filename] = RemoteFileStat {remote_file_url, file_size};
+        }
+    }
+
+    // Step 4: Compare local and remote files && get all need download files
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
+                                      TTaskType::type::DOWNLOAD));
+
+        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+        auto& remote_files = remote_files_map[remote_path];
+        auto& local_files = local_files_map[local_path];
+        auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
+
+        // get all need download files
+        std::vector<std::string> need_download_files;
+        for (const auto& [remote_file, remote_filestat] : remote_files) {
+            LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file,
+                                     remote_filestat.size);
+            auto it = local_files.find(remote_file);
+            if (it == local_files.end()) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+            if (_end_with(remote_file, ".hdr")) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+
+            if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+            // TODO(Drogon): check by md5sum, if not match then download
+
+            LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file);
+        }
+
+        auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
+        TabletSharedPtr tablet =
+                _env->storage_engine()->tablet_manager()->get_tablet(local_tablet_id);
+        if (tablet == nullptr) {
+            std::stringstream ss;
+            ss << "failed to get local tablet: " << local_tablet_id;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+        DataDir* data_dir = tablet->data_dir();
+
+        // download all need download files
+        uint64_t total_file_size = 0;
+        MonotonicStopWatch watch;
+        watch.start();
+        for (auto& filename : need_download_files) {
+            auto& remote_filestat = remote_files[filename];
+            auto file_size = remote_filestat.size;
+            auto& remote_file_url = remote_filestat.url;
+
+            // check disk capacity
+            if (data_dir->reach_capacity_limit(file_size)) {
+                return Status::InternalError("Disk reach capacity limit");
+            }
+
+            total_file_size += file_size;
+            uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
+            if (estimate_timeout < config::download_low_speed_time) {
+                estimate_timeout = config::download_low_speed_time;
+            }
+
+            std::string local_filename;
+            RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id, &local_filename));
+            std::string local_file_path = local_path + "/" + local_filename;
+
+            LOG(INFO) << "clone begin to download file from: " << remote_file_url
+                      << " to: " << local_file_path << ". size(B): " << file_size
+                      << ", timeout(s): " << estimate_timeout;
+
+            auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
+                                file_size](HttpClient* client) {
+                RETURN_IF_ERROR(client->init(remote_file_url));
+                client->set_timeout_ms(estimate_timeout * 1000);
+                RETURN_IF_ERROR(client->download(local_file_path));
+
+                std::error_code ec;
+                // Check file length
+                uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+                if (ec) {
+                    LOG(WARNING) << "download file error: " << ec.message();
+                    return Status::IOError("can't retrieve file_size of {}, due to {}",
+                                           local_file_path, ec.message());
+                }
+                if (local_file_size != file_size) {
+                    LOG(WARNING) << "download file length error"
+                                 << ", remote_path=" << remote_file_url
+                                 << ", file_size=" << file_size
+                                 << ", local_file_size=" << local_file_size;
+                    return Status::InternalError("downloaded file size is not equal");
+                }
+                chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR);
+                return Status::OK();
+            };
+            RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb));
+
+            // local_files always keep the updated local files
+            local_files[filename] = LocalFileStat {file_size};
+        }
+
+        uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
+        total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
+        double copy_rate = 0.0;
+        if (total_time_ms > 0) {
+            copy_rate = total_file_size / ((double)total_time_ms) / 1000;
+        }
+        LOG(INFO) << fmt::format(
+                "succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: "
+                "{} ms, rate: {} MB/s",
+                remote_tablet_id, local_tablet_id, total_file_size, total_time_ms, copy_rate);
+
+        // local_files: contain all remote files and local files
+        // finally, delete local files which are not in remote
+        for (const auto& [local_file, local_filestat] : local_files) {
+            // replace the tablet id in local file name with the remote tablet id,
+            // in order to compare the file name.
+            std::string new_name;
+            Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name);
+            if (!st.ok()) {
+                LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
+                             << ". ignore it";
+                continue;
+            }
+            VLOG_CRITICAL << "new file name after replace tablet id: " << new_name;
+            const auto& find = remote_files.find(new_name);
+            if (find != remote_files.end()) {
+                continue;
+            }
+
+            // delete
+            std::string full_local_file = local_path + "/" + local_file;
+            LOG(INFO) << "begin to delete local snapshot file: " << full_local_file
+                      << ", it does not exist in remote";
+            if (remove(full_local_file.c_str()) != 0) {
+                LOG(WARNING) << "failed to delete unknown local file: " << full_local_file
+                             << ", error: " << strerror(errno)
+                             << ", file size: " << local_filestat.size << ", ignore it";
+            }
+        }
+
+        ++finished_num;
+    }
+
+    LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
+    return status;
+}
+
 // move the snapshot files in snapshot_path
 // to tablet_path
 // If overwrite, just replace the tablet_path with snapshot_path,
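
remote_http_download() builds one URL per file in the form http://{host}:{port}/api/_tablet/_download?token={token}&file={remote_snapshot_path}/{filename}, skips files that already exist locally with a matching size (except .hdr files, which are always refreshed), and derives a per-file timeout from the configured low-speed limit. A small standalone sketch of those pieces; the two constants are assumed stand-ins for config::download_low_speed_limit_kbps and config::download_low_speed_time:

    #include <algorithm>
    #include <cstdint>
    #include <string>

    #include <fmt/format.h>

    // Assumed values standing in for config::download_low_speed_limit_kbps / download_low_speed_time.
    constexpr uint64_t kLowSpeedLimitKBps = 50 * 1024; // 50 MB/s expressed in KB/s (assumption)
    constexpr uint64_t kLowSpeedTimeSec = 300;         // minimum timeout in seconds (assumption)

    // Per-file URL for the /api/_tablet/_download action (format taken from the patch above).
    std::string make_download_url(const std::string& host, int port, const std::string& token,
                                  const std::string& remote_path, const std::string& filename) {
        return fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}/{}", host, port,
                           token, remote_path, filename);
    }

    // A remote file is fetched when it is missing locally, is a .hdr file, or differs in size.
    bool need_download(bool exists_locally, bool is_hdr, uint64_t local_size, uint64_t remote_size) {
        return !exists_locally || is_hdr || local_size != remote_size;
    }

    // Timeout estimate per file: bytes / (KB/s) / 1024 gives seconds, floored at the minimum.
    uint64_t estimate_timeout_sec(uint64_t file_size_bytes) {
        return std::max<uint64_t>(file_size_bytes / kLowSpeedLimitKBps / 1024, kLowSpeedTimeSec);
    }

With these assumed values a 1 GiB segment works out to roughly 20 seconds, so the 300-second floor applies; only very large files push the estimate above the floor.
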
diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h
index 9e7a22d7a3..c0d1f0f708 100644
--- a/be/src/runtime/snapshot_loader.h
+++ b/be/src/runtime/snapshot_loader.h
@@ -33,6 +33,8 @@ namespace io {
 class RemoteFileSystem;
 } // namespace io
 
+class TRemoteTabletSnapshot;
+
 struct FileStat {
     std::string name;
     std::string md5;
@@ -77,6 +79,9 @@ public:
     Status download(const std::map<std::string, std::string>& src_to_dest_path,
                     std::vector<int64_t>* downloaded_tablet_ids);
 
+    Status remote_http_download(const std::vector<TRemoteTabletSnapshot>& remote_tablets,
+                                std::vector<int64_t>* downloaded_tablet_ids);
+
     Status move(const std::string& snapshot_path, TabletSharedPtr tablet, bool overwrite);
 
 private:
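
The new entry point takes a vector of TRemoteTabletSnapshot, the thrift struct (see the gensrc/thrift/AgentService.thrift change in this patch) that pairs a local tablet and its snapshot directory with the remote tablet to copy from. A rough, hand-written view of the fields the loader actually reads; the real type is thrift-generated and every field is optional:

    // Rough sketch of TRemoteTabletSnapshot as consumed by remote_http_download().
    struct RemoteTabletSnapshotSketch {
        int64_t local_tablet_id;          // tablet on this BE that receives the files
        std::string local_snapshot_path;  // existing local snapshot dir to download into
        int64_t remote_tablet_id;         // tablet on the source BE
        int64_t remote_be_id;             // id of the source BE
        int32_t schema_hash;              // schema hash appended to the remote snapshot path
        TNetworkAddress remote_be_addr;   // host/port used to build the download URL
        std::string remote_snapshot_path; // snapshot dir on the source BE
        std::string remote_token;         // token accepted by /api/_tablet/_download
    };
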
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index a5e528b45f..1b4a1ec944 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -396,49 +396,49 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
     }
 
     /// Check args: txn_id, remote_tablet_id, binlog_version, remote_host, remote_port, partition_id, load_id
-    if (request.__isset.txn_id) {
+    if (!request.__isset.txn_id) {
         LOG(WARNING) << "txn_id is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("txn_id is empty");
         return;
     }
-    if (request.__isset.remote_tablet_id) {
+    if (!request.__isset.remote_tablet_id) {
         LOG(WARNING) << "remote_tablet_id is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("remote_tablet_id is empty");
         return;
     }
-    if (request.__isset.binlog_version) {
+    if (!request.__isset.binlog_version) {
         LOG(WARNING) << "binlog_version is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("binlog_version is empty");
         return;
     }
-    if (request.__isset.remote_host) {
+    if (!request.__isset.remote_host) {
         LOG(WARNING) << "remote_host is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("remote_host is empty");
         return;
     }
-    if (request.__isset.remote_port) {
+    if (!request.__isset.remote_port) {
         LOG(WARNING) << "remote_port is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("remote_port is empty");
         return;
     }
-    if (request.__isset.partition_id) {
+    if (!request.__isset.partition_id) {
         LOG(WARNING) << "partition_id is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("partition_id is empty");
         return;
     }
-    if (request.__isset.load_id) {
+    if (!request.__isset.load_id) {
         LOG(WARNING) << "load_id is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
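
The fix above simply negates each presence check: a field is rejected only when it is not set on the request. Since the same four-line block repeats for every field, a hypothetical helper (not part of the patch) shows how the corrected pattern could be expressed once:

    // Hypothetical consolidation of the "!__isset => ANALYSIS_ERROR and return" pattern.
    #define CHECK_BINLOG_FIELD_SET(field)                               \
        do {                                                            \
            if (!request.__isset.field) {                               \
                LOG(WARNING) << #field << " is empty";                  \
                tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR); \
                tstatus.__isset.error_msgs = true;                      \
                tstatus.error_msgs.emplace_back(#field " is empty");    \
                return;                                                 \
            }                                                           \
        } while (false)

    // Usage inside BackendService::ingest_binlog:
    //   CHECK_BINLOG_FIELD_SET(txn_id);
    //   CHECK_BINLOG_FIELD_SET(remote_tablet_id);
    //   CHECK_BINLOG_FIELD_SET(binlog_version);
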
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index 679ebd8cb8..2382093d6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.backup.Repository;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
@@ -45,12 +46,22 @@ public class RestoreStmt extends AbstractBackupStmt {
     private int metaVersion = -1;
     private boolean reserveReplica = false;
     private boolean reserveDynamicPartitionEnable = false;
+    private boolean isLocal = false;
+    private byte[] meta = null;
+    private byte[] jobInfo = null;
 
     public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
                        Map<String, String> properties) {
         super(labelName, repoName, restoreTableRefClause, properties);
     }
 
+    public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
+                       Map<String, String> properties, byte[] meta, byte[] jobInfo) {
+        super(labelName, repoName, restoreTableRefClause, properties);
+        this.meta = meta;
+        this.jobInfo = jobInfo;
+    }
+
     public boolean allowLoad() {
         return allowLoad;
     }
@@ -75,8 +86,23 @@ public class RestoreStmt extends AbstractBackupStmt {
         return reserveDynamicPartitionEnable;
     }
 
+    public boolean isLocal() {
+        return isLocal;
+    }
+
+    public byte[] getMeta() {
+        return meta;
+    }
+
+    public byte[] getJobInfo() {
+        return jobInfo;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
+        if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
+            isLocal = true;
+        }
         super.analyze(analyzer);
     }
 
@@ -148,8 +174,10 @@ public class RestoreStmt extends AbstractBackupStmt {
             backupTimestamp = copiedProperties.get(PROP_BACKUP_TIMESTAMP);
             copiedProperties.remove(PROP_BACKUP_TIMESTAMP);
         } else {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
-                    "Missing " + PROP_BACKUP_TIMESTAMP + " property");
+            if (!isLocal) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
+                        "Missing " + PROP_BACKUP_TIMESTAMP + " property");
+            }
         }
 
         // meta version
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
index bdf33ddfec..d10d216b12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
@@ -28,6 +28,11 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 
 public class ShowSnapshotStmt extends ShowStmt {
+    public enum SnapshotType {
+        REMOTE,
+        LOCAL
+    }
+
     public static final ImmutableList<String> SNAPSHOT_ALL = new ImmutableList.Builder<String>()
             .add("Snapshot").add("Timestamp").add("Status")
             .build();
@@ -39,6 +44,7 @@ public class ShowSnapshotStmt extends ShowStmt {
     private Expr where;
     private String snapshotName;
     private String timestamp;
+    private SnapshotType snapshotType = SnapshotType.REMOTE;
 
     public ShowSnapshotStmt(String repoName, Expr where) {
         this.repoName = repoName;
@@ -87,7 +93,7 @@ public class ShowSnapshotStmt extends ShowStmt {
 
             if (!ok) {
                 throw new AnalysisException("Where clause should looks like: SNAPSHOT = 'your_snapshot_name'"
-                        + " [AND TIMESTAMP = '2018-04-18-19-19-10']");
+                        + " [AND TIMESTAMP = '2018-04-18-19-19-10'] [AND SNAPSHOTTYPE = 'remote' | 'local']");
             }
         }
     }
@@ -116,10 +122,25 @@ public class ShowSnapshotStmt extends ShowStmt {
                 return false;
             }
             return true;
+        } else if (name.equalsIgnoreCase("snapshotType")) {
+            String snapshotTypeVal = ((StringLiteral) val).getStringValue();
+            if (Strings.isNullOrEmpty(snapshotTypeVal)) {
+                return false;
+            }
+            // snapshotType now only support "remote" and "local"
+            switch (snapshotTypeVal.toLowerCase()) {
+                case "remote":
+                    snapshotType = SnapshotType.REMOTE;
+                    return true;
+                case "local":
+                    snapshotType = SnapshotType.LOCAL;
+                    return true;
+                default:
+                    return false;
+            }
+        } else {
+            return false;
         }
-
-        return false;
-
     }
 
     public String getRepoName() {
@@ -134,6 +155,10 @@ public class ShowSnapshotStmt extends ShowStmt {
         return timestamp;
     }
 
+    public String getSnapshotType() {
+        return snapshotType.name();
+    }
+
     @Override
     public ShowResultSetMetaData getMetaData() {
         ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index d149ad574d..6619c457b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -46,6 +46,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.task.DirMoveTask;
@@ -56,13 +57,16 @@ import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
@@ -75,7 +79,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -104,6 +110,12 @@ public class BackupHandler extends MasterDaemon implements Writable {
 
     private Env env;
 
+    // Map of local backup snapshots: key is the label name, value is the Snapshot (meta && job info bytes).
+    // This map is not persisted; it exists only in the FE master's memory.
+    // Each label keeps only its latest snapshot.
+    private final Map<String, Snapshot> localSnapshots = new HashMap<>();
+    private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();
+
     public BackupHandler() {
         // for persist
     }
@@ -241,9 +253,13 @@ public class BackupHandler extends MasterDaemon implements Writable {
     public void process(AbstractBackupStmt stmt) throws DdlException {
         // check if repo exist
         String repoName = stmt.getRepoName();
-        Repository repository = repoMgr.getRepo(repoName);
-        if (repository == null) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repoName + " does not exist");
+        Repository repository = null;
+        if (!repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
+            repository = repoMgr.getRepo(repoName);
+            if (repository == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                        "Repository " + repoName + " does not exist");
+            }
         }
 
         // check if db exist
@@ -286,7 +302,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
     }
 
     private void backup(Repository repository, Database db, BackupStmt stmt) throws DdlException {
-        if (repository.isReadOnly()) {
+        if (repository != null && repository.isReadOnly()) {
             ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repository.getName()
                     + " is read only");
         }
@@ -357,25 +373,29 @@ public class BackupHandler extends MasterDaemon implements Writable {
         }
 
         // Check if label already be used
-        List<String> existSnapshotNames = Lists.newArrayList();
-        Status st = repository.listSnapshots(existSnapshotNames);
-        if (!st.ok()) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg());
-        }
-        if (existSnapshotNames.contains(stmt.getLabel())) {
-            if (stmt.getType() == BackupType.FULL) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '"
-                        + stmt.getLabel() + "' already exist in repository");
-            } else {
-                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support "
-                        + "incremental backup");
+        long repoId = -1;
+        if (repository != null) {
+            List<String> existSnapshotNames = Lists.newArrayList();
+            Status st = repository.listSnapshots(existSnapshotNames);
+            if (!st.ok()) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg());
+            }
+            if (existSnapshotNames.contains(stmt.getLabel())) {
+                if (stmt.getType() == BackupType.FULL) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '"
+                            + stmt.getLabel() + "' already exist in repository");
+                } else {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support "
+                            + "incremental backup");
+                }
             }
+            repoId = repository.getId();
         }
 
         // Create a backup job
         BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
                 ClusterNamespace.getNameFromFullName(db.getFullName()),
-                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repository.getId());
+                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
         // write log
         env.getEditLog().logBackupJob(backupJob);
 
@@ -386,26 +406,62 @@ public class BackupHandler extends MasterDaemon implements Writable {
     }
 
     private void restore(Repository repository, Database db, RestoreStmt stmt) throws DdlException {
-        // Check if snapshot exist in repository
-        List<BackupJobInfo> infos = Lists.newArrayList();
-        Status status = repository.getSnapshotInfoFile(stmt.getLabel(), stmt.getBackupTimestamp(), infos);
-        if (!status.ok()) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                                           "Failed to get info of snapshot '" + stmt.getLabel() + "' because: "
-                                                   + status.getErrMsg() + ". Maybe specified wrong backup timestamp");
+        BackupJobInfo jobInfo;
+        if (stmt.isLocal()) {
+            String jobInfoString = new String(stmt.getJobInfo());
+            jobInfo = BackupJobInfo.genFromJson(jobInfoString);
+
+            if (jobInfo.extraInfo == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty");
+            }
+            if (jobInfo.extraInfo.beNetworkMap == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info be network map");
+            }
+            if (Strings.isNullOrEmpty(jobInfo.extraInfo.token)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info token");
+            }
+        } else {
+            // Check if snapshot exist in repository
+            List<BackupJobInfo> infos = Lists.newArrayList();
+            Status status = repository.getSnapshotInfoFile(stmt.getLabel(), stmt.getBackupTimestamp(), infos);
+            if (!status.ok()) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                        "Failed to get info of snapshot '" + stmt.getLabel() + "' because: "
+                                + status.getErrMsg() + ". Maybe specified wrong backup timestamp");
+            }
+
+            // Check if all restore objects are exist in this snapshot.
+            // Also remove all unrelated objs
+            Preconditions.checkState(infos.size() == 1);
+            jobInfo = infos.get(0);
         }
 
-        // Check if all restore objects are exist in this snapshot.
-        // Also remove all unrelated objs
-        Preconditions.checkState(infos.size() == 1);
-        BackupJobInfo jobInfo = infos.get(0);
         checkAndFilterRestoreObjsExistInSnapshot(jobInfo, stmt.getAbstractBackupTableRefClause());
 
         // Create a restore job
-        RestoreJob restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
+        RestoreJob restoreJob;
+        if (stmt.isLocal()) {
+            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(stmt.getMeta());
+            DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
+            try {
+                BackupMeta backupMeta = BackupMeta.read(dataInputStream);
+                String backupTimestamp =
+                        TimeUtils.longToTimeString(jobInfo.getBackupTime(), TimeUtils.DATETIME_FORMAT_WITH_HYPHEN);
+                restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp,
+                        db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
+                        stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(),
+                        stmt.reserveDynamicPartitionEnable(),
+                        env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
+            } catch (IOException e) {
+                throw new DdlException(e.getMessage());
+            }
+        } else {
+            restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
                 db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
                 stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
                 env, repository.getId());
+        }
+
         env.getEditLog().logRestoreJob(restoreJob);
 
         // must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
@@ -667,6 +723,24 @@ public class BackupHandler extends MasterDaemon implements Writable {
         return false;
     }
 
+    public void addSnapshot(String labelName, Snapshot snapshot) {
+        localSnapshotsLock.writeLock().lock();
+        try {
+            localSnapshots.put(labelName, snapshot);
+        } finally {
+            localSnapshotsLock.writeLock().unlock();
+        }
+    }
+
+    public Snapshot getSnapshot(String labelName) {
+        localSnapshotsLock.readLock().lock();
+        try {
+            return localSnapshots.get(labelName);
+        } finally {
+            localSnapshotsLock.readLock().unlock();
+        }
+    }
+
     public static BackupHandler read(DataInput in) throws IOException {
         BackupHandler backupHandler = new BackupHandler();
         backupHandler.readFields(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 2300cd1e01..60d486c310 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -114,6 +114,9 @@ public class BackupJob extends AbstractJob {
     // backup properties
     private Map<String, String> properties = Maps.newHashMap();
 
+    private byte[] metaInfoBytes = null;
+    private byte[] jobInfoBytes = null;
+
     public BackupJob() {
         super(JobType.BACKUP);
     }
@@ -282,7 +285,7 @@ public class BackupJob extends AbstractJob {
         }
 
         // get repo if not set
-        if (repo == null) {
+        if (repo == null && repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
             repo = env.getBackupHandler().getRepoMgr().getRepo(repoId);
             if (repo == null) {
                 status = new Status(ErrCode.COMMON_ERROR, "failed to get repository: " + repoId);
@@ -565,6 +568,11 @@ public class BackupJob extends AbstractJob {
     }
 
     private void uploadSnapshot() {
+        if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
+            state = BackupJobState.UPLOADING;
+            return;
+        }
+
         // reuse this set to save all unfinished tablets
         unfinishedTaskIds.clear();
         taskProgress.clear();
@@ -673,6 +681,8 @@ public class BackupJob extends AbstractJob {
             }
             backupMeta.writeToFile(metaInfoFile);
             localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
+            // read meta info to metaInfoBytes
+            metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
 
             // 3. save job info file
             jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId,
@@ -685,6 +695,8 @@ public class BackupJob extends AbstractJob {
             }
             jobInfo.writeToFile(jobInfoFile);
             localJobInfoFilePath = jobInfoFile.getAbsolutePath();
+            // read job info to jobInfoBytes
+            jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
         } catch (Exception e) {
             status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
             return;
@@ -697,7 +709,9 @@ public class BackupJob extends AbstractJob {
         jobInfo = null;
 
         // release all snapshots before clearing the snapshotInfos.
-        releaseSnapshots();
+        if (repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
+            releaseSnapshots();
+        }
 
         snapshotInfos.clear();
 
@@ -724,6 +738,13 @@ public class BackupJob extends AbstractJob {
     }
 
     private void uploadMetaAndJobInfoFile() {
+        if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
+            state = BackupJobState.FINISHED;
+            Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
+            env.getBackupHandler().addSnapshot(label, snapshot);
+            return;
+        }
+
         String remoteMetaInfoFile = repo.assembleMetaInfoFilePath(label);
         if (!uploadFile(localMetaInfoFilePath, remoteMetaInfoFile)) {
             return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index ec622dbdca..4457740440 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TNetworkAddress;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
@@ -92,10 +93,39 @@ public class BackupJobInfo implements Writable {
     @SerializedName("meta_version")
     public int metaVersion;
 
+    @SerializedName("tablet_be_map")
+    public Map<Long, Long> tabletBeMap = Maps.newHashMap();
+
+    @SerializedName("tablet_snapshot_path_map")
+    public Map<Long, String> tabletSnapshotPathMap = Maps.newHashMap();
+
+    public static class ExtraInfo {
+        public static class NetworkAddrss {
+            @SerializedName("ip")
+            public String ip;
+            @SerializedName("port")
+            public int port;
+        }
+
+        @SerializedName("be_network_map")
+        public Map<Long, NetworkAddrss> beNetworkMap = Maps.newHashMap();
+
+        @SerializedName("token")
+        public String token;
+    }
+
+    @SerializedName("extra_info")
+    public ExtraInfo extraInfo;
+
+
     // This map is used to save the table alias mapping info when processing a restore job.
     // origin -> alias
     public Map<String, String> tblAlias = Maps.newHashMap();
 
+    public long getBackupTime() {
+        return backupTime;
+    }
+
     public void initBackupJobInfoAfterDeserialize() {
         // transform success
         if (successJson.equals("succeed")) {
@@ -487,6 +517,62 @@ public class BackupJobInfo implements Writable {
         return Joiner.on("/").join(pathSeg);
     }
 
+    // struct TRemoteTabletSnapshot {
+    //     1: optional i64 local_tablet_id
+    //     2: optional string local_snapshot_path
+    //     3: optional i64 remote_tablet_id
+    //     4: optional i64 remote_be_id
+    //     5: optional Types.TSchemaHash schema_hash
+    //     6: optional Types.TNetworkAddress remote_be_addr
+    //     7: optional string remote_snapshot_path
+    //     8: optional string token
+    // }
+
+    public String getTabletSnapshotPath(Long tabletId) {
+        return tabletSnapshotPathMap.get(tabletId);
+    }
+
+    public Long getBeId(Long tabletId) {
+        return tabletBeMap.get(tabletId);
+    }
+
+    public String getToken() {
+        return extraInfo.token;
+    }
+
+    public TNetworkAddress getBeAddr(Long beId) {
+        ExtraInfo.NetworkAddrss addr = extraInfo.beNetworkMap.get(beId);
+        if (addr == null) {
+            return null;
+        }
+
+        return new TNetworkAddress(addr.ip, addr.port);
+    }
+
+    // TODO(Drogon): improve the performance of this lookup
+    public Long getSchemaHash(long tableId, long partitionId, long indexId) {
+        for (BackupOlapTableInfo backupOlapTableInfo : backupOlapTableObjects.values()) {
+            if (backupOlapTableInfo.id != tableId) {
+                continue;
+            }
+
+            for (BackupPartitionInfo backupPartitionInfo : backupOlapTableInfo.partitions.values()) {
+                if (backupPartitionInfo.id != partitionId) {
+                    continue;
+                }
+
+                for (BackupIndexInfo backupIndexInfo : backupPartitionInfo.indexes.values()) {
+                    if (backupIndexInfo.id != indexId) {
+                        continue;
+                    }
+
+                    return Long.valueOf(backupIndexInfo.schemaHash);
+                }
+            }
+        }
+        return null;
+    }
+
     public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId,
                                             BackupContent content, BackupMeta backupMeta,
                                             Map<Long, SnapshotInfo> snapshotInfos) {
@@ -526,8 +612,11 @@ public class BackupJobInfo implements Writable {
                             }
                         } else {
                             for (Tablet tablet : index.getTablets()) {
+                                SnapshotInfo snapshotInfo = snapshotInfos.get(tablet.getId());
                                 idxInfo.tablets.put(tablet.getId(),
-                                        Lists.newArrayList(snapshotInfos.get(tablet.getId()).getFiles()));
+                                        Lists.newArrayList(snapshotInfo.getFiles()));
+                                jobInfo.tabletBeMap.put(tablet.getId(), snapshotInfo.getBeId());
+                                jobInfo.tabletSnapshotPathMap.put(tablet.getId(), snapshotInfo.getPath());
                             }
                         }
                         idxInfo.tabletsOrder.addAll(index.getTabletIdsInOrder());
@@ -578,7 +667,7 @@ public class BackupJobInfo implements Writable {
         return genFromJson(json);
     }
 
-    private static BackupJobInfo genFromJson(String json) {
+    public static BackupJobInfo genFromJson(String json) {
         /* parse the json string:
          * {
          *   "backup_time": 1522231864000,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index e059d55be6..e22bb7f33c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -21,8 +21,10 @@ import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.meta.MetaContext;
+import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -38,10 +40,13 @@ import java.util.Map;
 public class BackupMeta implements Writable {
 
     // tbl name -> tbl
+    @SerializedName(value = "tblNameMap")
     private Map<String, Table> tblNameMap = Maps.newHashMap();
     // tbl id -> tbl
+    @SerializedName(value = "tblIdMap")
     private Map<Long, Table> tblIdMap = Maps.newHashMap();
     // resource name -> resource
+    @SerializedName(value = "resourceNameMap")
     private Map<String, Resource> resourceNameMap = Maps.newHashMap();
 
     private BackupMeta() {
@@ -136,4 +141,13 @@ public class BackupMeta implements Writable {
             resourceNameMap.put(resource.getName(), resource);
         }
     }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    @Override
+    public String toString() {
+        return toJson();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 6d421332dc..ba95a77352 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -97,6 +97,8 @@ public class Repository implements Writable {
     public static final String FILE_REPO_INFO = "__repo_info";
     public static final String FILE_META_INFO = "__meta";
     public static final String DIR_SNAPSHOT_CONTENT = "__ss_content";
+    public static final String KEEP_ON_LOCAL_REPO_NAME = "__keep_on_local__";
+    public static final long KEEP_ON_LOCAL_REPO_ID = -1;
     private static final Logger LOG = LogManager.getLogger(Repository.class);
     private static final String PATH_DELIMITER = "/";
     private static final String CHECKSUM_SEPARATOR = ".";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 60ff99c4b9..e9e2eee824 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -71,6 +71,8 @@ import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.ReleaseSnapshotTask;
 import org.apache.doris.task.SnapshotTask;
 import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TRemoteTabletSnapshot;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -185,6 +187,18 @@ public class RestoreJob extends AbstractJob {
         properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
     }
 
+    public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
+            ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
+            boolean reserveDynamicPartitionEnable, Env env, long repoId, BackupMeta backupMeta) {
+        this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
+                reserveDynamicPartitionEnable, env, repoId);
+        this.backupMeta = backupMeta;
+    }
+
+    public boolean isFromLocalSnapshot() {
+        return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
+    }
+
     public RestoreJobState getState() {
         return state;
     }
@@ -324,7 +338,7 @@ public class RestoreJob extends AbstractJob {
         }
 
         // get repo if not set
-        if (repo == null) {
+        if (repo == null && !isFromLocalSnapshot()) {
             repo = env.getBackupHandler().getRepoMgr().getRepo(repoId);
             if (repo == null) {
                 status = new Status(ErrCode.COMMON_ERROR, "failed to get repository: " + repoId);
@@ -1109,6 +1123,15 @@ public class RestoreJob extends AbstractJob {
     }
 
     private boolean downloadAndDeserializeMetaInfo() {
+        if (isFromLocalSnapshot()) {
+            if (backupMeta != null) {
+                return true;
+            }
+
+            status = new Status(ErrCode.COMMON_ERROR, "backupMeta is null");
+            return false;
+        }
+
         List<BackupMeta> backupMetas = Lists.newArrayList();
         Status st = repo.getSnapshotMetaFile(jobInfo.name, backupMetas,
                 this.metaVersion == -1 ? jobInfo.metaVersion : this.metaVersion);
@@ -1251,7 +1274,15 @@ public class RestoreJob extends AbstractJob {
     }
 
     private void downloadSnapshots() {
-        // Categorize snapshot infos by db id.
+        if (isFromLocalSnapshot()) {
+            downloadLocalSnapshots();
+        } else {
+            downloadRemoteSnapshots();
+        }
+    }
+
+    private void downloadRemoteSnapshots() {
+        // Categorize snapshot infos by db id.
         ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create();
         for (SnapshotInfo info : snapshotInfos.values()) {
             dbToSnapshotInfos.put(info.getDbId(), info);
@@ -1289,7 +1320,8 @@ public class RestoreJob extends AbstractJob {
                     LOG.debug("backend {} has {} batch, total {} tasks, {}",
                               beId, batchNum, totalNum, this);
 
-                    List<FsBroker> brokerAddrs = Lists.newArrayList();
+                    List<FsBroker> brokerAddrs = null;
+                    brokerAddrs = Lists.newArrayList();
                     Status st = repo.getBrokerAddress(beId, env, brokerAddrs);
                     if (!st.ok()) {
                         status = st;
@@ -1401,6 +1433,174 @@ public class RestoreJob extends AbstractJob {
         LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this);
     }
 
+    private void downloadLocalSnapshots() {
+        // Categorize snapshot infos by db id.
+        ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create();
+        for (SnapshotInfo info : snapshotInfos.values()) {
+            dbToSnapshotInfos.put(info.getDbId(), info);
+        }
+
+        // Send download tasks
+        unfinishedSignatureToId.clear();
+        taskProgress.clear();
+        taskErrMsg.clear();
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (long dbId : dbToSnapshotInfos.keySet()) {
+            List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
+
+            Database db = env.getInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                status = new Status(ErrCode.NOT_FOUND, "db " + dbId + " does not exist");
+                return;
+            }
+
+            // We classify the snapshot info by backend
+            ArrayListMultimap<Long, SnapshotInfo> beToSnapshots = ArrayListMultimap.create();
+            for (SnapshotInfo info : infos) {
+                beToSnapshots.put(info.getBeId(), info);
+            }
+
+            db.readLock();
+            try {
+                for (Long beId : beToSnapshots.keySet()) {
+                    List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
+                    int totalNum = beSnapshotInfos.size();
+                    // each backend is allotted at most 3 tasks
+                    int batchNum = Math.min(totalNum, 3);
+                    // each task contains several download sub tasks
+                    int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
+
+                    // allot tasks
+                    int index = 0;
+                    for (int batch = 0; batch < batchNum; batch++) {
+                        List<TRemoteTabletSnapshot> remoteTabletSnapshots = Lists.newArrayList();
+                        int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
+                        for (int j = 0; j < currentBatchTaskNum; j++) {
+                            TRemoteTabletSnapshot remoteTabletSnapshot = new TRemoteTabletSnapshot();
+
+                            SnapshotInfo info = beSnapshotInfos.get(index++);
+                            Table tbl = db.getTableNullable(info.getTblId());
+                            if (tbl == null) {
+                                status = new Status(ErrCode.NOT_FOUND, "restored table "
+                                        + info.getTabletId() + " does not exist");
+                                return;
+                            }
+                            OlapTable olapTbl = (OlapTable) tbl;
+                            olapTbl.readLock();
+                            try {
+                                Partition part = olapTbl.getPartition(info.getPartitionId());
+                                if (part == null) {
+                                    status = new Status(ErrCode.NOT_FOUND, "partition "
+                                            + info.getPartitionId() + " does not exist in restored table: "
+                                            + tbl.getName());
+                                    return;
+                                }
+
+                                MaterializedIndex idx = part.getIndex(info.getIndexId());
+                                if (idx == null) {
+                                    status = new Status(ErrCode.NOT_FOUND, "index " + info.getIndexId()
+                                            + " does not exist in partition " + part.getName()
+                                            + " of restored table " + tbl.getName());
+                                    return;
+                                }
+
+                                Tablet tablet  = idx.getTablet(info.getTabletId());
+                                if (tablet == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "tablet " + info.getTabletId() + " does not exist in restored table "
+                                                    + tbl.getName());
+                                    return;
+                                }
+
+                                Replica replica = tablet.getReplicaByBackendId(info.getBeId());
+                                if (replica == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "replica in be " + info.getBeId() + " of tablet "
+                                                    + tablet.getId() + " does not exist in restored table "
+                                                    + tbl.getName());
+                                    return;
+                                }
+
+                                IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(),
+                                        info.getTabletId(), replica.getId());
+                                IdChain repoIds = fileMapping.get(catalogIds);
+                                if (repoIds == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "failed to get id mapping of catalog ids: " + catalogIds.toString());
+                                    return;
+                                }
+
+                                SnapshotInfo snapshotInfo = snapshotInfos.get(info.getTabletId(), info.getBeId());
+                                Preconditions.checkNotNull(snapshotInfo, info.getTabletId() + "-" + info.getBeId());
+                                // download to the previously created snapshot dir
+                                String dest = snapshotInfo.getTabletPath();
+
+                                Long localTabletId = info.getTabletId();
+                                String localSnapshotPath = dest;
+                                Long remoteTabletId = repoIds.getTabletId();
+                                Long remoteBeId = jobInfo.getBeId(remoteTabletId);
+                                String remoteSnapshotPath = jobInfo.getTabletSnapshotPath(remoteTabletId);
+                                if (remoteSnapshotPath == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "failed to get remote snapshot path of tablet: " + remoteTabletId);
+                                    return;
+                                }
+                                Long schemaHash = jobInfo.getSchemaHash(
+                                        repoIds.getTblId(), repoIds.getPartId(), repoIds.getIdxId());
+                                if (schemaHash == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "failed to get schema hash of table: " + repoIds.getTblId()
+                                                    + ", partition: " + repoIds.getPartId()
+                                                    + ", index: " + repoIds.getIdxId());
+                                    return;
+                                }
+                                // remoteSnapshotPath = "${remoteSnapshotPath}/${remoteTabletId}/${schemaHash}"
+                                remoteSnapshotPath =
+                                        String.format("%s/%d/%d", remoteSnapshotPath, remoteTabletId, schemaHash);
+                                TNetworkAddress remoteBeAddr = jobInfo.getBeAddr(remoteBeId);
+                                if (remoteBeAddr == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "failed to get remote be address of be: " + remoteBeId);
+                                    return;
+                                }
+                                String remoteToken = jobInfo.getToken();
+
+                                remoteTabletSnapshot.setLocalTabletId(localTabletId);
+                                remoteTabletSnapshot.setLocalSnapshotPath(localSnapshotPath);
+                                remoteTabletSnapshot.setRemoteTabletId(remoteTabletId);
+                                remoteTabletSnapshot.setRemoteBeId(remoteBeId);
+                                remoteTabletSnapshot.setRemoteBeAddr(remoteBeAddr);
+                                remoteTabletSnapshot.setRemoteSnapshotPath(remoteSnapshotPath);
+                                remoteTabletSnapshot.setRemoteToken(remoteToken);
+
+                                remoteTabletSnapshots.add(remoteTabletSnapshot);
+                            } finally {
+                                olapTbl.readUnlock();
+                            }
+                        }
+                        long signature = env.getNextId();
+                        DownloadTask task = new DownloadTask(null, beId, signature, jobId, dbId, remoteTabletSnapshots);
+                        batchTask.addTask(task);
+                        unfinishedSignatureToId.put(signature, beId);
+                    }
+                }
+            } finally {
+                db.readUnlock();
+            }
+        }
+
+        // send task
+        for (AgentTask task : batchTask.getAllTasks()) {
+            AgentTaskQueue.addTask(task);
+        }
+        AgentTaskExecutor.submit(batchTask);
+
+        state = RestoreJobState.DOWNLOADING;
+
+        // No edit log here
+        LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this);
+    }
+
     private void waitingAllDownloadFinished() {
         if (unfinishedSignatureToId.isEmpty()) {
             downloadFinishedTime = System.currentTimeMillis();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
new file mode 100644
index 0000000000..b26cb2e1e7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.backup;
+
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+public class Snapshot {
+    @SerializedName(value = "label")
+    private String label = null;
+
+    @SerializedName(value = "meta")
+    private byte[] meta = null;
+
+    @SerializedName(value = "jobInfo")
+    private byte[] jobInfo = null;
+
+    @SerializedName(value = "createTime")
+    private String createTime = null;
+
+    public Snapshot() {
+    }
+
+    public Snapshot(String label, byte[] meta, byte[] jobInfo) {
+        this.label = label;
+        this.meta = meta;
+        this.jobInfo = jobInfo;
+    }
+
+
+    public byte[] getMeta() {
+        return meta;
+    }
+
+    public byte[] getJobInfo() {
+        return jobInfo;
+    }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    @Override
+    public String toString() {
+        // return toJson();
+        return "Snapshot{"
+                + "label='" + label + '\''
+                + ", meta=" + meta
+                + ", jobInfo=" + jobInfo
+                + ", createTime='" + createTime + '\''
+                + '}';
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b650100823..2dbb007498 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -19,11 +19,15 @@ package org.apache.doris.service;
 
 import org.apache.doris.alter.SchemaChangeHandler;
 import org.apache.doris.analysis.AddColumnsClause;
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.LabelName;
+import org.apache.doris.analysis.RestoreStmt;
 import org.apache.doris.analysis.SetType;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TypeDef;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.backup.Snapshot;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
@@ -63,6 +67,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectProcessor;
+import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.VariableMgr;
@@ -106,6 +111,8 @@ import org.apache.doris.thrift.TGetBinlogResult;
 import org.apache.doris.thrift.TGetDbsParams;
 import org.apache.doris.thrift.TGetDbsResult;
 import org.apache.doris.thrift.TGetQueryStatsRequest;
+import org.apache.doris.thrift.TGetSnapshotRequest;
+import org.apache.doris.thrift.TGetSnapshotResult;
 import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
 import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
@@ -137,11 +144,14 @@ import org.apache.doris.thrift.TReplicaInfo;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
 import org.apache.doris.thrift.TReportRequest;
+import org.apache.doris.thrift.TRestoreSnapshotRequest;
+import org.apache.doris.thrift.TRestoreSnapshotResult;
 import org.apache.doris.thrift.TRollbackTxnRequest;
 import org.apache.doris.thrift.TRollbackTxnResult;
 import org.apache.doris.thrift.TShowVariableRequest;
 import org.apache.doris.thrift.TShowVariableResult;
 import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
+import org.apache.doris.thrift.TSnapshotType;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
@@ -2124,15 +2134,15 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             throw new UserException("prev_commit_seq is not set");
         }
 
+
+        // step 1: check auth
         String cluster = request.getCluster();
         if (Strings.isNullOrEmpty(cluster)) {
             cluster = SystemInfoService.DEFAULT_CLUSTER;
         }
-
-        // step 1: check auth
         if (Strings.isNullOrEmpty(request.getToken())) {
             checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTable(),
-                    request.getUserIp(), PrivPredicate.LOAD);
+                    request.getUserIp(), PrivPredicate.SELECT);
         }
 
         // step 3: check database
@@ -2181,4 +2191,182 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         return result;
     }
+
+    // getSnapshot
+    public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.trace("receive get snapshot info request: {}", request);
+
+        TGetSnapshotResult result = new TGetSnapshotResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            result = getSnapshotImpl(request, clientAddr);
+        } catch (UserException e) {
+            LOG.warn("failed to get snapshot info: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+
+        return result;
+    }
+
+    // getSnapshotImpl
+    private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp)
+            throws UserException {
+        // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, snapshot_type
+        if (!request.isSetUser()) {
+            throw new UserException("user is not set");
+        }
+        if (!request.isSetPasswd()) {
+            throw new UserException("passwd is not set");
+        }
+        if (!request.isSetDb()) {
+            throw new UserException("db is not set");
+        }
+        if (!request.isSetLabelName()) {
+            throw new UserException("label_name is not set");
+        }
+        if (!request.isSetSnapshotName()) {
+            throw new UserException("snapshot_name is not set");
+        }
+        if (!request.isSetSnapshotType()) {
+            throw new UserException("snapshot_type is not set");
+        } else if (request.getSnapshotType() != TSnapshotType.LOCAL) {
+            throw new UserException("snapshot_type is not LOCAL");
+        }
+
+        // Step 2: check auth
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+
+        LOG.info("get snapshot info, user: {}, db: {}, label_name: {}, snapshot_name: {}, snapshot_type: {}",
+                request.getUser(), request.getDb(), request.getLabelName(), request.getSnapshotName(),
+                request.getSnapshotType());
+        if (Strings.isNullOrEmpty(request.getToken())) {
+            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                    request.getTable(), clientIp, PrivPredicate.LOAD);
+        }
+
+        // Step 3: get snapshot
+        TGetSnapshotResult result = new TGetSnapshotResult();
+        result.setStatus(new TStatus(TStatusCode.OK));
+        Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
+        if (snapshot == null) {
+            result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
+            result.getStatus().addToErrorMsgs("snapshot not exist");
+        } else {
+            result.setMeta(snapshot.getMeta());
+            result.setJobInfo(snapshot.getJobInfo());
+        }
+
+        return result;
+    }
+
+    // Restore Snapshot
+    public TRestoreSnapshotResult restoreSnapshot(TRestoreSnapshotRequest request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.trace("receive restore snapshot info request: {}", request);
+
+        TRestoreSnapshotResult result = new TRestoreSnapshotResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            result = restoreSnapshotImpl(request, clientAddr);
+        } catch (UserException e) {
+            LOG.warn("failed to restore snapshot: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+
+        return result;
+    }
+
+    // restoreSnapshotImpl
+    private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request, String clientIp)
+            throws UserException {
+        // Step 1: Check all required arg: user, passwd, db, label_name, repo_name, meta, info
+        if (!request.isSetUser()) {
+            throw new UserException("user is not set");
+        }
+        if (!request.isSetPasswd()) {
+            throw new UserException("passwd is not set");
+        }
+        if (!request.isSetDb()) {
+            throw new UserException("db is not set");
+        }
+        if (!request.isSetLabelName()) {
+            throw new UserException("label_name is not set");
+        }
+        if (!request.isSetRepoName()) {
+            throw new UserException("repo_name is not set");
+        }
+        if (!request.isSetMeta()) {
+            throw new UserException("meta is not set");
+        }
+        if (!request.isSetJobInfo()) {
+            throw new UserException("job_info is not set");
+        }
+
+        // Step 2: check auth
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+
+        if (Strings.isNullOrEmpty(request.getToken())) {
+            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                    request.getTable(), clientIp, PrivPredicate.LOAD);
+        }
+
+        // Step 3: restore snapshot
+        TRestoreSnapshotResult result = new TRestoreSnapshotResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+
+
+        LabelName label = new LabelName(request.getDb(), request.getLabelName());
+        String repoName = request.getRepoName();
+        Map<String, String> properties = request.getProperties();
+        RestoreStmt restoreStmt = new RestoreStmt(label, repoName, null, properties, request.getMeta(),
+                request.getJobInfo());
+        LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
+        try {
+            ConnectContext ctx = ConnectContext.get();
+            if (ctx == null) {
+                ctx = new ConnectContext();
+                ctx.setThreadLocalInfo();
+            }
+            ctx.setCluster(cluster);
+            ctx.setQualifiedUser(request.getUser());
+            UserIdentity currentUserIdentity = new UserIdentity(request.getUser(), "%");
+            ctx.setCurrentUserIdentity(currentUserIdentity);
+
+            Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
+            restoreStmt.analyze(analyzer);
+            DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
+        } catch (UserException e) {
+            LOG.warn("failed to restore snapshot: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+        }
+
+        return result;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
index 64b75a70d3..6482c5f807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
@@ -21,9 +21,11 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.thrift.TDownloadReq;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TRemoteTabletSnapshot;
 import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TTaskType;
 
+import java.util.List;
 import java.util.Map;
 
 public class DownloadTask extends AgentTask {
@@ -34,6 +36,9 @@ public class DownloadTask extends AgentTask {
     private Map<String, String> brokerProperties;
     private StorageBackend.StorageType storageType;
     private String location;
+    private List<TRemoteTabletSnapshot> remoteTabletSnapshots;
+    private boolean isFromLocalSnapshot = false;
+
 
     public DownloadTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId, long dbId,
             Map<String, String> srcToDestPath, FsBroker brokerAddr, Map<String, String> brokerProperties,
@@ -45,6 +50,16 @@ public class DownloadTask extends AgentTask {
         this.brokerProperties = brokerProperties;
         this.storageType = storageType;
         this.location = location;
+        this.isFromLocalSnapshot = false;
+    }
+
+    public DownloadTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId, long dbId,
+                        List<TRemoteTabletSnapshot> remoteTabletSnapshots) {
+        super(resourceInfo, backendId, TTaskType.DOWNLOAD, dbId, -1, -1, -1, -1, signature);
+        this.jobId = jobId;
+        this.srcToDestPath = new java.util.HashMap<String, String>();
+        this.remoteTabletSnapshots = remoteTabletSnapshots;
+        this.isFromLocalSnapshot = true;
     }
 
     public long getJobId() {
@@ -64,11 +79,22 @@ public class DownloadTask extends AgentTask {
     }
 
     public TDownloadReq toThrift() {
-        TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port);
-        TDownloadReq req = new TDownloadReq(jobId, srcToDestPath, address);
-        req.setBrokerProp(brokerProperties);
-        req.setStorageBackend(storageType.toThrift());
-        req.setLocation(location);
+        // these fields are required
+        // 1: required i64 job_id
+        // 2: required map<string, string> src_dest_map
+        // 3: required Types.TNetworkAddress broker_addr
+        TDownloadReq req;
+        if (isFromLocalSnapshot) {
+            TNetworkAddress brokerAddr = new TNetworkAddress("", 0); // mock broker address
+            req = new TDownloadReq(jobId, srcToDestPath, brokerAddr);
+            req.setRemoteTabletSnapshots(remoteTabletSnapshots);
+        } else {
+            TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port);
+            req = new TDownloadReq(jobId, srcToDestPath, address);
+            req.setBrokerProp(brokerProperties);
+            req.setStorageBackend(storageType.toThrift());
+            req.setLocation(location);
+        }
         return req;
     }
 }
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 6b74944cd7..b30c7ed26a 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -288,6 +288,16 @@ struct TUploadReq {
     6: optional string location // root path
 }
 
+struct TRemoteTabletSnapshot {
+    1: optional i64 local_tablet_id
+    2: optional string local_snapshot_path
+    3: optional i64 remote_tablet_id
+    4: optional i64 remote_be_id
+    5: optional Types.TNetworkAddress remote_be_addr
+    6: optional string remote_snapshot_path
+    7: optional string remote_token
+}
+
 struct TDownloadReq {
     1: required i64 job_id
     2: required map<string, string> src_dest_map
@@ -295,6 +305,7 @@ struct TDownloadReq {
     4: optional map<string, string> broker_prop
     5: optional Types.TStorageBackendType storage_backend = Types.TStorageBackendType.BROKER
     6: optional string location // root path
+    7: optional list<TRemoteTabletSnapshot> remote_tablet_snapshots
 }
 
 struct TSnapshotRequest {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 5aecdc4ba2..433b137a37 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -974,6 +974,52 @@ struct TGetTabletReplicaInfosResult {
     3: optional string token
 }
 
+enum TSnapshotType {
+    REMOTE = 0,
+    LOCAL  = 1,
+}
+
+struct TGetSnapshotRequest {
+    1: optional string cluster
+    2: optional string user
+    3: optional string passwd
+    4: optional string db
+    5: optional string table
+    6: optional string token
+    7: optional string label_name
+    8: optional string snapshot_name
+    9: optional TSnapshotType snapshot_type
+}
+
+struct TGetSnapshotResult {
+    1: optional Status.TStatus status
+    2: optional binary meta
+    3: optional binary job_info
+}
+
+struct TTableRef {
+    1: optional string table
+}
+
+struct TRestoreSnapshotRequest {
+    1: optional string cluster
+    2: optional string user
+    3: optional string passwd
+    4: optional string db
+    5: optional string table
+    6: optional string token
+    7: optional string label_name
+    8: optional string repo_name
+    9: optional list<TTableRef> table_refs
+    10: optional map<string, string> properties
+    11: optional binary meta
+    12: optional binary job_info
+}
+
+struct TRestoreSnapshotResult {
+    1: optional Status.TStatus status
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1006,6 +1052,8 @@ service FrontendService {
     TCommitTxnResult commitTxn(1: TCommitTxnRequest request)
     TRollbackTxnResult rollbackTxn(1: TRollbackTxnRequest request)
     TGetBinlogResult getBinlog(1: TGetBinlogRequest request)
+    TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request)
+    TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request)
 
     TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)
 
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 7edaecf7fb..0342f9bce0 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -91,6 +91,9 @@ enum TStatusCode {
     BINLOG_TOO_NEW_COMMIT_SEQ = 62,
     BINLOG_NOT_FOUND_DB = 63,
     BINLOG_NOT_FOUND_TABLE = 64,
+
+    // Snapshot Related from 70
+    SNAPSHOT_NOT_EXIST = 70,
 }
 
 struct TStatus {
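
The getSnapshot/restoreSnapshot methods added above are ordinary FrontendService Thrift
RPCs, so they can be exercised with the Thrift-generated Java client once the updated
FrontendService.thrift has been compiled. A minimal sketch only, with placeholder host,
port, user and label values; the required fields and the LOCAL-only restriction follow
getSnapshotImpl() above:

    // Illustration only: call the new getSnapshot RPC against a Doris FE.
    import org.apache.doris.thrift.FrontendService;
    import org.apache.doris.thrift.TGetSnapshotRequest;
    import org.apache.doris.thrift.TGetSnapshotResult;
    import org.apache.doris.thrift.TSnapshotType;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class GetSnapshotClientSketch {
        public static void main(String[] args) throws Exception {
            TTransport transport = new TSocket("127.0.0.1", 9020); // FE rpc_port, placeholder
            transport.open();
            try {
                FrontendService.Client client =
                        new FrontendService.Client(new TBinaryProtocol(transport));

                // user, passwd, db, label_name, snapshot_name and snapshot_type are all
                // checked in getSnapshotImpl(); only TSnapshotType.LOCAL is accepted.
                TGetSnapshotRequest request = new TGetSnapshotRequest();
                request.setUser("root");
                request.setPasswd("");
                request.setDb("example_db");
                request.setLabelName("example_backup_label");
                request.setSnapshotName("example_backup_label");
                request.setSnapshotType(TSnapshotType.LOCAL);

                TGetSnapshotResult result = client.getSnapshot(request);
                System.out.println(result.getStatus());
            } finally {
                transport.close();
            }
        }
    }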




[doris] 04/13: [fix](regex) String with Chinese characters matching failed (#20493)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b2d47d98da512e5b8f11aa38644d8d1c391d1620
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Wed Jun 7 07:27:47 2023 +0800

    [fix](regex) String with Chinese characters matching failed (#20493)
---
 be/src/vec/functions/like.cpp                                       | 5 +++--
 .../sql_functions/string_functions/test_string_function_regexp.out  | 6 ++++++
 .../string_functions/test_string_function_regexp.groovy             | 3 +++
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/be/src/vec/functions/like.cpp b/be/src/vec/functions/like.cpp
index bcd8262d33..8d56c4f2d1 100644
--- a/be/src/vec/functions/like.cpp
+++ b/be/src/vec/functions/like.cpp
@@ -437,8 +437,9 @@ Status FunctionLikeBase::regexp_fn_predicate(LikeSearchState* state,
 Status FunctionLikeBase::hs_prepare(FunctionContext* context, const char* expression,
                                     hs_database_t** database, hs_scratch_t** scratch) {
     hs_compile_error_t* compile_err;
-    auto res = hs_compile(expression, HS_FLAG_DOTALL | HS_FLAG_ALLOWEMPTY, HS_MODE_BLOCK, nullptr,
-                          database, &compile_err);
+    auto res = hs_compile(expression, HS_FLAG_DOTALL | HS_FLAG_ALLOWEMPTY | HS_FLAG_UTF8,
+                          HS_MODE_BLOCK, nullptr, database, &compile_err);
+
     if (res != HS_SUCCESS) {
         *database = nullptr;
         if (context) {
diff --git a/regression-test/data/query_p0/sql_functions/string_functions/test_string_function_regexp.out b/regression-test/data/query_p0/sql_functions/string_functions/test_string_function_regexp.out
index 3c7d8473ae..415b8f2822 100644
--- a/regression-test/data/query_p0/sql_functions/string_functions/test_string_function_regexp.out
+++ b/regression-test/data/query_p0/sql_functions/string_functions/test_string_function_regexp.out
@@ -73,6 +73,12 @@ a-b c
 -- !sql --
 a <b> b
 
+-- !sql_utf1 --
+true
+
+-- !sql_utf2 --
+true
+
 -- !sql_regexp_null --
 \N
 \N
diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function_regexp.groovy b/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function_regexp.groovy
index ba4f941a4b..cb80939adf 100644
--- a/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function_regexp.groovy
+++ b/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function_regexp.groovy
@@ -63,6 +63,9 @@ suite("test_string_function_regexp") {
     qt_sql "SELECT regexp_replace_one('a b c', \" \", \"-\");"
     qt_sql "SELECT regexp_replace_one('a b b','(b)','<\\\\1>');"
 
+    qt_sql_utf1 """ select '皖12345' REGEXP '^[皖][0-9]{5}\$'; """
+    qt_sql_utf2 """ select '皖 12345' REGEXP '^[皖] [0-9]{5}\$'; """
+
     // bug fix
     sql """
         INSERT INTO ${tbName} VALUES




[doris] 05/13: [bug](table_function) fix table function node forget to call open function of expr (#20495)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 317685ef791ef2b8bba8e0c0cf40b587004c0db9
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Wed Jun 7 07:26:50 2023 +0800

    [bug](table_function) fix table function node forget to call open function of expr (#20495)
---
 be/src/vec/exec/vtable_function_node.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h
index d2ca9589c2..cfe74860b1 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -56,6 +56,7 @@ public:
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override {
         RETURN_IF_ERROR(alloc_resource(state));
+        RETURN_IF_ERROR(VExpr::open(_vfn_ctxs, state));
         return _children[0]->open(state);
     }
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;




[doris] 08/13: [Bug](memleak) Fix emptyoperator may cause node not close (#20525)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4654fe0a748db8696ab7b8d8774290f1b55fec90
Author: wangbo <wa...@apache.org>
AuthorDate: Wed Jun 7 01:27:13 2023 +0800

    [Bug](memleak) Fix emptyoperator may cause node not close (#20525)
---
 be/src/pipeline/exec/empty_source_operator.cpp |  2 +-
 be/src/pipeline/exec/empty_source_operator.h   | 19 ++++++++++++++++---
 be/src/pipeline/pipeline_fragment_context.cpp  |  2 +-
 3 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/empty_source_operator.cpp b/be/src/pipeline/exec/empty_source_operator.cpp
index 5142c0c55a..78f5c94662 100644
--- a/be/src/pipeline/exec/empty_source_operator.cpp
+++ b/be/src/pipeline/exec/empty_source_operator.cpp
@@ -21,7 +21,7 @@
 
 namespace doris::pipeline {
 OperatorPtr EmptySourceOperatorBuilder::build_operator() {
-    return std::make_shared<EmptySourceOperator>(this);
+    return std::make_shared<EmptySourceOperator>(this, _exec_node);
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/empty_source_operator.h b/be/src/pipeline/exec/empty_source_operator.h
index fd12b27dab..4d93a310df 100644
--- a/be/src/pipeline/exec/empty_source_operator.h
+++ b/be/src/pipeline/exec/empty_source_operator.h
@@ -37,8 +37,10 @@ namespace doris::pipeline {
 
 class EmptySourceOperatorBuilder final : public OperatorBuilderBase {
 public:
-    EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor)
-            : OperatorBuilderBase(id, "EmptySourceOperator"), _row_descriptor(row_descriptor) {}
+    EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor, ExecNode* exec_node)
+            : OperatorBuilderBase(id, "EmptySourceOperator"),
+              _row_descriptor(row_descriptor),
+              _exec_node(exec_node) {}
 
     bool is_source() const override { return true; }
 
@@ -48,11 +50,14 @@ public:
 
 private:
     RowDescriptor _row_descriptor;
+    ExecNode* _exec_node = nullptr;
 };
 
 class EmptySourceOperator final : public OperatorBase {
 public:
-    EmptySourceOperator(OperatorBuilderBase* builder) : OperatorBase(builder) {}
+    EmptySourceOperator(OperatorBuilderBase* builder, ExecNode* exec_node)
+            : OperatorBase(builder), _exec_node(exec_node) {}
+
     bool can_read() override { return true; }
     bool is_pending_finish() const override { return false; }
 
@@ -67,6 +72,14 @@ public:
     }
 
     Status sink(RuntimeState*, vectorized::Block*, SourceState) override { return Status::OK(); }
+
+    Status close(RuntimeState* state) override {
+        _exec_node->close(state);
+        return Status::OK();
+    }
+
+private:
+    ExecNode* _exec_node = nullptr;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 32207ad216..ebb30fe384 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -596,7 +596,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
             RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
         } else {
             OperatorBuilderPtr builder = std::make_shared<EmptySourceOperatorBuilder>(
-                    next_operator_builder_id(), node->child(1)->row_desc());
+                    next_operator_builder_id(), node->child(1)->row_desc(), node->child(1));
             new_pipe->add_operator(builder);
         }
         OperatorBuilderPtr join_sink =




[doris] 13/13: [Feature](multi-catalog)support paimon catalog (#19681)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f4f86b07d842a242edd1cf4abe1ede8019c89584
Author: yuxuan-luo <11...@users.noreply.github.com>
AuthorDate: Tue Jun 6 15:08:30 2023 +0800

    [Feature](multi-catalog)support paimon catalog (#19681)
    
    CREATE CATALOG paimon_n2 PROPERTIES (
    "dfs.ha.namenodes.HDFS1006531" = "nn2,nn1",
    "dfs.namenode.rpc-address.HDFS1006531.nn2" = "172.16.65.xx:4007",
    "dfs.namenode.rpc-address.HDFS1006531.nn1" = "172.16.65.xx:4007",
    "hive.metastore.uris" = "thrift://172.16.65.xx:7004",
    "type" = "paimon",
    "dfs.nameservices" = "HDFS1006531",
    "hadoop.username" = "hadoop",
    "paimon.catalog.type" = "hms",
    "warehouse" = "hdfs://HDFS1006531/data/paimon1",
    "dfs.client.failover.proxy.provider.HDFS1006531" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
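
The properties above are the Doris-side catalog properties; internally they are forwarded
to Paimon's own catalog API (see PaimonExternalCatalog / PaimonHMSExternalCatalog in this
commit). The sketch below is illustrative only: the Paimon option keys ("metastore", "uri",
"warehouse") and the database/table names are assumptions based on Paimon's documented
catalog options, not code from this commit.

    // Illustration only: open a Hive-metastore-backed Paimon catalog directly.
    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.catalog.CatalogFactory;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.table.Table;

    import java.util.HashMap;
    import java.util.Map;

    public class PaimonCatalogSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> props = new HashMap<>();
            props.put("metastore", "hive");                            // "paimon.catalog.type" = "hms"
            props.put("uri", "thrift://172.16.65.xx:7004");            // "hive.metastore.uris"
            props.put("warehouse", "hdfs://HDFS1006531/data/paimon1"); // "warehouse"

            Catalog catalog =
                    CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(props)));
            // db/table names are placeholders
            Table table = catalog.getTable(Identifier.create("example_db", "example_table"));
            System.out.println(table.rowType());
        }
    }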
---
 be/src/vec/CMakeLists.txt                          |   1 +
 be/src/vec/exec/scan/paimon_reader.cpp             |  84 ++++++++++
 be/src/vec/exec/scan/paimon_reader.h               |  76 +++++++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |   8 +
 fe/fe-core/pom.xml                                 |  19 +++
 .../java/org/apache/doris/catalog/TableIf.java     |   4 +-
 .../catalog/external/PaimonExternalDatabase.java   |  72 ++++++++
 .../catalog/external/PaimonExternalTable.java      | 132 +++++++++++++++
 .../apache/doris/datasource/CatalogFactory.java    |   4 +
 .../apache/doris/datasource/ExternalCatalog.java   |   3 +
 .../apache/doris/datasource/InitCatalogLog.java    |   1 +
 .../apache/doris/datasource/InitDatabaseLog.java   |   1 +
 .../datasource/paimon/PaimonExternalCatalog.java   | 103 ++++++++++++
 .../paimon/PaimonHMSExternalCatalog.java           | 105 ++++++++++++
 .../property/constants/PaimonProperties.java}      |  18 +-
 .../glue/translator/PhysicalPlanTranslator.java    |   4 +
 .../org/apache/doris/persist/gson/GsonUtils.java   |   8 +
 .../apache/doris/planner/SingleNodePlanner.java    |   4 +
 .../doris/planner/external/FileQueryScanNode.java  |   4 +
 .../doris/planner/external/TableFormatType.java    |   3 +-
 .../planner/external/paimon/PaimonScanNode.java    | 177 ++++++++++++++++++++
 .../planner/external/paimon/PaimonSource.java      |  64 +++++++
 .../doris/planner/external/paimon/PaimonSplit.java |  65 +++++++
 .../org/apache/doris/statistics/DeriveFactory.java |   1 +
 .../apache/doris/statistics/StatisticalType.java   |   1 +
 fe/java-udf/pom.xml                                |  21 ++-
 .../org/apache/doris/jni/PaimonJniScanner.java     | 186 +++++++++++++++++++++
 .../apache/doris/jni/vec/PaimonColumnValue.java    | 131 +++++++++++++++
 fe/pom.xml                                         |   2 +-
 gensrc/thrift/PlanNodes.thrift                     |  24 ++-
 30 files changed, 1299 insertions(+), 27 deletions(-)

diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 2bf3f245a1..bec6a747b5 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -352,6 +352,7 @@ set(VEC_FILES
   exec/format/parquet/bool_rle_decoder.cpp
   exec/jni_connector.cpp
   exec/scan/jni_reader.cpp
+  exec/scan/paimon_reader.cpp
   exec/scan/max_compute_jni_reader.cpp
   )
 
diff --git a/be/src/vec/exec/scan/paimon_reader.cpp b/be/src/vec/exec/scan/paimon_reader.cpp
new file mode 100644
index 0000000000..906973d838
--- /dev/null
+++ b/be/src/vec/exec/scan/paimon_reader.cpp
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+
+PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
+                                 RuntimeState* state, RuntimeProfile* profile,
+                                 const TFileRangeDesc& range)
+        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
+    std::vector<std::string> column_names;
+    for (auto& desc : _file_slot_descs) {
+        std::string field = desc->col_name();
+        column_names.emplace_back(field);
+    }
+    std::map<String, String> params;
+    params["required_fields"] = range.table_format_params.paimon_params.paimon_column_names;
+    params["columns_types"] = range.table_format_params.paimon_params.paimon_column_types;
+    params["columns_id"] = range.table_format_params.paimon_params.paimon_column_ids;
+    params["hive.metastore.uris"] = range.table_format_params.paimon_params.hive_metastore_uris;
+    params["warehouse"] = range.table_format_params.paimon_params.warehouse;
+    params["db_name"] = range.table_format_params.paimon_params.db_name;
+    params["table_name"] = range.table_format_params.paimon_params.table_name;
+    params["length_byte"] = range.table_format_params.paimon_params.length_byte;
+    params["split_byte"] =
+            std::to_string((int64_t)range.table_format_params.paimon_params.paimon_split.data());
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/jni/PaimonJniScanner", params,
+                                                    column_names);
+}
+
+Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
+    if (*eof) {
+        RETURN_IF_ERROR(_jni_connector->close());
+    }
+    return Status::OK();
+}
+
+Status PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                                    std::unordered_set<std::string>* missing_cols) {
+    for (auto& desc : _file_slot_descs) {
+        name_to_type->emplace(desc->col_name(), desc->type());
+    }
+    return Status::OK();
+}
+
+Status PaimonJniReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
+    return _jni_connector->open(_state, _profile);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/paimon_reader.h b/be/src/vec/exec/scan/paimon_reader.h
new file mode 100644
index 0000000000..be90d6d849
--- /dev/null
+++ b/be/src/vec/exec/scan/paimon_reader.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+/**
+ * Reader that fetches Paimon table data through the Java side scanner
+ * org/apache/doris/jni/PaimonJniScanner. Split, schema and catalog
+ * information are passed to the JVM as string parameters via JniConnector,
+ * and result batches are returned to the BE as vectorized blocks.
+ */
+class PaimonJniReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(PaimonJniReader);
+
+public:
+    PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
+                    RuntimeProfile* profile, const TFileRangeDesc& range);
+
+    ~PaimonJniReader() override = default;
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    std::unique_ptr<JniConnector> _jni_connector;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b7f8119553..a539abc9f0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -62,6 +62,7 @@
 #include "vec/exec/format/table/iceberg_reader.h"
 #include "vec/exec/scan/max_compute_jni_reader.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/paimon_reader.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
@@ -600,6 +601,13 @@ Status VFileScanner::_get_next_reader() {
                 init_status = mc_reader->init_reader(_colname_to_value_range);
                 _cur_reader = std::move(mc_reader);
             }
+            if (range.__isset.table_format_params &&
+                range.table_format_params.table_format_type == "paimon") {
+                _cur_reader =
+                        PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range);
+                init_status = ((PaimonJniReader*)(_cur_reader.get()))
+                                      ->init_reader(_colname_to_value_range);
+            }
             break;
         }
         case TFileFormatType::FORMAT_PARQUET: {
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 4bce0193b4..db8e0b7ec1 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -34,6 +34,7 @@ under the License.
         <fe_ut_parallel>1</fe_ut_parallel>
         <antlr4.version>4.9.3</antlr4.version>
         <awssdk.version>2.17.257</awssdk.version>
+        <paimon.version>0.4-SNAPSHOT</paimon.version>
     </properties>
     <profiles>
         <profile>
@@ -529,6 +530,24 @@ under the License.
             <artifactId>iceberg-aws</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-format</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>glue</artifactId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index c79acc79df..95f8873c60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -134,7 +134,8 @@ public interface TableIf {
     enum TableType {
         MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, @Deprecated HUDI, JDBC,
         TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
-        ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE, HUDI_EXTERNAL_TABLE;
+        ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
+        HUDI_EXTERNAL_TABLE;
 
         public String toEngineName() {
             switch (this) {
@@ -198,6 +199,7 @@ public interface TableIf {
                 case HMS_EXTERNAL_TABLE:
                 case ES_EXTERNAL_TABLE:
                 case ICEBERG_EXTERNAL_TABLE:
+                case PAIMON_EXTERNAL_TABLE:
                     return "EXTERNAL TABLE";
                 default:
                     return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
new file mode 100644
index 0000000000..ac6d6932c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable> implements GsonPostProcessable {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalDatabase.class);
+
+    public PaimonExternalDatabase(ExternalCatalog extCatalog, Long id, String name) {
+        super(extCatalog, id, name, InitDatabaseLog.Type.PAIMON);
+    }
+
+    @Override
+    protected PaimonExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+        return new PaimonExternalTable(tblId, tableName, name, (PaimonExternalCatalog) extCatalog);
+    }
+
+    @Override
+    public List<PaimonExternalTable> getTablesOnIdOrder() {
+        // Sort the name instead, because the id may change.
+        // Sort by name instead, because the id may change.
+    }
+
+    @Override
+    public void dropTable(String tableName) {
+        LOG.debug("drop table [{}]", tableName);
+        makeSureInitialized();
+        Long tableId = tableNameToId.remove(tableName);
+        if (tableId == null) {
+            LOG.warn("drop table [{}] failed", tableName);
+        }
+        idToTbl.remove(tableId);
+    }
+
+    @Override
+    public void createTable(String tableName, long tableId) {
+        LOG.debug("create table [{}]", tableName);
+        makeSureInitialized();
+        tableNameToId.put(tableName, tableId);
+        PaimonExternalTable table = new PaimonExternalTable(tableId, tableName, name,
+                (PaimonExternalCatalog) extCatalog);
+        idToTbl.put(tableId, table);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
new file mode 100644
index 0000000000..c821160dd4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.thrift.THiveTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DecimalType;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class PaimonExternalTable extends ExternalTable {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class);
+
+    public static final int PAIMON_DATETIME_SCALE_MS = 3;
+    private Table originTable = null;
+
+    public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) {
+        super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE);
+    }
+
+    public String getPaimonCatalogType() {
+        return ((PaimonExternalCatalog) catalog).getPaimonCatalogType();
+    }
+
+    protected synchronized void makeSureInitialized() {
+        super.makeSureInitialized();
+        if (!objectCreated) {
+            objectCreated = true;
+        }
+    }
+
+    public Table getOriginTable() {
+        if (originTable == null) {
+            originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
+        }
+        return originTable;
+    }
+
+    @Override
+    public List<Column> initSchema() {
+        Table table = getOriginTable();
+        TableSchema schema = ((AbstractFileStoreTable) table).schema();
+        List<DataField> columns = schema.fields();
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+        for (DataField field : columns) {
+            tmpSchema.add(new Column(field.name(),
+                    paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
+                    field.id()));
+        }
+        return tmpSchema;
+    }
+
+    private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
+        switch (dataType.getTypeRoot()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case BIGINT:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case VARCHAR:
+            case BINARY:
+            case CHAR:
+                return Type.STRING;
+            case DECIMAL:
+                DecimalType decimal = (DecimalType) dataType;
+                return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale());
+            case DATE:
+                return ScalarType.createDateV2Type();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return ScalarType.createDatetimeV2Type(PAIMON_DATETIME_SCALE_MS);
+            case TIME_WITHOUT_TIME_ZONE:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown type: " + dataType.getTypeRoot());
+        }
+    }
+
+    protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) {
+        return paimonPrimitiveTypeToDorisType(type);
+    }
+
+    @Override
+    public TTableDescriptor toThrift() {
+        List<Column> schema = getFullSchema();
+        if (getPaimonCatalogType().equals("hms")) {
+            THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
+                    getName(), dbName);
+            tTableDescriptor.setHiveTable(tHiveTable);
+            return tTableDescriptor;
+        } else {
+            throw new IllegalArgumentException("Currently only supports hms catalog, not supported: "
+                + getPaimonCatalogType());
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index f530bcc5f8..358b53a274 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.Resource;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
+import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
 import org.apache.doris.datasource.test.TestExternalCatalog;
 
 import com.google.common.base.Strings;
@@ -122,6 +123,9 @@ public class CatalogFactory {
             case "iceberg":
                 catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props, comment);
                 break;
+            case "paimon":
+                catalog = new PaimonHMSExternalCatalog(catalogId, name, resource, props, comment);
+                break;
             case "max_compute":
                 catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props, comment);
                 break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index a23842c9bd..cf2de86494 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.external.HMSExternalDatabase;
 import org.apache.doris.catalog.external.IcebergExternalDatabase;
 import org.apache.doris.catalog.external.JdbcExternalDatabase;
 import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
+import org.apache.doris.catalog.external.PaimonExternalDatabase;
 import org.apache.doris.catalog.external.TestExternalDatabase;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.DdlException;
@@ -448,6 +449,8 @@ public abstract class ExternalCatalog
                 //return new HudiExternalDatabase(this, dbId, dbName);
             case TEST:
                 return new TestExternalDatabase(this, dbId, dbName);
+            case PAIMON:
+                return new PaimonExternalDatabase(this, dbId, dbName);
             default:
                 break;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
index ecc284b325..73fbeeb781 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -37,6 +37,7 @@ public class InitCatalogLog implements Writable {
         ES,
         JDBC,
         ICEBERG,
+        PAIMON,
         MAX_COMPUTE,
         HUDI,
         TEST,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
index a49dd5232c..14cd4410ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
@@ -39,6 +39,7 @@ public class InitDatabaseLog implements Writable {
         JDBC,
         MAX_COMPUTE,
         HUDI,
+        PAIMON,
         TEST,
         UNKNOWN;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
new file mode 100644
index 0000000000..3c024fadfb
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public abstract class PaimonExternalCatalog extends ExternalCatalog {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalCatalog.class);
+    public static final String PAIMON_HMS = "hms";
+    protected String paimonCatalogType;
+    protected Catalog catalog;
+
+    public PaimonExternalCatalog(long catalogId, String name, String comment) {
+        super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
+        this.type = "paimon";
+    }
+
+    @Override
+    protected void init() {
+        super.init();
+    }
+
+    protected Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    public String getPaimonCatalogType() {
+        makeSureInitialized();
+        return paimonCatalogType;
+    }
+
+    protected List<String> listDatabaseNames() {
+        return new ArrayList<>(catalog.listDatabases());
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+        makeSureInitialized();
+        return catalog.tableExists(Identifier.create(dbName, tblName));
+    }
+
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        makeSureInitialized();
+        List<String> tableNames = null;
+        try {
+            tableNames = catalog.listTables(dbName);
+        } catch (Catalog.DatabaseNotExistException e) {
+            LOG.warn("DatabaseNotExistException", e);
+        }
+        return tableNames;
+    }
+
+    public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName) {
+        makeSureInitialized();
+        org.apache.paimon.table.Table table = null;
+        try {
+            table = catalog.getTable(Identifier.create(dbName, tblName));
+        } catch (Catalog.TableNotExistException e) {
+            LOG.warn("TableNotExistException", e);
+        }
+        return table;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
new file mode 100644
index 0000000000..13775b0edf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonHMSExternalCatalog.class);
+    public static final String METASTORE = "metastore";
+    public static final String METASTORE_HIVE = "hive";
+    public static final String URI = "uri";
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+            .stringType()
+            .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+            .withDescription(
+                "Class name of Hive metastore client.\n"
+                    + "NOTE: This class must directly implements "
+                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+
+    public PaimonHMSExternalCatalog(long catalogId, String name, String resource,
+                                    Map<String, String> props, String comment) {
+        super(catalogId, name, comment);
+        props = PropertyConverter.convertToMetaProperties(props);
+        catalogProperty = new CatalogProperty(resource, props);
+        paimonCatalogType = PAIMON_HMS;
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
+        String warehouse = catalogProperty.getOrDefault(PaimonProperties.WAREHOUSE, "");
+        Options options = new Options();
+        options.set(PaimonProperties.WAREHOUSE, warehouse);
+        // Currently, only the hive metastore is supported
+        options.set(METASTORE, METASTORE_HIVE);
+        options.set(URI, metastoreUris);
+        CatalogContext context = CatalogContext.create(options, getConfiguration());
+        try {
+            catalog = create(context);
+        } catch (IOException e) {
+            LOG.warn("failed to create paimon external catalog ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Catalog create(CatalogContext context) throws IOException {
+        Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE));
+        FileIO fileIO;
+        fileIO = FileIO.get(warehousePath, context);
+        String uri = context.options().get(CatalogOptions.URI);
+        String hiveConfDir = context.options().get(HiveCatalogOptions.HIVE_CONF_DIR);
+        String hadoopConfDir = context.options().get(HiveCatalogOptions.HADOOP_CONF_DIR);
+        HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir);
+
+        // always let user-set parameters overwrite hive-site.xml parameters
+        context.options().toMap().forEach(hiveConf::set);
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+        // set the warehouse location to the hiveConf
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, context.options().get(CatalogOptions.WAREHOUSE));
+
+        String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
+
+        return new HiveCatalog(fileIO, hiveConf, clientClassName, context.options().toMap());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
similarity index 70%
copy from fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
index 6fc5d69544..e372dd5788 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
@@ -15,20 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.datasource.property.constants;
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi");
-
-    private final String tableFormatType;
-
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
-
-    public String value() {
-        return tableFormatType;
-    }
+public class PaimonProperties {
+    public static final String WAREHOUSE = "warehouse";
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 5d2b3c8f36..03f95dff5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -47,6 +47,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalTable;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
@@ -157,6 +158,7 @@ import org.apache.doris.planner.UnionNode;
 import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.planner.external.HudiScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TFetchOption;
@@ -717,6 +719,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             }
         } else if (table instanceof IcebergExternalTable) {
             scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+        } else if (table instanceof PaimonExternalTable) {
+            scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
         }
         Preconditions.checkNotNull(scanNode);
         fileScan.getConjuncts().stream()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index bf7c0a5484..413a96eac2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -51,6 +51,8 @@ import org.apache.doris.catalog.external.JdbcExternalDatabase;
 import org.apache.doris.catalog.external.JdbcExternalTable;
 import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
 import org.apache.doris.catalog.external.MaxComputeExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalDatabase;
+import org.apache.doris.catalog.external.PaimonExternalTable;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.EsExternalCatalog;
@@ -63,6 +65,8 @@ import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
 import org.apache.doris.load.routineload.AbstractDataSourceProperties;
@@ -194,6 +198,8 @@ public class GsonUtils {
             .registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName())
             .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName())
             .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName())
+            .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
+            .registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
             .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
     // routine load data source
     private static RuntimeTypeAdapterFactory<AbstractDataSourceProperties> rdsTypeAdapterFactory =
@@ -208,6 +214,7 @@ public class GsonUtils {
             .registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName())
             .registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName())
             .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName())
+            .registerSubtype(PaimonExternalDatabase.class, PaimonExternalDatabase.class.getSimpleName())
             .registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName());
 
     private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
@@ -216,6 +223,7 @@ public class GsonUtils {
             .registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName())
             .registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName())
             .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName())
+            .registerSubtype(PaimonExternalTable.class, PaimonExternalTable.class.getSimpleName())
             .registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName());
 
     // runtime adapter for class "HeartbeatResponse"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0ae4a35edb..97de7d82ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -73,6 +73,7 @@ import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.planner.external.HudiScanNode;
 import org.apache.doris.planner.external.MaxComputeScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.statistics.StatisticalType;
@@ -2019,6 +2020,9 @@ public class SingleNodePlanner {
             case ICEBERG_EXTERNAL_TABLE:
                 scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
                 break;
+            case PAIMON_EXTERNAL_TABLE:
+                scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                break;
             case MAX_COMPUTE_EXTERNAL_TABLE:
                 // TODO: support max compute scan node
                 scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "MCScanNode",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 508b174fb5..4c794d083f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -38,6 +38,8 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergSplit;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
+import org.apache.doris.planner.external.paimon.PaimonSplit;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -279,6 +281,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
             if (fileSplit instanceof IcebergSplit) {
                 // TODO: extract all data lake split to factory
                 IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
+            } else if (fileSplit instanceof PaimonSplit) {
+                PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) fileSplit);
             }
 
             // if (fileSplit instanceof HudiSplit) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index 6fc5d69544..f97c8ea1ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -20,7 +20,8 @@ package org.apache.doris.planner.external;
 public enum TableFormatType {
     HIVE("hive"),
     ICEBERG("iceberg"),
-    HUDI("hudi");
+    HUDI("hudi"),
+    PAIMON("paimon");
 
     private final String tableFormatType;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
new file mode 100644
index 0000000000..13da29b2bd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPaimonFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import avro.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
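+/**
+ * Scan node for Apache Paimon tables. getSplits() plans the Paimon read and wraps each
+ * DataSplit into a PaimonSplit; setPaimonParams() serializes that split together with the
+ * projected column names/types/ids and the catalog info into TPaimonFileDesc, which the
+ * BE-side PaimonJniScanner uses to rebuild the split and read the data through JNI.
+ */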
+public class PaimonScanNode extends FileQueryScanNode {
+    private static PaimonSource source = null;
+
+    public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
+    }
+
+    @Override
+    protected void doInitialize() throws UserException {
+        ExternalTable table = (ExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName()));
+        }
+        computeColumnFilter();
+        initBackendPolicy();
+        source = new PaimonSource((PaimonExternalTable) table, desc, columnNameToRange);
+        Preconditions.checkNotNull(source);
+        initSchemaParams();
+    }
+
+    public static void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
+        TPaimonFileDesc fileDesc = new TPaimonFileDesc();
+        fileDesc.setPaimonSplit(paimonSplit.getSerializableSplit());
+        fileDesc.setLengthByte(Integer.toString(paimonSplit.getSerializableSplit().length));
+        // Paimon column names, types and ids that need to be passed to the JNI scanner
+        StringBuilder columnNamesBuilder = new StringBuilder();
+        StringBuilder columnTypesBuilder = new StringBuilder();
+        StringBuilder columnIdsBuilder = new StringBuilder();
+        Map<String, Integer> paimonFieldsId = new HashMap<>();
+        Map<String, String> paimonFieldsName = new HashMap<>();
+        for (DataField field : ((AbstractFileStoreTable) source.getPaimonTable()).schema().fields()) {
+            paimonFieldsId.put(field.name(), field.id());
+            paimonFieldsName.put(field.name(), field.type().toString());
+        }
+        boolean isFirst = true;
+        for (SlotDescriptor slot : source.getDesc().getSlots()) {
+            if (!isFirst) {
+                columnNamesBuilder.append(",");
+                columnTypesBuilder.append(",");
+                columnIdsBuilder.append(",");
+            }
+            columnNamesBuilder.append(slot.getColumn().getName());
+            columnTypesBuilder.append(paimonFieldsName.get(slot.getColumn().getName()));
+            columnIdsBuilder.append(paimonFieldsId.get(slot.getColumn().getName()));
+            isFirst = false;
+        }
+        fileDesc.setPaimonColumnIds(columnIdsBuilder.toString());
+        fileDesc.setPaimonColumnNames(columnNamesBuilder.toString());
+        fileDesc.setPaimonColumnTypes(columnTypesBuilder.toString());
+        fileDesc.setHiveMetastoreUris(source.getCatalog().getCatalogProperty().getProperties()
+                .get(HiveConf.ConfVars.METASTOREURIS.varname));
+        fileDesc.setWarehouse(source.getCatalog().getCatalogProperty().getProperties()
+                .get(PaimonProperties.WAREHOUSE));
+        fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName());
+        fileDesc.setTableName(source.getTargetTable().getName());
+        tableFormatFileDesc.setPaimonParams(fileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {
+        List<Split> splits = new ArrayList<>();
+        ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
+        List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.newScan().plan().splits();
+        for (org.apache.paimon.table.source.Split split : paimonSplits) {
+            PaimonInputSplit inputSplit = new PaimonInputSplit(
+                        "tempDir",
+                        (DataSplit) split
+            );
+            PaimonSplit paimonSplit = new PaimonSplit(inputSplit,
+                    ((AbstractFileStoreTable) source.getPaimonTable()).location().toString());
+            paimonSplit.setTableFormatType(TableFormatType.PAIMON);
+            splits.add(paimonSplit);
+        }
+        return splits;
+    }
+
+    @Override
+    public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+        String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+        if (location != null && !location.isEmpty()) {
+            if (S3Util.isObjStorage(location)) {
+                return TFileType.FILE_S3;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
+                return TFileType.FILE_HDFS;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+                return TFileType.FILE_LOCAL;
+            }
+        }
+        throw new DdlException("Unknown file location " + location
+            + " for hms table " + source.getPaimonTable().name());
+    }
+
+    @Override
+    public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
+        return TFileFormatType.FORMAT_JNI;
+    }
+
+    @Override
+    public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
+        return new ArrayList<>(source.getPaimonTable().partitionKeys());
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return source.getFileAttributes();
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return source.getTargetTable();
+    }
+
+    @Override
+    public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
+        return source.getCatalog().getProperties();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
new file mode 100644
index 0000000000..2f55e30c08
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.thrift.TFileAttributes;
+
+import org.apache.paimon.table.Table;
+
+import java.util.Map;
+
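+/**
+ * Lightweight holder that ties a PaimonExternalTable, its underlying Paimon Table and the
+ * scan tuple descriptor together for PaimonScanNode.
+ */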
+public class PaimonSource {
+    private final PaimonExternalTable paimonExtTable;
+    private final Table originTable;
+
+    private final TupleDescriptor desc;
+
+    public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
+                            Map<String, ColumnRange> columnNameToRange) {
+        this.paimonExtTable = table;
+        this.originTable = paimonExtTable.getOriginTable();
+        this.desc = desc;
+    }
+
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
+    public Table getPaimonTable() {
+        return originTable;
+    }
+
+    public TableIf getTargetTable() {
+        return paimonExtTable;
+    }
+
+    public TFileAttributes getFileAttributes() throws UserException {
+        return new TFileAttributes();
+    }
+
+    public ExternalCatalog getCatalog() {
+        return paimonExtTable.getCatalog();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
new file mode 100644
index 0000000000..e36740f0fd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.TableFormatType;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
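+/**
+ * A file split backed by a Paimon PaimonInputSplit. getSerializableSplit() writes the
+ * Hadoop-Writable split into a byte array so it can be shipped to the BE inside
+ * TPaimonFileDesc and deserialized again by PaimonJniScanner.
+ */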
+public class PaimonSplit extends FileSplit {
+    private PaimonInputSplit split;
+    private TableFormatType tableFormatType;
+
+    public PaimonSplit(PaimonInputSplit split, String path) {
+        super(new Path(path), 0, 0, 0, null, null);
+        this.split = split;
+    }
+
+    public PaimonInputSplit getSplit() {
+        return split;
+    }
+
+    public void setSplit(PaimonInputSplit split) {
+        this.split = split;
+    }
+
+    public TableFormatType getTableFormatType() {
+        return tableFormatType;
+    }
+
+    public void setTableFormatType(TableFormatType tableFormatType) {
+        this.tableFormatType = tableFormatType;
+    }
+
+    public byte[] getSerializableSplit() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(baos);
+        try {
+            split.write(output);
+        } catch (IOException e) {
+            // A swallowed failure here would ship an empty split to the BE, so fail fast instead.
+            throw new RuntimeException("failed to serialize paimon input split", e);
+        }
+        return baos.toByteArray();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
index 35391191c7..ba22e067a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
@@ -51,6 +51,7 @@ public class DeriveFactory {
             case ES_SCAN_NODE:
             case HIVE_SCAN_NODE:
             case ICEBERG_SCAN_NODE:
+            case PAIMON_SCAN_NODE:
             case INTERSECT_NODE:
             case SCHEMA_SCAN_NODE:
             case STREAM_LOAD_SCAN_NODE:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 3a4a283c79..67dd9bb054 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -31,6 +31,7 @@ public enum StatisticalType {
     HASH_JOIN_NODE,
     HIVE_SCAN_NODE,
     ICEBERG_SCAN_NODE,
+    PAIMON_SCAN_NODE,
     HUDI_SCAN_NODE,
     TVF_SCAN_NODE,
     INTERSECT_NODE,
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 66d0123795..bcebf3871b 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -37,6 +37,7 @@ under the License.
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
         <presto.hive.version>3.0.0-8</presto.hive.version>
         <hudi.version>0.12.2</hudi.version>
+        <paimon.version>0.4-SNAPSHOT</paimon.version>
     </properties>
 
     <dependencies>
@@ -50,6 +51,21 @@ under the License.
             <artifactId>fe-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-connector-2.3</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>hive-common</artifactId>
+            <groupId>org.apache.hive</groupId>
+            <version>2.3.9</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
@@ -162,10 +178,7 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>hive-catalog-shade</artifactId>
-        </dependency>
+
     </dependencies>
     <build>
         <finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java
new file mode 100644
index 0000000000..03c8b6564e
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+
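+/**
+ * JNI scanner for Paimon tables. The FE passes a serialized PaimonInputSplit through
+ * off-heap memory (split_byte / length_byte params); open() copies it back, rebuilds the
+ * Paimon HiveCatalog and creates a RecordReader, and getNext() iterates the record batches
+ * and appends each requested column value (columns_id) via appendData().
+ */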
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    private final String[] ids;
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("MockJniScanner gets pushed-down predicates:  " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // deserialize it into split
+        byte[] splitByte = new byte[lengthByte];
+        OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+        ByteArrayInputStream bais = new ByteArrayInputStream(splitByte);
+        DataInputStream input = new DataInputStream(bais);
+        try {
+            paimonInputSplit.readFields(input);
+        } catch (IOException e) {
+            // Do not continue with an uninitialized split; surface the failure to the caller.
+            LOG.warn("failed to deserialize paimon input split", e);
+            throw e;
+        }
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableRead read = readBuilder.newRead();
+        reader = read.createReader(paimonInputSplit.split());
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    protected int getNext() throws IOException {
+        int rows = 0;
+        try {
+            RecordReader.RecordIterator<InternalRow> batch;
+            while ((batch = reader.readBatch()) != null) {
+                InternalRow record;
+                while ((record = batch.next()) != null) {
+                    columnValue.setOffsetRow((ColumnarRow) record);
+                    for (int i = 0; i < ids.length; i++) {
+                        columnValue.setIdx(Integer.parseInt(ids[i]));
+                        appendData(i, columnValue);
+                    }
+                    rows++;
+                }
+                batch.releaseBatch();
+            }
+        } catch (IOException e) {
+            LOG.warn("failed to getNext columnValue ", e);
+            throw new RuntimeException(e);
+        }
+        return rows;
+    }
+
+    private Catalog create(CatalogContext context) throws IOException {
+        Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE));
+        FileIO fileIO;
+        fileIO = FileIO.get(warehousePath, context);
+        String uri = context.options().get(CatalogOptions.URI);
+        String hiveConfDir = context.options().get(HiveCatalogOptions.HIVE_CONF_DIR);
+        String hadoopConfDir = context.options().get(HiveCatalogOptions.HADOOP_CONF_DIR);
+        HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir);
+
+        // always let user-set parameters overwrite hive-site.xml parameters
+        context.options().toMap().forEach(hiveConf::set);
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+        // set the warehouse location to the hiveConf
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, context.options().get(CatalogOptions.WAREHOUSE));
+
+        String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
+
+        return new HiveCatalog(fileIO, hiveConf, clientClassName, context.options().toMap());
+    }
+
+    private void getCatalog() {
+        paimonInputSplit = new PaimonInputSplit();
+        Options options = new Options();
+        options.set("warehouse", warehouse);
+        // Currently, only the hive metastore is supported
+        options.set("metastore", "hive");
+        options.set("uri", metastoreUris);
+        CatalogContext context = CatalogContext.create(options);
+        try {
+            Catalog catalog = create(context);
+            table = catalog.getTable(Identifier.create(dbName, tblName));
+        } catch (IOException | Catalog.TableNotExistException e) {
+            LOG.warn("failed to create paimon external catalog ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+            .stringType()
+            .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+            .withDescription(
+                "Class name of Hive metastore client.\n"
+                    + "NOTE: This class must directly implements "
+                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java
new file mode 100644
index 0000000000..3d8bb5e42e
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni.vec;
+
+import org.apache.paimon.data.columnar.ColumnarRow;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
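+/**
+ * Adapts one cell of a Paimon ColumnarRow to the Doris ColumnValue interface:
+ * setOffsetRow() supplies the current row and setIdx() selects the Paimon field id to
+ * read before each getter is called.
+ */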
+public class PaimonColumnValue implements ColumnValue {
+    private int idx;
+    private ColumnarRow record;
+
+    public PaimonColumnValue() {
+    }
+
+    public void setIdx(int idx) {
+        this.idx = idx;
+    }
+
+    public void setOffsetRow(ColumnarRow record) {
+        this.record = record;
+    }
+
+    @Override
+    public boolean getBoolean() {
+        return record.getBoolean(idx);
+    }
+
+    @Override
+    public byte getByte() {
+        return record.getByte(idx);
+    }
+
+    @Override
+    public short getShort() {
+        return record.getShort(idx);
+    }
+
+    @Override
+    public int getInt() {
+        return record.getInt(idx);
+    }
+
+    @Override
+    public float getFloat() {
+        return record.getFloat(idx);
+    }
+
+    @Override
+    public long getLong() {
+        return record.getLong(idx);
+    }
+
+    @Override
+    public double getDouble() {
+        return record.getDouble(idx);
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        return BigInteger.valueOf(record.getInt(idx));
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        return BigDecimal.valueOf(getDouble());
+    }
+
+    @Override
+    public String getString() {
+        return record.getString(idx).toString();
+    }
+
+    @Override
+    public LocalDate getDate() {
+        // Paimon stores DATE as the number of days since epoch, so read it as an int.
+        return LocalDate.ofEpochDay(record.getInt(idx));
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        // Avoid the hard-coded UTC+8 offset; Timestamp.toLocalDateTime() keeps the stored local time.
+        return record.getTimestamp(idx, 3).toLocalDateTime();
+    }
+
+    @Override
+    public boolean isNull() {
+        return record.isNullAt(idx);
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return record.getBinary(idx);
+    }
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index e2da4103d3..5cb5f36482 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -195,7 +195,7 @@ under the License.
         <doris.home>${fe.dir}/../</doris.home>
         <revision>1.2-SNAPSHOT</revision>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <doris.hive.catalog.shade.version>1.0.3-SNAPSHOT</doris.hive.catalog.shade.version>
+        <doris.hive.catalog.shade.version>1.0.4-SNAPSHOT</doris.hive.catalog.shade.version>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <!--plugin parameters-->
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 9c98cd4d28..d68007e69a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -186,8 +186,8 @@ struct TBrokerScanRangeParams {
     1: required i8 column_separator;
     2: required i8 line_delimiter;
 
-    // We construct one line in file to a tuple. And each field of line 
-    // correspond to a slot in this tuple. 
+    // We construct one line in file to a tuple. And each field of line
+    // correspond to a slot in this tuple.
     // src_tuple_id is the tuple id of the input file
     3: required Types.TTupleId src_tuple_id
     // src_slot_ids is the slot_ids of the input file
@@ -288,6 +288,19 @@ struct TIcebergFileDesc {
     5: optional Exprs.TExpr file_select_conjunct;
 }
 
+struct TPaimonFileDesc {
+    1: optional binary paimon_split
+    2: optional string paimon_column_ids
+    3: optional string paimon_column_types
+    4: optional string paimon_column_names
+    5: optional string hive_metastore_uris
+    6: optional string warehouse
+    7: optional string db_name
+    8: optional string table_name
+    9: optional string length_byte
+}
+
+
 struct THudiFileDesc {
     1: optional string basePath;
     2: optional string dataFilePath;
@@ -300,6 +313,7 @@ struct TTableFormatFileDesc {
     1: optional string table_format_type
     2: optional TIcebergFileDesc iceberg_params
     3: optional THudiFileDesc hudi_params
+    4: optional TPaimonFileDesc paimon_params
 }
 
 struct TFileScanRangeParams {
@@ -566,7 +580,7 @@ struct TSortInfo {
   4: optional list<Exprs.TExpr> sort_tuple_slot_exprs
 
   // Indicates the nullable info of sort_tuple_slot_exprs is changed after substitute by child's smap
-  5: optional list<bool> slot_exprs_nullability_changed_flags   
+  5: optional list<bool> slot_exprs_nullability_changed_flags
   // Indicates whether topn query using two phase read
   6: optional bool use_two_phase_read
 }
@@ -606,7 +620,7 @@ struct TEqJoinCondition {
   // right-hand side of "<a> = <b>"
   2: required Exprs.TExpr right;
   // operator of equal join
-  3: optional Opcodes.TExprOpcode opcode; 
+  3: optional Opcodes.TExprOpcode opcode;
 }
 
 enum TJoinOp {
@@ -709,7 +723,7 @@ enum TAggregationOp {
   DENSE_RANK,
   ROW_NUMBER,
   LAG,
-  HLL_C, 
+  HLL_C,
   BITMAP_UNION,
   NTILE,
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 10/13: [fix](olapscanner) fix coredump caused by concurrent acccess of olap scan node _conjuncts (#20534)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8b5d439102d1a11a5a2886bcc1d088455a86bb54
Author: TengJianPing <18...@users.noreply.github.com>
AuthorDate: Wed Jun 7 17:00:29 2023 +0800

    [fix](olapscanner) fix coredump caused by concurrent acccess of olap scan node _conjuncts (#20534)
    
    =3073084==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x60601897db80 at pc 0x55b2c993666e bp 0x7d1fbbfb66b0 sp 0x7d1fbbfb66a8
    READ of size 8 at 0x60601897db80 thread T610 (_scanner_scan)
        #0 0x55b2c993666d in std::__shared_ptr<doris::vectorized::VExprContext, (__gnu_cxx::_Lock_policy)2>::get() const /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1291:16
        #1 0x55b2dae86ec5 in doris::vectorized::VExprContext::clone(doris::RuntimeState*, std::shared_ptr<doris::vectorized::VExprContext>&) /mnt/disk2/tengjianping/doris-master/be/src/vec/exprs/vexpr_context.cpp:98:5
        #2 0x55b2e757b6d8 in doris::vectorized::VScanner::prepare(doris::RuntimeState*, std::vector<std::shared_ptr<doris::vectorized::VExprContext>, std::allocator<std::shared_ptr<doris::vectorized::VExprContext>>> const&) /mnt/disk2/tengjianping/doris-master/be/src/vec/exec/scan/vscanner.cpp:47:13
        #3 0x55b2e78e8155 in doris::vectorized::NewOlapScanner::init() /mnt/disk2/tengjianping/doris-master/be/src/vec/exec/scan/new_olap_scanner.cpp:109:5
        #4 0x55b2e7551c81 in doris::vectorized::ScannerScheduler::_scanner_scan(doris::vectorized::ScannerScheduler*, doris::vectorized::ScannerContext*, std::shared_ptr<doris::vectorized::VScanner>) /mnt/disk2/tengjianping/doris-master/be/src/vec/exec/scan/scanner_scheduler.cpp:279:27
        #5 0x55b2e7554d5e in doris::vectorized::ScannerScheduler::_schedule_scanners(doris::vectorized::ScannerContext*)::$_0::operator()() const::'lambda0'()::operator()() const /mnt/disk2/tengjianping/doris-master/be/src/vec/exec/scan/scanner_scheduler.cpp:202:31
        #6 0x55b2e7554c14 in void std::__invoke_impl<void, doris::vectorized::ScannerScheduler::_schedule_scanners(doris::vectorized::ScannerContext*)::$_0::operator()() const::'lambda0'()&>(std::__invoke_other, doris::vectorized::ScannerScheduler::_schedule_scanners(doris::vectorized::ScannerContext*)::$_0::operator()() const::'lambda0'()&) /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14
        #7 0x55b2e7554bb4 in std::enable_if<is_invocable_r_v<void, doris::vectorized::ScannerScheduler::_schedule_scanners(doris::vectorized::ScannerContext*)::$_0::operator()() const::'lambda0'()&>, void>::type std::__invoke_r<void, doris::vectorized::ScannerScheduler::_schedule_scanners(doris::vectorized::ScannerContext*)::$_0::operator()() const::'lambda0'()&>(doris::vectorized::ScannerScheduler::_schedule_scanners(doris::vectorized::ScannerContext*)::$_0::operator()() const::'lambda0' [...]
        #8 0x55b2e7554a1c in std::_Function_handler<void (), doris::vectorized::ScannerScheduler::_schedule_scanners(doris::vectorized::ScannerContext*)::$_0::operator()() const::'lambda0'()>::_M_invoke(std::_Any_data const&) /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291:9
        #9 0x55b2c80f2cd2 in std::function<void ()>::operator()() const /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9
        #10 0x55b2e755f3e4 in doris::PriorityWorkStealingThreadPool::work_thread(int) /mnt/disk2/tengjianping/doris-master/be/src/util/priority_work_stealing_thread_pool.hpp:135:17
        #11 0x55b2e7563c72 in void std::__invoke_impl<void, void (doris::PriorityWorkStealingThreadPool::* const&)(int), doris::PriorityWorkStealingThreadPool*&, int&>(std::__invoke_memfun_deref, void (doris::PriorityWorkStealingThreadPool::* const&)(int), doris::PriorityWorkStealingThreadPool*&, int&) /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:74:14
        #12 0x55b2e7563b44 in std::__invoke_result<void (doris::PriorityWorkStealingThreadPool::* const&)(int), doris::PriorityWorkStealingThreadPool*&, int&>::type std::__invoke<void (doris::PriorityWorkStealingThreadPool::* const&)(int), doris::PriorityWorkStealingThreadPool*&, int&>(void (doris::PriorityWorkStealingThreadPool::* const&)(int), doris::PriorityWorkStealingThreadPool*&, int&) /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include [...]
        #13 0x55b2e7563b14 in decltype(std::__invoke((*this)._M_pmf, std::forward<doris::PriorityWorkStealingThreadPool*&>(fp), std::forward<int&>(fp))) std::_Mem_fn_base<void (doris::PriorityWorkStealingThreadPool::*)(int), true>::operator()<doris::PriorityWorkStealingThreadPool*&, int&>(doris::PriorityWorkStealingThreadPool*&, int&) const /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/functional:131:11
        #14 0x55b2e7563ae4 in void std::__invoke_impl<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)>&, doris::PriorityWorkStealingThreadPool*&, int&>(std::__invoke_other, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)>&, doris::PriorityWorkStealingThreadPool*&, int&) /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14
        #15 0x55b2e7563a54 in std::enable_if<is_invocable_r_v<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)>&, doris::PriorityWorkStealingThreadPool*&, int&>, void>::type std::__invoke_r<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)>&, doris::PriorityWorkStealingThreadPool*&, int&>(std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)>&, doris::PriorityWorkStealingThreadPool*&, int&) /mnt/disk2/tengjianping/local/ldb_toolchai [...]
        #16 0x55b2e75639c3 in void std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>::__call<void, 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/functional:570:11
        #17 0x55b2e756382d in void std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>::operator()<>() /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/functional:629:17
        #18 0x55b2e7563744 in void std::__invoke_impl<void, std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>>(std::__invoke_other, std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>&&) /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14
        #19 0x55b2e7563704 in std::__invoke_result<std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>>::type std::__invoke<std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>>(std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>&&) /mn [...]
        #20 0x55b2e75636dc in void std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>>>::_M_invoke<0ul>(std::_Index_tuple<0ul>) /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:253:13
        #21 0x55b2e75636b4 in std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>>>::operator()() /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:260:11
        #22 0x55b2e7563638 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::PriorityWorkStealingThreadPool::*)(int)> (doris::PriorityWorkStealingThreadPool*, int)>>>>::_M_run() /mnt/disk2/tengjianping/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_thread.h:211:13
        #23 0x55b2eb41d0ef in execute_native_thread_routine /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/src/c++11/../../../../../libstdc++-v3/src/c++11/thread.cc:82:18
        #24 0x7f1dfd4e1179 in start_thread pthread_create.c
        #25 0x7f1dfdd7bdf2 in clone (/lib64/libc.so.6+0xfcdf2) (BuildId: 20ee73ce1b6ac38a52440bab82ec7e28f0f5c5b9)
---
 be/src/vec/exec/scan/new_olap_scan_node.cpp | 1 +
 be/src/vec/exec/scan/new_olap_scanner.cpp   | 5 ++++-
 be/src/vec/exec/scan/new_olap_scanner.h     | 2 ++
 3 files changed, 7 insertions(+), 1 deletion(-)
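
For readers skimming the patch below: the AddressSanitizer report above comes from every scanner worker thread cloning the scan node's shared _conjuncts expression contexts inside NewOlapScanner::init(), with nothing synchronizing the concurrent reads of that shared vector. The following is a minimal editorial sketch of the racing pattern and of the reworked call shape the patch introduces; the types (ExprCtx, Node, Scanner) are hypothetical stand-ins, not the actual Doris classes.

#include <memory>
#include <vector>

struct ExprCtx {};
using ExprCtxPtr = std::shared_ptr<ExprCtx>;

struct Node {
    std::vector<ExprCtxPtr> conjuncts;  // shared by all scanners, no lock around it
};

struct Scanner {
    Node* parent = nullptr;
    std::vector<ExprCtxPtr> cloned;

    // Old shape: ran on each scanner worker thread, so many threads read
    // parent->conjuncts concurrently while cloning it.
    void init_old() {
        for (const auto& c : parent->conjuncts) {
            cloned.push_back(std::make_shared<ExprCtx>(*c));
        }
    }

    // New shape: the scan node calls this once per scanner on its own thread,
    // before the scanner is handed to the scanner scheduler, so the clone no
    // longer races with anything.
    void prepare(const std::vector<ExprCtxPtr>& conjuncts) {
        for (const auto& c : conjuncts) {
            cloned.push_back(std::make_shared<ExprCtx>(*c));
        }
    }
};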

diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index a5a18a97dd..ac916bfcd1 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -513,6 +513,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
                 key_ranges, rs_readers, rs_reader_seg_offsets, _need_agg_finalize,
                 _scanner_profile.get());
 
+        RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
         scanner->set_compound_filters(_compound_filters);
         scanners->push_back(scanner);
         return Status::OK();
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index f87c247ea9..3537f9b194 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -103,10 +103,13 @@ static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
     return read_columns_string;
 }
 
+Status NewOlapScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
+    return VScanner::prepare(state, conjuncts);
+}
+
 Status NewOlapScanner::init() {
     _is_init = true;
     auto parent = static_cast<NewOlapScanNode*>(_parent);
-    RETURN_IF_ERROR(VScanner::prepare(_state, parent->_conjuncts));
 
     for (auto& ctx : parent->_common_expr_ctxs_push_down) {
         VExprContextSPtr context;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 29178c1285..28adc4d5fe 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -65,6 +65,8 @@ public:
 
     Status close(RuntimeState* state) override;
 
+    Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts);
+
     const std::string& scan_disk() const { return _tablet->data_dir()->path(); }
 
     void set_compound_filters(const std::vector<TCondition>& compound_filters);
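
In short, the patch moves the conjunct cloning off the scanner worker threads: NewOlapScanNode::_init_scanners() now calls scanner->prepare(_state, _conjuncts) on the scan node's own thread (rather than inside the scanner threads), before the scanner is pushed onto the scanner list and picked up by the scheduler, and NewOlapScanner::init() no longer reads parent->_conjuncts. Each scanner therefore owns its cloned expression contexts before any worker thread starts running it.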


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org