You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/26 04:47:42 UTC

[doris] branch master updated: [Fix](multi-catalog)Fix partition external table query bug. (#13535)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 44c9163b3c [Fix](multi-catalog)Fix partition external table query bug. (#13535)
44c9163b3c is described below

commit 44c9163b3c3144701dfa1bc7b91f20fcedb4b510
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Wed Oct 26 12:47:37 2022 +0800

    [Fix](multi-catalog)Fix partition external table query bug. (#13535)
    
    The index used for external table columns that come from the file path (partition columns) is incorrect in the new scanner. This commit fixes it.
    e.g. In the following query, the nation and city columns come from the path:
    ```
    mysql> select nation, city, count(*) from parquet_two_part group by nation, city;
    +--------+------------+----------+
    | nation | city       | count(*) |
    +--------+------------+----------+
    | cn     | beijing    |  1199969 |
    | cn     | shanghai   |  1199771 |
    | jp     | tokyo      |   599715 |
    | rus    | moscow     |   600659 |
    | us     | chicago    |  1199805 |
    | us     | washington |  1201296 |
    +--------+------------+----------+
    6 rows in set (0.39 sec)
    ```
---
 .../exec/format/parquet/vparquet_group_reader.cpp  | 22 ++++++++++++++++++
 .../exec/format/parquet/vparquet_group_reader.h    |  4 ++++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  5 +++--
 be/src/vec/exec/scan/vfile_scanner.cpp             | 26 +++++++++++++++++++---
 .../doris/planner/external/HiveScanProvider.java   |  9 +++++---
 gensrc/thrift/PlanNodes.thrift                     |  2 ++
 6 files changed, 60 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index a46813cbf2..3b50d32ef3 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -30,6 +30,7 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
           _read_columns(read_columns),
           _row_group_id(row_group_id),
           _row_group_meta(row_group),
+          _remaining_rows(row_group.num_rows),
           _ctz(ctz) {}
 
 RowGroupReader::~RowGroupReader() {
@@ -38,6 +39,10 @@ RowGroupReader::~RowGroupReader() {
 
 Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                             std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
+    if (_read_columns.size() == 0) {
+        // A query task that selects only columns from the path has no file columns to read.
+        return Status::OK();
+    }
     const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
     const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
     size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
@@ -62,6 +67,10 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
 
 Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
                                   bool* _batch_eof) {
+    // Handle an external table query task whose selected columns all come from the path.
+    if (_read_columns.empty()) {
+        return _read_empty_batch(batch_size, read_rows, _batch_eof);
+    }
     size_t batch_read_rows = 0;
     bool has_eof = false;
     int col_idx = 0;
@@ -94,6 +103,19 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
     return Status::OK();
 }
 
+Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) {
+    if (batch_size < _remaining_rows) {
+        *read_rows = batch_size;
+        _remaining_rows -= batch_size;
+        *_batch_eof = false;
+    } else {
+        *read_rows = _remaining_rows;
+        _remaining_rows = 0;
+        *_batch_eof = true;
+    }
+    return Status::OK();
+}
+
 ParquetColumnReader::Statistics RowGroupReader::statistics() {
     ParquetColumnReader::Statistics st;
     for (auto& reader : _column_readers) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index e1b54bb529..0a18514c39 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -35,12 +35,16 @@ public:
 
     ParquetColumnReader::Statistics statistics();
 
+private:
+    Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof);
+
 private:
     doris::FileReader* _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
     const std::vector<ParquetReadColumn>& _read_columns;
     const int32_t _row_group_id;
     const tparquet::RowGroup& _row_group_meta;
+    int64_t _remaining_rows;
     int64_t _read_rows = 0;
     cctz::time_zone* _ctz;
 };
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 7ff57ac05b..dc33f321d7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -143,6 +143,7 @@ Status ParquetReader::init_reader(
     _colname_to_value_range = colname_to_value_range;
     RETURN_IF_ERROR(_init_read_columns());
     RETURN_IF_ERROR(_init_row_group_readers());
+
     return Status::OK();
 }
 
@@ -156,12 +157,12 @@ Status ParquetReader::_init_read_columns() {
             _missing_cols.push_back(file_col_name);
         }
     }
