You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/05 15:01:04 UTC
[doris] branch master updated: [fix](spark-load) no need to filter row group when doing spark load (#13116)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d286aa7bf7 [fix](spark-load) no need to filter row group when doing spark load (#13116)
d286aa7bf7 is described below
commit d286aa7bf7f0585fc1e4e9464e69eb8f378d6d40
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Oct 5 23:00:56 2022 +0800
[fix](spark-load) no need to filter row group when doing spark load (#13116)
1. Fix issue #13115
2. Modify the `get_next_block` method of `GenericReader` so that it returns the number of rows read (`read_rows`) explicitly.
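Some columns in the block may not be filled by the reader (e.g. columns that do not exist in the file). If the first column is not filled, `block->rows()` cannot report the real number of rows read, so the reader must return the count itself.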
3. Add more checks for broker load test cases.
---
be/src/common/config.h | 1 -
be/src/exec/arrow/arrow_reader.cpp | 5 +++--
be/src/exec/arrow/arrow_reader.h | 2 +-
be/src/exec/arrow/parquet_reader.cpp | 5 +++--
be/src/exec/arrow/parquet_reader.h | 1 +
be/src/exec/base_scanner.h | 2 +-
be/src/vec/exec/format/generic_reader.h | 2 +-
.../vec/exec/format/parquet/vparquet_group_reader.cpp | 4 +++-
.../vec/exec/format/parquet/vparquet_group_reader.h | 2 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 5 +++--
be/src/vec/exec/format/parquet/vparquet_reader.h | 2 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 10 ++++++----
be/test/vec/exec/parquet/parquet_reader_test.cpp | 3 ++-
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 3 ++-
regression-test/conf/regression-conf.groovy | 2 ++
.../load_p0/broker_load/test_broker_load.groovy | 19 ++++++++++++++++++-
16 files changed, 48 insertions(+), 20 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8ee1649378..736381873b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -819,7 +819,6 @@ CONF_Int32(object_pool_buffer_size, "100");
// ParquetReaderWrap prefetch buffer size
CONF_Int32(parquet_reader_max_buffer_size, "50");
-CONF_Bool(parquet_predicate_push_down, "true");
// Max size of parquet page header in bytes
CONF_mInt32(parquet_header_max_size_mb, "1");
// Max buffer size for parquet row group
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index 72d4960a43..d4b8f11eaa 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -98,7 +98,7 @@ int ArrowReaderWrap::get_column_index(std::string column_name) {
}
}
-Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
+Status ArrowReaderWrap::get_next_block(vectorized::Block* block, size_t* read_row, bool* eof) {
size_t rows = 0;
bool tmp_eof = false;
do {
@@ -107,7 +107,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
// We need to make sure the eof is set to true iff block is empty.
if (tmp_eof) {
*eof = (rows == 0);
- return Status::OK();
+ break;
}
}
@@ -129,6 +129,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
rows += num_elements;
_arrow_batch_cur_idx += num_elements;
} while (!tmp_eof && rows < _state->batch_size());
+ *read_row = rows;
return Status::OK();
}
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 2d83a1be01..561f67fe94 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -92,7 +92,7 @@ public:
return Status::NotSupported("Not Implemented read");
}
// for vec
- Status get_next_block(vectorized::Block* block, bool* eof) override;
+ Status get_next_block(vectorized::Block* block, size_t* read_row, bool* eof) override;
// This method should be deprecated once the old scanner is removed.
// And user should use "get_next_block" instead.
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof);
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index fc8c1ca5f5..f4d5aa5342 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -100,7 +100,8 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
_timezone = timezone;
RETURN_IF_ERROR(column_indices());
- if (config::parquet_predicate_push_down) {
+ _need_filter_row_group = (tuple_desc != nullptr);
+ if (_need_filter_row_group) {
int64_t file_size = 0;
size(&file_size);
_row_group_reader.reset(new RowGroupReader(_range_start_offset, _range_size,
@@ -551,7 +552,7 @@ void ParquetReaderWrap::read_batches(arrow::RecordBatchVector& batches, int curr
}
bool ParquetReaderWrap::filter_row_group(int current_group) {
- if (config::parquet_predicate_push_down) {
+ if (_need_filter_row_group) {
auto filter_group_set = _row_group_reader->filter_groups();
if (filter_group_set.end() != filter_group_set.find(current_group)) {
// find filter group, skip
diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
index a2a80a1966..fcb96d46d2 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -101,6 +101,7 @@ private:
std::string _timezone;
int64_t _range_start_offset;
int64_t _range_size;
+ bool _need_filter_row_group = false;
private:
std::unique_ptr<doris::RowGroupReader> _row_group_reader;
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 6711836e53..c6bcde2f67 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -153,7 +153,7 @@ protected:
int _num_of_columns_from_file;
// slot_ids for parquet predicate push down are in tuple desc
- TupleId _tupleId;
+ TupleId _tupleId = -1;
std::vector<ExprContext*> _conjunct_ctxs;
private:
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index d838f4dac1..ea13a62627 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -28,7 +28,7 @@ class Block;
// a set of blocks with specified schema,
class GenericReader {
public:
- virtual Status get_next_block(Block* block, bool* eof) = 0;
+ virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;
virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
std::unordered_map<std::string, TypeDescriptor> map;
return map;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index ddcc6494d0..a46813cbf2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -60,7 +60,8 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
return Status::OK();
}
-Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) {
+Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
+ bool* _batch_eof) {
size_t batch_read_rows = 0;
bool has_eof = false;
int col_idx = 0;
@@ -86,6 +87,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_
has_eof = col_eof;
col_idx++;
}
+ *read_rows = batch_read_rows;
_read_rows += batch_read_rows;
*_batch_eof = has_eof;
// use data fill utils read column data to column ptr
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 27daffe6f7..e1b54bb529 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -31,7 +31,7 @@ public:
~RowGroupReader();
Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
- Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
+ Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof);
ParquetColumnReader::Statistics statistics();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6b8a01c83c..4252402ebd 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -177,7 +177,7 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
return Status::OK();
}
-Status ParquetReader::get_next_block(Block* block, bool* eof) {
+Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
int32_t num_of_readers = _row_group_readers.size();
DCHECK(num_of_readers <= _read_row_groups.size());
if (_read_row_groups.empty()) {
@@ -187,7 +187,8 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) {
bool _batch_eof = false;
{
SCOPED_RAW_TIMER(&_statistics.column_read_time);
- RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof));
+ RETURN_IF_ERROR(
+ _current_group_reader->next_batch(block, _batch_size, read_rows, &_batch_eof));
}
if (_batch_eof) {
auto column_st = _current_group_reader->statistics();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index ab44c31517..e495f7089f 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -61,7 +61,7 @@ public:
Status init_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
- Status get_next_block(Block* block, bool* eof) override;
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
void close();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index f6f8127146..b37ca30290 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -99,7 +99,7 @@ Status VFileScanner::open(RuntimeState* state) {
}
// For query:
-// [exist cols] [non-exist cols] [col from path] input ouput
+// [exist cols] [non-exist cols] [col from path] input output
// A B C D E
// _init_src_block x x x x x - x
// get_next_block x x x - - - x
@@ -109,7 +109,7 @@ Status VFileScanner::open(RuntimeState* state) {
// _convert_to_output_block - - - - - - -
//
// For load:
-// [exist cols] [non-exist cols] [col from path] input ouput
+// [exist cols] [non-exist cols] [col from path] input output
// A B C D E
// _init_src_block x x x x x x -
// get_next_block x x x - - x -
@@ -130,15 +130,17 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
// Init src block for load job based on the data file schema (e.g. parquet)
// For query job, simply set _src_block_ptr to block.
+ size_t read_rows = 0;
RETURN_IF_ERROR(_init_src_block(block));
{
SCOPED_TIMER(_get_block_timer);
// Read next block.
// Some of column in block may not be filled (column not exist in file)
- RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
+ RETURN_IF_ERROR(
+ _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
}
- if (_src_block_ptr->rows() > 0) {
+ if (read_rows > 0) {
// Convert the src block columns type to string in-place.
RETURN_IF_ERROR(_cast_to_input_block(block));
// Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 68a2043d66..b2288338b3 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -122,7 +122,8 @@ TEST_F(ParquetReaderTest, normal) {
ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
}
bool eof = false;
- p_reader->get_next_block(block, &eof);
+ size_t read_row = 0;
+ p_reader->get_next_block(block, &read_row, &eof);
for (auto& col : block->get_columns_with_type_and_name()) {
ASSERT_EQ(col.column->size(), 10);
}
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c18d3099d7..1e530d4410 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -411,7 +411,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
}
bool batch_eof = false;
- auto stb = row_group_reader->next_batch(&block, 1024, &batch_eof);
+ size_t read_rows = 0;
+ auto stb = row_group_reader->next_batch(&block, 1024, &read_rows, &batch_eof);
EXPECT_TRUE(stb.ok());
LocalFileReader result("./be/test/exec/test_data/parquet_scanner/group-reader.txt", 0);
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index 170ae8cca5..506cf89bf7 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -63,3 +63,5 @@ brokerName = "broker_name"
// broker load test config
enableBrokerLoad=false
+ak=""
+sk=""
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
index 8cfc912a40..0a9a6739df 100644
--- a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
@@ -60,6 +60,20 @@ suite("test_broker_load", "p0") {
]
def where_exprs = ["", "", "", "", "", "", "", "", "", "", "", "where p_partkey>10", ""]
+ def etl_info = ["unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+ "unselected.rows=163703; dpp.abnorm.ALL=0; dpp.norm.ALL=36294",
+ "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000"]
+
String ak = getS3AK()
String sk = getS3SK()
String enabled = context.config.otherConfigs.get("enableBrokerLoad")
@@ -126,12 +140,14 @@ suite("test_broker_load", "p0") {
i++
}
+ i = 0
for (String label in uuids) {
max_try_milli_secs = 600000
while (max_try_milli_secs > 0) {
- String[][] result = sql """ show load where label="$label"; """
+ String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
if (result[0][2].equals("FINISHED")) {
logger.info("Load FINISHED " + label)
+ assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5])
break;
}
if (result[0][2].equals("CANCELLED")) {
@@ -143,6 +159,7 @@ suite("test_broker_load", "p0") {
assertTrue(1 == 2, "Load Timeout.")
}
}
+ i++
}
} finally {
for (String table in tables) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org