You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2023/01/15 03:16:50 UTC

[GitHub] [doris] morningman commented on a diff in pull request #15836: [feature wip](multi catalog)Support iceberg schema evolution.

morningman commented on code in PR #15836:
URL: https://github.com/apache/doris/pull/15836#discussion_r1070489499


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/HMSUtil.java:
##########
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.HMSResource;
+import org.apache.doris.catalog.external.HMSExternalTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HMSUtil {

Review Comment:
   We can add these methods to `HiveMetaStoreClientHelper`.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java:
##########
@@ -162,6 +166,16 @@ public PooledHiveMetaStoreClient getClient() {
     public List<Column> getSchema(String dbName, String tblName) {
         makeSureInitialized();
         List<FieldSchema> schema = getClient().getSchema(dbName, tblName);
+        Optional<ExternalDatabase> db = getDb(dbName);
+        if (db.isPresent()) {

Review Comment:
   The above `List<FieldSchema> schema = getClient().getSchema(dbName, tblName);` can be moved after this `if` block.



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -180,8 +215,19 @@ Status ParquetReader::init_reader(
         return Status::EndOfFile("Empty Parquet File");
     }
     auto schema_desc = _file_metadata->schema();
+    if (_schema_evolution) {
+        _gen_col_name_maps(_col_id_name_map);
+    }
     for (int i = 0; i < schema_desc.size(); ++i) {
-        _map_column.emplace(schema_desc.get_column(i)->name, i);
+        if (_schema_evolution) {
+            if (_file_col_to_table_col.find(schema_desc.get_column(i)->name) !=

Review Comment:
   Store the result of `_file_col_to_table_col.find(schema_desc.get_column(i)->name)` in a local variable so that we don't need to call it twice.



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -167,8 +169,41 @@ Status ParquetReader::_open_file() {
     return Status::OK();
 }
 
+Status ParquetReader::_gen_col_name_maps(
+        const std::unordered_map<int, std::string> _col_id_name_map) {
+    std::vector<tparquet::KeyValue> key_values = _t_metadata->key_value_metadata;
+    std::string schema;
+    for (int i = 0; i < key_values.size(); ++i) {
+        tparquet::KeyValue kv = key_values[i];
+        if (kv.key == "iceberg.schema") {
+            schema = kv.value;
+            rapidjson::Document json;
+            json.Parse(schema.c_str());
+
+            if (json.HasMember("fields")) {
+                rapidjson::Value& fields = json["fields"];
+                if (fields.IsArray()) {
+                    for (int j = 0; j < fields.Size(); j++) {
+                        rapidjson::Value& e = fields[j];
+                        rapidjson::Value& id = e["id"];
+                        rapidjson::Value& name = e["name"];
+                        auto iter = _col_id_name_map.find(id.GetInt());
+                        if (iter != _col_id_name_map.end()) {
+                            _table_col_to_file_col.emplace(iter->second, name.GetString());
+                            _file_col_to_table_col.emplace(name.GetString(), iter->second);
+                        }
+                    }
+                }
+            }
+            break;
+        }
+    }
+    return Status::OK();
+}
+
 Status ParquetReader::init_reader(
         const std::vector<std::string>& column_names,
+        const std::unordered_map<int, std::string> _col_id_name_map,

Review Comment:
   ```suggestion
           const std::unordered_map<int, std::string>& _col_id_name_map,
   ```



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -167,8 +169,41 @@ Status ParquetReader::_open_file() {
     return Status::OK();
 }
 
+Status ParquetReader::_gen_col_name_maps(
+        const std::unordered_map<int, std::string> _col_id_name_map) {

Review Comment:
   Please add more comments for this logic.



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -357,6 +410,14 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
 }
 
 Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    for (int i = 0; i < block->columns(); i++) {

Review Comment:
   Add a comment for this logic.



##########
be/src/vec/exec/format/parquet/vparquet_reader.h:
##########
@@ -152,6 +148,7 @@ class ParquetReader : public GenericReader {
     void _init_bloom_filter();
     Status _process_bloom_filter(bool* filter_group);
     int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers);
+    Status _gen_col_name_maps(const std::unordered_map<int, std::string> _col_id_name_map);

Review Comment:
   ```suggestion
       Status _gen_col_name_maps(const std::unordered_map<int, std::string>& _col_id_name_map);
   ```



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -167,8 +169,41 @@ Status ParquetReader::_open_file() {
     return Status::OK();
 }
 
+Status ParquetReader::_gen_col_name_maps(
+        const std::unordered_map<int, std::string> _col_id_name_map) {

Review Comment:
   ```suggestion
           const std::unordered_map<int, std::string>& _col_id_name_map) {
   ```



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -167,8 +169,41 @@ Status ParquetReader::_open_file() {
     return Status::OK();
 }
 
+Status ParquetReader::_gen_col_name_maps(
+        const std::unordered_map<int, std::string> _col_id_name_map) {

Review Comment:
   And I think this logic should be done in `iceberg_reader`, not `parquet_reader`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org