You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/26 04:47:42 UTC
[doris] branch master updated: [Fix](multi-catalog)Fix partition external table query bug. (#13535)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 44c9163b3c [Fix](multi-catalog)Fix partition external table query bug. (#13535)
44c9163b3c is described below
commit 44c9163b3c3144701dfa1bc7b91f20fcedb4b510
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Wed Oct 26 12:47:37 2022 +0800
[Fix](multi-catalog)Fix partition external table query bug. (#13535)
The new scanner computed incorrect indexes for external-table columns whose values are parsed from the file path. This commit fixes that.
For example, in the following query the nation and city columns come from the file path:
```
mysql> select nation, city, count(*) from parquet_two_part group by nation, city;
+--------+------------+----------+
| nation | city | count(*) |
+--------+------------+----------+
| cn | beijing | 1199969 |
| cn | shanghai | 1199771 |
| jp | tokyo | 599715 |
| rus | moscow | 600659 |
| us | chicago | 1199805 |
| us | washington | 1201296 |
+--------+------------+----------+
6 rows in set (0.39 sec)
```
---
.../exec/format/parquet/vparquet_group_reader.cpp | 22 ++++++++++++++++++
.../exec/format/parquet/vparquet_group_reader.h | 4 ++++
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 5 +++--
be/src/vec/exec/scan/vfile_scanner.cpp | 26 +++++++++++++++++++---
.../doris/planner/external/HiveScanProvider.java | 9 +++++---
gensrc/thrift/PlanNodes.thrift | 2 ++
6 files changed, 60 insertions(+), 8 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index a46813cbf2..3b50d32ef3 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -30,6 +30,7 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
_read_columns(read_columns),
_row_group_id(row_group_id),
_row_group_meta(row_group),
+ _remaining_rows(row_group.num_rows),
_ctz(ctz) {}
RowGroupReader::~RowGroupReader() {
@@ -38,6 +39,10 @@ RowGroupReader::~RowGroupReader() {
Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
+ if (_read_columns.size() == 0) {
+ // Query task that only select columns in path.
+ return Status::OK();
+ }
const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
@@ -62,6 +67,10 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
bool* _batch_eof) {
+ // Process external table query task that select columns are all from path.
+ if (_read_columns.empty()) {
+ return _read_empty_batch(batch_size, read_rows, _batch_eof);
+ }
size_t batch_read_rows = 0;
bool has_eof = false;
int col_idx = 0;
@@ -94,6 +103,19 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
return Status::OK();
}
+Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) {
+    // No file columns are read (every selected column comes from the path), so nothing is
+    // decoded: only report how many rows this batch covers so path columns can be filled.
+    // Compare as size_t explicitly instead of mixing size_t and int64_t implicitly.
+    const size_t rows = std::min(batch_size, static_cast<size_t>(_remaining_rows));
+    *read_rows = rows;
+    _remaining_rows -= static_cast<int64_t>(rows);
+    // Signal EOF once every row of the row group has been handed out (including the case
+    // where this batch consumed exactly the remaining rows).
+    *_batch_eof = (_remaining_rows == 0);
+    return Status::OK();
+}
+
ParquetColumnReader::Statistics RowGroupReader::statistics() {
ParquetColumnReader::Statistics st;
for (auto& reader : _column_readers) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index e1b54bb529..0a18514c39 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -35,12 +35,16 @@ public:
ParquetColumnReader::Statistics statistics();
+private:
+ Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof);
+
private:
doris::FileReader* _file_reader;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
const std::vector<ParquetReadColumn>& _read_columns;
const int32_t _row_group_id;
const tparquet::RowGroup& _row_group_meta;
+ int64_t _remaining_rows;
int64_t _read_rows = 0;
cctz::time_zone* _ctz;
};
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 7ff57ac05b..dc33f321d7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -143,6 +143,7 @@ Status ParquetReader::init_reader(
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_init_read_columns());
RETURN_IF_ERROR(_init_row_group_readers());
+
return Status::OK();
}
@@ -156,12 +157,12 @@ Status ParquetReader::_init_read_columns() {
_missing_cols.push_back(file_col_name);
}
}
+ // It is legal to get empty include_column_ids in query task.
if (include_column_ids.empty()) {
- return Status::InternalError("No columns found in parquet file");
+ return Status::OK();
}
// The same order as physical columns
std::sort(include_column_ids.begin(), include_column_ids.end());
- _read_columns.clear();
for (int& parquet_col_id : include_column_ids) {
_read_columns.emplace_back(parquet_col_id,
_file_metadata->schema().get_column(parquet_col_id)->name);
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index c9ccfbf46c..3503e8c468 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -257,7 +257,6 @@ Status VFileScanner::_fill_columns_from_path(size_t rows) {
return Status::InternalError(ss.str());
}
const std::string& column_from_path = range.columns_from_path[it->second];
-
auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
@@ -528,12 +527,28 @@ Status VFileScanner::_init_expr_ctxes() {
std::map<SlotId, int> full_src_index_map;
std::map<SlotId, SlotDescriptor*> full_src_slot_map;
+ std::map<std::string, int> partition_name_to_key_index_map;
int index = 0;
for (const auto& slot_desc : _real_tuple_desc->slots()) {
full_src_slot_map.emplace(slot_desc->id(), slot_desc);
full_src_index_map.emplace(slot_desc->id(), index++);
}
+ // For external table query, find the index of column in path.
+ // Because query doesn't always search for all columns in a table
+ // and the order of selected columns is random.
+ // All ranges in _ranges vector should have identical columns_from_path_keys
+ // because they are all file splits for the same external table.
+ // So here use the first element of _ranges to fill the partition_name_to_key_index_map
+ if (_ranges[0].__isset.columns_from_path_keys) {
+ std::vector<std::string> key_map = _ranges[0].columns_from_path_keys;
+ if (!key_map.empty()) {
+ for (size_t i = 0; i < key_map.size(); i++) {
+ partition_name_to_key_index_map.emplace(key_map[i], i);
+ }
+ }
+ }
+
_num_of_columns_from_file = _params.num_of_columns_from_file;
for (const auto& slot_info : _params.required_slots) {
auto slot_id = slot_info.slot_id;
@@ -551,8 +566,13 @@ Status VFileScanner::_init_expr_ctxes() {
_file_col_names.push_back(it->second->col_name());
} else {
_partition_slot_descs.emplace_back(it->second);
- auto iti = full_src_index_map.find(slot_id);
- _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
+ if (_is_load) {
+ auto iti = full_src_index_map.find(slot_id);
+ _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
+ } else {
+ auto kit = partition_name_to_key_index_map.find(it->second->col_name());
+ _partition_slot_index_map.emplace(slot_id, kit->second);
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 04916fb105..8de5b5d312 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -304,10 +304,11 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
for (InputSplit split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
+ List<String> pathPartitionKeys = getPathPartitionKeys();
List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
- getPathPartitionKeys(), false);
+ pathPartitionKeys, false);
- TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);
+ TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
LOG.info(
@@ -366,12 +367,14 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
return locations;
}
- private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath)
+ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
+ List<String> columnsFromPathKeys)
throws DdlException, MetaNotFoundException {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
rangeDesc.setColumnsFromPath(columnsFromPath);
+ rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
if (getLocationType() == TFileType.FILE_HDFS) {
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 498e8d2082..6dd774ea4a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -306,6 +306,8 @@ struct TFileRangeDesc {
5: optional i64 file_size;
// columns parsed from file path should be after the columns read from file
6: optional list<string> columns_from_path;
+ // column names from file path, in the same order with columns_from_path
+ 7: optional list<string> columns_from_path_keys;
}
// TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org