Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/10 15:49:48 UTC

[doris] 04/04: [feature-wip](multi-catalog) add iceberg tvf to read snapshots (#15618)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7e9222733a778cc695f5246db37eac71b49c9995
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Tue Jan 10 22:37:35 2023 +0800

    [feature-wip](multi-catalog) add iceberg tvf to read snapshots (#15618)
    
    Support a new table-valued function `iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots")`.
    We can use the SQL `select * from iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots")` to get the snapshot info of a table. Other Iceberg metadata will be supported later as needed.
    
    One usage example:
    
    To time travel with the following SQL:
    `select * from ice_table FOR TIME AS OF "2022-10-10 11:11:11"`;
    `select * from ice_table FOR VERSION AS OF "snapshot_id"`;
    we can first query the snapshots metadata to get the `committed_at` time or the `snapshot_id`,
    and then use it as the time or version in the time travel clause.
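    
    For example, to find the latest snapshot to travel to (the column names follow the snapshots schema registered by `IcebergTableValuedFunction` in this commit):
    `select snapshot_id, committed_at from iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots") order by committed_at desc limit 1;`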
---
 be/src/exec/exec_node.cpp                          |  10 +-
 be/src/runtime/plan_fragment_executor.cpp          |   4 +-
 be/src/vec/CMakeLists.txt                          |   2 +
 be/src/vec/exec/scan/vmeta_scan_node.cpp           |  68 +++++++
 be/src/vec/exec/scan/vmeta_scan_node.h             |  45 +++++
 be/src/vec/exec/scan/vmeta_scanner.cpp             | 209 +++++++++++++++++++++
 be/src/vec/exec/scan/vmeta_scanner.h               |  48 +++++
 .../doris/planner/external/MetadataScanNode.java   | 114 +++++++++++
 .../apache/doris/service/FrontendServiceImpl.java  |  93 +++++++++
 .../apache/doris/statistics/StatisticalType.java   |   1 +
 .../tablefunction/IcebergTableValuedFunction.java  | 123 ++++++++++++
 .../tablefunction/MetadataTableValuedFunction.java |  46 +++++
 .../doris/tablefunction/TableValuedFunctionIf.java |   2 +
 gensrc/thrift/FrontendService.thrift               |   9 +
 gensrc/thrift/PlanNodes.thrift                     |  18 +-
 15 files changed, 787 insertions(+), 5 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index e4bf0b3b13..d804533e88 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -66,6 +66,7 @@
 #include "vec/exec/scan/new_jdbc_scan_node.h"
 #include "vec/exec/scan/new_odbc_scan_node.h"
 #include "vec/exec/scan/new_olap_scan_node.h"
+#include "vec/exec/scan/vmeta_scan_node.h"
 #include "vec/exec/vaggregation_node.h"
 #include "vec/exec/vanalytic_eval_node.h"
 #include "vec/exec/vassert_num_rows_node.h"
@@ -416,6 +417,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
         case TPlanNodeType::DATA_GEN_SCAN_NODE:
         case TPlanNodeType::FILE_SCAN_NODE:
         case TPlanNodeType::JDBC_SCAN_NODE:
+        case TPlanNodeType::META_SCAN_NODE:
             break;
         default: {
             const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
@@ -481,6 +483,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
         }
         return Status::OK();
 
+    case TPlanNodeType::META_SCAN_NODE:
+        *node = pool->add(new vectorized::VMetaScanNode(pool, tnode, descs));
+        return Status::OK();
+
     case TPlanNodeType::OLAP_SCAN_NODE:
         if (state->enable_vectorized_exec()) {
             *node = pool->add(new vectorized::NewOlapScanNode(pool, tnode, descs));
@@ -705,6 +711,7 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
     collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
     collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes);
     collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes);
+    collect_nodes(TPlanNodeType::META_SCAN_NODE, nodes);
 }
 
 void ExecNode::try_do_aggregate_serde_improve() {
@@ -728,7 +735,8 @@ void ExecNode::try_do_aggregate_serde_improve() {
         typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
         typeid(*child0) == typeid(vectorized::NewOdbcScanNode) ||
         typeid(*child0) == typeid(vectorized::NewEsScanNode) ||
-        typeid(*child0) == typeid(vectorized::NewJdbcScanNode)) {
+        typeid(*child0) == typeid(vectorized::NewJdbcScanNode) ||
+        typeid(*child0) == typeid(vectorized::VMetaScanNode)) {
         vectorized::VScanNode* scan_node =
                 static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
         scan_node->set_no_agg_finalize();
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index e901a738e1..b59cba4896 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -48,6 +48,7 @@
 #include "vec/exec/scan/new_jdbc_scan_node.h"
 #include "vec/exec/scan/new_odbc_scan_node.h"
 #include "vec/exec/scan/new_olap_scan_node.h"
+#include "vec/exec/scan/vmeta_scan_node.h"
 #include "vec/exec/vexchange_node.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
@@ -173,7 +174,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
             typeid(*node) == typeid(vectorized::NewFileScanNode) ||
             typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
             typeid(*node) == typeid(vectorized::NewEsScanNode) ||
-            typeid(*node) == typeid(vectorized::NewJdbcScanNode)) {
+            typeid(*node) == typeid(vectorized::NewJdbcScanNode) ||
+            typeid(*node) == typeid(vectorized::VMetaScanNode)) {
             vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
             const std::vector<TScanRangeParams>& scan_ranges =
                     find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 0e83f98167..b0ce39f80f 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -275,6 +275,8 @@ set(VEC_FILES
   exec/scan/new_jdbc_scan_node.cpp
   exec/scan/new_es_scanner.cpp
   exec/scan/new_es_scan_node.cpp
+  exec/scan/vmeta_scan_node.cpp
+  exec/scan/vmeta_scanner.cpp
   exec/format/csv/csv_reader.cpp
   exec/format/orc/vorc_reader.cpp
   exec/format/json/new_json_reader.cpp
diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp b/be/src/vec/exec/scan/vmeta_scan_node.cpp
new file mode 100644
index 0000000000..ae7585ecf4
--- /dev/null
+++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vmeta_scan_node.h"
+
+#include "vmeta_scanner.h"
+
+namespace doris::vectorized {
+
+VMetaScanNode::VMetaScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs),
+          _tuple_id(tnode.meta_scan_node.tuple_id),
+          _scan_params(tnode.meta_scan_node) {
+    _output_tuple_id = _tuple_id;
+}
+
+Status VMetaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(VScanNode::init(tnode, state));
+    return Status::OK();
+}
+
+Status VMetaScanNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(VScanNode::prepare(state));
+    return Status::OK();
+}
+
+void VMetaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
+    _scan_ranges = scan_ranges;
+}
+
+Status VMetaScanNode::_init_profile() {
+    RETURN_IF_ERROR(VScanNode::_init_profile());
+    return Status::OK();
+}
+
+Status VMetaScanNode::_init_scanners(std::list<VScanner*>* scanners) {
+    if (_eos) {
+        return Status::OK();
+    }
+    for (auto& scan_range : _scan_ranges) {
+        VMetaScanner* scanner =
+                new VMetaScanner(_state, this, _tuple_id, scan_range, _limit_per_scanner);
+        RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
+        _scanner_pool.add(scanner);
+        scanners->push_back(static_cast<VScanner*>(scanner));
+    }
+    return Status::OK();
+}
+
+Status VMetaScanNode::_process_conjuncts() {
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vmeta_scan_node.h b/be/src/vec/exec/scan/vmeta_scan_node.h
new file mode 100644
index 0000000000..93774f89d1
--- /dev/null
+++ b/be/src/vec/exec/scan/vmeta_scan_node.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris::vectorized {
+
+class VMetaScanNode : public VScanNode {
+public:
+    VMetaScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    ~VMetaScanNode() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+    Status prepare(RuntimeState* state) override;
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
+    const TMetaScanNode& scan_params() { return _scan_params; }
+
+private:
+    Status _init_profile() override;
+    Status _init_scanners(std::list<VScanner*>* scanners) override;
+    Status _process_conjuncts() override;
+
+    TupleId _tuple_id;
+    TMetaScanNode _scan_params;
+    std::vector<TScanRangeParams> _scan_ranges;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp
new file mode 100644
index 0000000000..c36d7877c0
--- /dev/null
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vmeta_scanner.h"
+
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/HeartbeatService_types.h>
+
+#include "gen_cpp/FrontendService.h"
+#include "runtime/client_cache.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/runtime/vdatetime_value.h"
+
+namespace doris::vectorized {
+
+VMetaScanner::VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t tuple_id,
+                           const TScanRangeParams& scan_range, int64_t limit)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit),
+          _parent(parent),
+          _meta_eos(false),
+          _tuple_id(tuple_id),
+          _scan_range(scan_range.scan_range) {}
+
+Status VMetaScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << "VMetaScanner::open";
+    RETURN_IF_ERROR(VScanner::open(state));
+    return Status::OK();
+}
+
+Status VMetaScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) {
+    VLOG_CRITICAL << "VMetaScanner::prepare";
+    if (vconjunct_ctx_ptr != nullptr) {
+        // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
+        RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
+    }
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_scan_range.meta_scan_range.__isset.iceberg_params) {
+        RETURN_IF_ERROR(_fetch_iceberg_metadata_batch());
+    } else {
+        _meta_eos = true;
+    }
+    return Status::OK();
+}
+
+Status VMetaScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    VLOG_CRITICAL << "VMetaScanner::_get_block_impl";
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is a null pointer");
+    }
+    if (_meta_eos) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    auto column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+    bool mem_reuse = block->mem_reuse();
+    do {
+        RETURN_IF_CANCELLED(state);
+
+        columns.resize(column_size);
+        for (auto i = 0; i < column_size; i++) {
+            if (mem_reuse) {
+                columns[i] = std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+        // fill block
+        _fill_block_with_remote_data(columns);
+        if (_meta_eos) {
+            if (block->rows() == 0) {
+                *eof = true;
+            }
+            break;
+        }
+        // Before the block is actually used, any extra column pointers in it must be cleared,
+        // so std::move the filled columns into the block here (or just clear `columns` on reuse).
+        if (!mem_reuse) {
+            int column_index = 0;
+            for (const auto slot_desc : _tuple_desc->slots()) {
+                block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]),
+                                                    slot_desc->get_data_type_ptr(),
+                                                    slot_desc->col_name()));
+            }
+        } else {
+            columns.clear();
+        }
+        VLOG_ROW << "VMetaScanNode output rows: " << block->rows();
+    } while (block->rows() == 0 && !(*eof));
+    return Status::OK();
+}
+
+Status VMetaScanner::_fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns) {
+    VLOG_CRITICAL << "VMetaScanner::_fill_block_with_remote_data";
+    for (int col_idx = 0; col_idx < columns.size(); col_idx++) {
+        auto slot_desc = _tuple_desc->slots()[col_idx];
+        // Skip non-materialized columns, because the FE planner filters them out.
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+
+        for (int _row_idx = 0; _row_idx < _batch_data.size(); _row_idx++) {
+            vectorized::IColumn* col_ptr = columns[col_idx].get();
+            if (slot_desc->is_nullable()) {
+                auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);
+                col_ptr = &nullable_column->get_nested_column();
+            }
+            switch (slot_desc->type().type) {
+            case TYPE_INT: {
+                int32_t data = _batch_data[_row_idx].column_value[col_idx].intVal;
+                reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)
+                        ->insert_value(data);
+                break;
+            }
+            case TYPE_BIGINT: {
+                int64_t data = _batch_data[_row_idx].column_value[col_idx].longVal;
+                reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+                        ->insert_value(data);
+                break;
+            }
+            case TYPE_DATETIMEV2: {
+                uint64_t data = _batch_data[_row_idx].column_value[col_idx].longVal;
+                reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)
+                        ->insert_value(data);
+                break;
+            }
+            case TYPE_STRING:
+            case TYPE_CHAR:
+            case TYPE_VARCHAR: {
+                std::string data = _batch_data[_row_idx].column_value[col_idx].stringVal;
+                reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data.c_str(),
+                                                                                  data.length());
+                break;
+            }
+            default: {
+                std::string error_msg =
+                        fmt::format("Invalid column type {} on column: {}.",
+                                    slot_desc->type().debug_string(), slot_desc->col_name());
+                return Status::InternalError(std::string(error_msg));
+            }
+            }
+        }
+    }
+    _meta_eos = true;
+    return Status::OK();
+}
+
+Status VMetaScanner::_fetch_iceberg_metadata_batch() {
+    VLOG_CRITICAL << "VMetaScanner::_fetch_iceberg_metadata_batch";
+    TFetchSchemaTableDataRequest request;
+    request.cluster_name = "";
+    request.__isset.cluster_name = true;
+    request.schema_table_name = TSchemaTableName::ICEBERG_TABLE_META;
+    request.__isset.schema_table_name = true;
+    auto scan_params = _parent->scan_params();
+    TMetadataTableRequestParams meta_table_params = TMetadataTableRequestParams();
+    meta_table_params.catalog = scan_params.catalog;
+    meta_table_params.__isset.catalog = true;
+    meta_table_params.database = scan_params.database;
+    meta_table_params.__isset.database = true;
+    meta_table_params.table = scan_params.table;
+    meta_table_params.__isset.table = true;
+
+    meta_table_params.iceberg_metadata_params = _scan_range.meta_scan_range.iceberg_params;
+    meta_table_params.__isset.iceberg_metadata_params = true;
+
+    request.metada_table_params = meta_table_params;
+    request.__isset.metada_table_params = true;
+
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TFetchSchemaTableDataResult result;
+
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->fetchSchemaTableData(result, request);
+            },
+            config::txn_commit_rpc_timeout_ms));
+
+    Status status(result.status);
+    if (!status.ok()) {
+        LOG(WARNING) << "fetch schema table data from master failed, errmsg=" << status;
+        return status;
+    }
+    _batch_data = std::move(result.data_batch);
+    return Status::OK();
+}
+
+Status VMetaScanner::close(RuntimeState* state) {
+    VLOG_CRITICAL << "VMetaScanner::close";
+    RETURN_IF_ERROR(VScanner::close(state));
+    return Status::OK();
+}
+} // namespace doris::vectorized
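
Note: the scanner never reads Iceberg metadata files itself. `_fetch_iceberg_metadata_batch` sends a `fetchSchemaTableData` RPC to the FE master, and `_fill_block_with_remote_data` then copies the returned `TRow` batch into the output columns according to each slot's type.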
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h
new file mode 100644
index 0000000000..7663ffe1c8
--- /dev/null
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/scan/vscanner.h"
+#include "vmeta_scan_node.h"
+
+namespace doris::vectorized {
+
+class VMetaScanner : public VScanner {
+public:
+    VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t tuple_id,
+                 const TScanRangeParams& scan_range, int64_t limit);
+
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
+    Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr);
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
+    Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns);
+    Status _fetch_iceberg_metadata_batch();
+
+private:
+    VMetaScanNode* _parent;
+    bool _meta_eos;
+    TupleId _tuple_id;
+    const TupleDescriptor* _tuple_desc;
+    std::vector<TRow> _batch_data;
+    const TScanRange& _scan_range;
+};
+} // namespace doris::vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
new file mode 100644
index 0000000000..431a456646
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.IcebergTableValuedFunction;
+import org.apache.doris.tablefunction.MetadataTableValuedFunction;
+import org.apache.doris.thrift.TIcebergMetadataParams;
+import org.apache.doris.thrift.TIcebergMetadataType;
+import org.apache.doris.thrift.TMetaScanNode;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class MetadataScanNode extends ScanNode {
+
+    private MetadataTableValuedFunction tvf;
+
+    private List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
+
+    private final BackendPolicy backendPolicy = new BackendPolicy();
+
+    public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, MetadataTableValuedFunction tvf) {
+        super(id, desc, "METADATA_SCAN_NODE", StatisticalType.METADATA_SCAN_NODE);
+        this.tvf = tvf;
+    }
+
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        backendPolicy.init();
+    }
+
+    @Override
+    protected void toThrift(TPlanNode planNode) {
+        planNode.setNodeType(TPlanNodeType.META_SCAN_NODE);
+        TMetaScanNode metaScanNode = new TMetaScanNode();
+        metaScanNode.setCatalog(tvf.getMetadataTableName().getCtl());
+        metaScanNode.setDatabase(tvf.getMetadataTableName().getDb());
+        metaScanNode.setTable(tvf.getMetadataTableName().getTbl());
+        metaScanNode.setTupleId(desc.getId().asInt());
+        planNode.setMetaScanNode(metaScanNode);
+    }
+
+    @Override
+    public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+        return scanRangeLocations;
+    }
+
+    @Override
+    public void finalize(Analyzer analyzer) throws UserException {
+        buildScanRanges();
+    }
+
+    private void buildScanRanges() {
+        if (tvf.getMetaType() == MetadataTableValuedFunction.MetaType.ICEBERG) {
+            IcebergTableValuedFunction icebergTvf = (IcebergTableValuedFunction) tvf;
+            // todo: split
+            TScanRangeLocations locations = createIcebergTvfLocations(icebergTvf);
+            scanRangeLocations.add(locations);
+        }
+    }
+
+    private TScanRangeLocations createIcebergTvfLocations(IcebergTableValuedFunction icebergTvf) {
+        TScanRange scanRange = new TScanRange();
+        TMetaScanRange metaScanRange = new TMetaScanRange();
+        // set iceberg metadata params
+        TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
+        int metadataType = icebergTvf.getMetaQueryType().ordinal();
+        icebergMetadataParams.setMetadataType(TIcebergMetadataType.findByValue(metadataType));
+
+        metaScanRange.setIcebergParams(icebergMetadataParams);
+        scanRange.setMetaScanRange(metaScanRange);
+        // set location
+        TScanRangeLocation location = new TScanRangeLocation();
+        Backend backend = backendPolicy.getNextBe();
+        location.setBackendId(backend.getId());
+        location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+
+        TScanRangeLocations result = new TScanRangeLocations();
+        result.addToLocations(location);
+        result.setScanRange(scanRange);
+        return result;
+    }
+}
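
Note: `buildScanRanges` currently produces a single scan range for the whole metadata table and places it on one backend chosen by `BackendPolicy`; splitting the metadata scan across backends is left as a TODO.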
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 71784eae21..4c42ca5968 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HMSResource;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.S3Resource;
 import org.apache.doris.catalog.Table;
@@ -47,6 +48,7 @@ import org.apache.doris.common.Version;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.master.MasterImpl;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -85,6 +87,7 @@ import org.apache.doris.thrift.TGetStoragePolicy;
 import org.apache.doris.thrift.TGetStoragePolicyResult;
 import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
+import org.apache.doris.thrift.TIcebergMetadataType;
 import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
 import org.apache.doris.thrift.TInitExternalCtlMetaResult;
 import org.apache.doris.thrift.TListPrivilegesResult;
@@ -100,6 +103,7 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TMasterResult;
+import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPrivilegeStatus;
 import org.apache.doris.thrift.TReportExecStatusParams;
@@ -131,11 +135,19 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import org.jetbrains.annotations.NotNull;
 
+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -997,6 +1009,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         switch (request.getSchemaTableName()) {
             case BACKENDS:
                 return getBackendsSchemaTable(request);
+            case ICEBERG_TABLE_META:
+                return getIcebergMetadataTable(request);
             default:
                 break;
         }
@@ -1005,6 +1019,85 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
+    private TFetchSchemaTableDataResult getIcebergMetadataTable(TFetchSchemaTableDataRequest request) {
+        if (!request.isSetMetadaTableParams()) {
+            return errorResult("Metadata table params is not set. ");
+        }
+        TMetadataTableRequestParams params = request.getMetadaTableParams();
+        if (!params.isSetIcebergMetadataParams()) {
+            return errorResult("Iceberg metadata params is not set. ");
+        }
+
+        HMSExternalCatalog catalog = (HMSExternalCatalog) Env.getCurrentEnv().getCatalogMgr()
+                .getCatalog(params.getCatalog());
+        org.apache.iceberg.Table table;
+        try {
+            table = getIcebergTable(catalog, params.getDatabase(), params.getTable());
+        } catch (MetaNotFoundException e) {
+            return errorResult(e.getMessage());
+        }
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        List<TRow> dataBatch = Lists.newArrayList();
+        TIcebergMetadataType metadataType = params.getIcebergMetadataParams().getMetadataType();
+        switch (metadataType) {
+            case SNAPSHOTS:
+                for (Snapshot snapshot : table.snapshots()) {
+                    TRow trow = new TRow();
+                    LocalDateTime committedAt = LocalDateTime.ofInstant(Instant.ofEpochMilli(
+                            snapshot.timestampMillis()), TimeUtils.getTimeZone().toZoneId());
+                    long encodedDatetime = convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(),
+                            committedAt.getDayOfMonth(), committedAt.getHour(),
+                            committedAt.getMinute(), committedAt.getSecond());
+
+                    trow.addToColumnValue(new TCell().setLongVal(encodedDatetime));
+                    trow.addToColumnValue(new TCell().setLongVal(snapshot.snapshotId()));
+                    if (snapshot.parentId() == null) {
+                        trow.addToColumnValue(new TCell().setLongVal(-1L));
+                    } else {
+                        trow.addToColumnValue(new TCell().setLongVal(snapshot.parentId()));
+                    }
+                    trow.addToColumnValue(new TCell().setStringVal(snapshot.operation()));
+                    trow.addToColumnValue(new TCell().setStringVal(snapshot.manifestListLocation()));
+                    dataBatch.add(trow);
+                }
+                break;
+            default:
+                return errorResult("Unsupported metadata inspect type: " + metadataType);
+        }
+        result.setDataBatch(dataBatch);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
+
+    public static long convertToDateTimeV2(int year, int month, int day, int hour, int minute, int second) {
+        return (long) second << 20 | (long) minute << 26 | (long) hour << 32
+            | (long) day << 37 | (long) month << 42 | (long) year << 46;
+    }
+
+    @NotNull
+    private TFetchSchemaTableDataResult errorResult(String msg) {
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
+        result.status.addToErrorMsgs(msg);
+        return result;
+    }
+
+    private org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog catalog, String db, String tbl)
+                throws MetaNotFoundException {
+        org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> properties = catalog.getCatalogProperty().getHadoopProperties();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        hiveCatalog.setConf(conf);
+        Map<String, String> catalogProperties = new HashMap<>();
+        catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, catalog.getHiveMetastoreUris());
+        catalogProperties.put("uri", catalog.getHiveMetastoreUris());
+        hiveCatalog.initialize("hive", catalogProperties);
+        return hiveCatalog.loadTable(TableIdentifier.of(db, tbl));
+    }
+
     private TFetchSchemaTableDataResult getBackendsSchemaTable(TFetchSchemaTableDataRequest request) {
         final SystemInfoService clusterInfoService = Env.getCurrentSystemInfo();
         TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
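
Note: `convertToDateTimeV2` packs the timestamp into the 64-bit DATETIMEV2 layout that the BE stores directly as a `UInt64` column: second in bits 20-25, minute in bits 26-31, hour in bits 32-36, day in bits 37-41, month in bits 42-45, and year from bit 46 upward. The low 20 bits, presumably the fractional-second part of DATETIMEV2, are left as zero here.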
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index ebdfd0471e..47ad4a6161 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -46,5 +46,6 @@ public enum StatisticalType {
     UNION_NODE,
     TABLE_VALUED_FUNCTION_NODE,
     FILE_SCAN_NODE,
+    METADATA_SCAN_NODE,
     JDBC_SCAN_NODE,
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
new file mode 100644
index 0000000000..4e5aa7dff0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The implementation of the table-valued function
+ * iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots").
+ */
+public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
+
+    public enum MetadataType { SNAPSHOTS }
+
+    public static final String NAME = "iceberg_meta";
+    private static final String TABLE = "table";
+    private static final String QUERY_TYPE = "query_type";
+
+    private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
+            .add(TABLE)
+            .add(QUERY_TYPE)
+            .build();
+
+    private final MetadataType queryType;
+    private final TableName tableName;
+
+    public IcebergTableValuedFunction(Map<String, String> params) throws AnalysisException {
+        super(MetaType.ICEBERG);
+        Map<String, String> validParams = Maps.newHashMap();
+        for (String key : params.keySet()) {
+            if (!PROPERTIES_SET.contains(key.toLowerCase())) {
+                throw new AnalysisException("'" + key + "' is invalid property");
+            }
+            // normalize property keys to lower case
+            validParams.put(key.toLowerCase(), params.get(key));
+        }
+        String tableName = validParams.get(TABLE);
+        String queryType = validParams.get(QUERY_TYPE);
+        if (tableName == null || queryType == null) {
+            throw new AnalysisException("Invalid iceberg metadata query");
+        }
+        String[] names = tableName.split("\\.");
+        if (names.length != 3) {
+            throw new AnalysisException("The iceberg table name contains the catalogName, databaseName, and tableName");
+        }
+        this.tableName = new TableName(names[0], names[1], names[2]);
+        // check auth
+        if (!Env.getCurrentEnv().getAuth().checkTblPriv(ConnectContext.get(), this.tableName, PrivPredicate.SELECT)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT",
+                    ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
+                    this.tableName.getDb() + ": " + this.tableName.getTbl());
+        }
+        try {
+            this.queryType = MetadataType.valueOf(queryType.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            throw new AnalysisException("Unsupported iceberg metadata query type: " + queryType);
+        }
+    }
+
+    @Override
+    public String getTableName() {
+        return "IcebergMetadataTableValuedFunction";
+    }
+
+    public TableName getMetadataTableName() {
+        return tableName;
+    }
+
+    public MetadataType getMetaQueryType() {
+        return queryType;
+    }
+
+    /**
+     * The TVF registers the columns of the metadata table.
+     * The data is provided by getIcebergMetadataTable in FrontendServiceImpl.
+     * @see org.apache.doris.service.FrontendServiceImpl
+     * @return metadata columns
+     */
+    @Override
+    public List<Column> getTableColumns() throws AnalysisException {
+        List<Column> resColumns = new ArrayList<>();
+        if (queryType == MetadataType.SNAPSHOTS) {
+            resColumns.add(new Column("committed_at", PrimitiveType.DATETIMEV2, false));
+            resColumns.add(new Column("snapshot_id", PrimitiveType.BIGINT, false));
+            resColumns.add(new Column("parent_id", PrimitiveType.BIGINT, false));
+            resColumns.add(new Column("operation", PrimitiveType.STRING, false));
+            // todo: compress manifest_list string
+            resColumns.add(new Column("manifest_list", PrimitiveType.STRING, false));
+            // resColumns.add(new Column("summary", PrimitiveType.MAP, false));
+        }
+        return resColumns;
+    }
+}
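
Given this schema, snapshot rows can be filtered like any other table. For example (a usage sketch; `append` is one of the Iceberg snapshot operation values):
`select snapshot_id, committed_at from iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots") where operation = 'append';`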
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
new file mode 100644
index 0000000000..fd83c59957
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.external.MetadataScanNode;
+
+public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf {
+
+    public enum MetaType { ICEBERG }
+
+    private final MetaType metaType;
+
+    public MetadataTableValuedFunction(MetaType metaType) {
+        this.metaType = metaType;
+    }
+
+    public MetaType getMetaType() {
+        return metaType;
+    }
+
+    public abstract TableName getMetadataTableName();
+
+    @Override
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+        return new MetadataScanNode(id, desc, this);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index 862b986e97..fdd9e106fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -49,6 +49,8 @@ public abstract class TableValuedFunctionIf {
                 return new S3TableValuedFunction(params);
             case HdfsTableValuedFunction.NAME:
                 return new HdfsTableValuedFunction(params);
+            case IcebergTableValuedFunction.NAME:
+                return new IcebergTableValuedFunction(params);
             default:
                 throw new AnalysisException("Could not find table function " + funcName);
         }
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 4c7d062727..32c73a5663 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -694,11 +694,20 @@ struct TInitExternalCtlMetaResult {
 
 enum TSchemaTableName{
   BACKENDS = 0,
+  ICEBERG_TABLE_META = 1,
+}
+
+struct TMetadataTableRequestParams {
+  1: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params
+  2: optional string catalog
+  3: optional string database
+  4: optional string table
 }
 
 struct TFetchSchemaTableDataRequest {
   1: optional string cluster_name
   2: optional TSchemaTableName schema_table_name
+  3: optional TMetadataTableRequestParams metada_table_params
 }
 
 struct TFetchSchemaTableDataResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ff97361c6e..35ebab6373 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -370,6 +370,18 @@ struct TDataGenScanRange {
   1: optional TTVFNumbersScanRange numbers_params
 }
 
+enum TIcebergMetadataType {
+  SNAPSHOTS = 0,
+}
+
+struct TIcebergMetadataParams {
+  1: optional TIcebergMetadataType metadata_type
+}
+
+struct TMetaScanRange {
+  1: optional TIcebergMetadataParams iceberg_params
+}
+
 // Specification of an individual data range which is held in its entirety
 // by a storage server
 struct TScanRange {
@@ -380,6 +392,7 @@ struct TScanRange {
   7: optional TEsScanRange es_scan_range
   8: optional TExternalScanRange ext_scan_range
   9: optional TDataGenScanRange data_gen_scan_range
+  10: optional TMetaScanRange meta_scan_range
 }
 
 struct TMySQLScanNode {
@@ -513,10 +526,9 @@ struct TSchemaScanNode {
 
 struct TMetaScanNode {
   1: required Types.TTupleId tuple_id
-  2: required string table_name
-  3: optional string db
+  2: optional string catalog
+  3: optional string database
   4: optional string table
-  5: optional string user
 }
 
 struct TSortInfo {