+    // An empty include_column_ids is legal in a query task.
     if (include_column_ids.empty()) {
-        return Status::InternalError("No columns found in parquet file");
+        return Status::OK();
     }
     // The same order as physical columns
     std::sort(include_column_ids.begin(), include_column_ids.end());
-    _read_columns.clear();
     for (int& parquet_col_id : include_column_ids) {
         _read_columns.emplace_back(parquet_col_id,
                                    _file_metadata->schema().get_column(parquet_col_id)->name);
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index c9ccfbf46c..3503e8c468 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -257,7 +257,6 @@ Status VFileScanner::_fill_columns_from_path(size_t rows) {
                 return Status::InternalError(ss.str());
             }
             const std::string& column_from_path = range.columns_from_path[it->second];
-
             auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
             IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
 
@@ -528,12 +527,28 @@ Status VFileScanner::_init_expr_ctxes() {
 
     std::map<SlotId, int> full_src_index_map;
     std::map<SlotId, SlotDescriptor*> full_src_slot_map;
+    std::map<std::string, int> partition_name_to_key_index_map;
     int index = 0;
     for (const auto& slot_desc : _real_tuple_desc->slots()) {
         full_src_slot_map.emplace(slot_desc->id(), slot_desc);
         full_src_index_map.emplace(slot_desc->id(), index++);
     }
 
+    // For external table queries, find the index of each column that comes from the path.
+    // A query does not always select all columns of a table,
+    // and the order of the selected columns is not fixed.
+    // All ranges in the _ranges vector should have identical columns_from_path_keys
+    // because they are all file splits of the same external table.
+    // So use the first element of _ranges to fill partition_name_to_key_index_map.
+    if (_ranges[0].__isset.columns_from_path_keys) {
+        std::vector<std::string> key_map = _ranges[0].columns_from_path_keys;
+        if (!key_map.empty()) {
+            for (size_t i = 0; i < key_map.size(); i++) {
+                partition_name_to_key_index_map.emplace(key_map[i], i);
+            }
+        }
+    }
+
     _num_of_columns_from_file = _params.num_of_columns_from_file;
     for (const auto& slot_info : _params.required_slots) {
         auto slot_id = slot_info.slot_id;
@@ -551,8 +566,13 @@ Status VFileScanner::_init_expr_ctxes() {
             _file_col_names.push_back(it->second->col_name());
         } else {
             _partition_slot_descs.emplace_back(it->second);
-            auto iti = full_src_index_map.find(slot_id);
-            _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
+            if (_is_load) {
+                auto iti = full_src_index_map.find(slot_id);
+                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
+            } else {
+                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
+                _partition_slot_index_map.emplace(slot_id, kit->second);
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 04916fb105..8de5b5d312 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -304,10 +304,11 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
 
             for (InputSplit split : inputSplits) {
                 FileSplit fileSplit = (FileSplit) split;
+                List<String> pathPartitionKeys = getPathPartitionKeys();
                 List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
-                        getPathPartitionKeys(), false);
+                        pathPartitionKeys, false);
 
-                TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);
+                TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
 
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 LOG.info(
@@ -366,12 +367,14 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
         return locations;
     }
 
-    private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath)
+    private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
+                                               List<String> columnsFromPathKeys)
             throws DdlException, MetaNotFoundException {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         rangeDesc.setStartOffset(fileSplit.getStart());
         rangeDesc.setSize(fileSplit.getLength());
         rangeDesc.setColumnsFromPath(columnsFromPath);
+        rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
 
         if (getLocationType() == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 498e8d2082..6dd774ea4a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -306,6 +306,8 @@ struct TFileRangeDesc {
     5: optional i64 file_size;
     // columns parsed from file path should be after the columns read from file
     6: optional list<string> columns_from_path;
+    // column names from file path, in the same order as columns_from_path
+    7: optional list<string> columns_from_path_keys;
 }
 
 // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org