You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/05 15:01:04 UTC

[doris] branch master updated: [fix](spark-load) no need to filter row group when doing spark load (#13116)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d286aa7bf7 [fix](spark-load) no need to filter row group when doing spark load (#13116)
d286aa7bf7 is described below

commit d286aa7bf7f0585fc1e4e9464e69eb8f378d6d40
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Oct 5 23:00:56 2022 +0800

    [fix](spark-load) no need to filter row group when doing spark load (#13116)
    
    1. Fix issue #13115
    2. Modify the `get_next_block` method of `GenericReader` to return "read_rows" explicitly.
        Some columns in the block may not be filled by the reader; if the first column is not filled, `block->rows()` cannot return the real number of rows.
    3. Add more checks for broker load test cases.
---
 be/src/common/config.h                                |  1 -
 be/src/exec/arrow/arrow_reader.cpp                    |  5 +++--
 be/src/exec/arrow/arrow_reader.h                      |  2 +-
 be/src/exec/arrow/parquet_reader.cpp                  |  5 +++--
 be/src/exec/arrow/parquet_reader.h                    |  1 +
 be/src/exec/base_scanner.h                            |  2 +-
 be/src/vec/exec/format/generic_reader.h               |  2 +-
 .../vec/exec/format/parquet/vparquet_group_reader.cpp |  4 +++-
 .../vec/exec/format/parquet/vparquet_group_reader.h   |  2 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp    |  5 +++--
 be/src/vec/exec/format/parquet/vparquet_reader.h      |  2 +-
 be/src/vec/exec/scan/vfile_scanner.cpp                | 10 ++++++----
 be/test/vec/exec/parquet/parquet_reader_test.cpp      |  3 ++-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp      |  3 ++-
 regression-test/conf/regression-conf.groovy           |  2 ++
 .../load_p0/broker_load/test_broker_load.groovy       | 19 ++++++++++++++++++-
 16 files changed, 48 insertions(+), 20 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8ee1649378..736381873b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -819,7 +819,6 @@ CONF_Int32(object_pool_buffer_size, "100");
 
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
-CONF_Bool(parquet_predicate_push_down, "true");
 // Max size of parquet page header in bytes
 CONF_mInt32(parquet_header_max_size_mb, "1");
 // Max buffer size for parquet row group
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index 72d4960a43..d4b8f11eaa 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -98,7 +98,7 @@ int ArrowReaderWrap::get_column_index(std::string column_name) {
     }
 }
 
-Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
+Status ArrowReaderWrap::get_next_block(vectorized::Block* block, size_t* read_row, bool* eof) {
     size_t rows = 0;
     bool tmp_eof = false;
     do {
@@ -107,7 +107,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
             // We need to make sure the eof is set to true iff block is empty.
             if (tmp_eof) {
                 *eof = (rows == 0);
-                return Status::OK();
+                break;
             }
         }
 
@@ -129,6 +129,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
         rows += num_elements;
         _arrow_batch_cur_idx += num_elements;
     } while (!tmp_eof && rows < _state->batch_size());
+    *read_row = rows;
     return Status::OK();
 }
 
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 2d83a1be01..561f67fe94 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -92,7 +92,7 @@ public:
         return Status::NotSupported("Not Implemented read");
     }
     // for vec
-    Status get_next_block(vectorized::Block* block, bool* eof) override;
+    Status get_next_block(vectorized::Block* block, size_t* read_row, bool* eof) override;
     // This method should be deprecated once the old scanner is removed.
     // And user should use "get_next_block" instead.
     Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof);
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index fc8c1ca5f5..f4d5aa5342 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -100,7 +100,8 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
         _timezone = timezone;
 
         RETURN_IF_ERROR(column_indices());
-        if (config::parquet_predicate_push_down) {
+        _need_filter_row_group = (tuple_desc != nullptr);
+        if (_need_filter_row_group) {
             int64_t file_size = 0;
             size(&file_size);
             _row_group_reader.reset(new RowGroupReader(_range_start_offset, _range_size,
@@ -551,7 +552,7 @@ void ParquetReaderWrap::read_batches(arrow::RecordBatchVector& batches, int curr
 }
 
 bool ParquetReaderWrap::filter_row_group(int current_group) {
-    if (config::parquet_predicate_push_down) {
+    if (_need_filter_row_group) {
         auto filter_group_set = _row_group_reader->filter_groups();
         if (filter_group_set.end() != filter_group_set.find(current_group)) {
             // find filter group, skip
diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
index a2a80a1966..fcb96d46d2 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -101,6 +101,7 @@ private:
     std::string _timezone;
     int64_t _range_start_offset;
     int64_t _range_size;
+    bool _need_filter_row_group = false;
 
 private:
     std::unique_ptr<doris::RowGroupReader> _row_group_reader;
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 6711836e53..c6bcde2f67 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -153,7 +153,7 @@ protected:
     int _num_of_columns_from_file;
 
     // slot_ids for parquet predicate push down are in tuple desc
-    TupleId _tupleId;
+    TupleId _tupleId = -1;
     std::vector<ExprContext*> _conjunct_ctxs;
 
 private:
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index d838f4dac1..ea13a62627 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -28,7 +28,7 @@ class Block;
 // a set of blocks with specified schema,
 class GenericReader {
 public:
-    virtual Status get_next_block(Block* block, bool* eof) = 0;
+    virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;
     virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
         std::unordered_map<std::string, TypeDescriptor> map;
         return map;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index ddcc6494d0..a46813cbf2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -60,7 +60,8 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
     return Status::OK();
 }
 
-Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) {
+Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
+                                  bool* _batch_eof) {
     size_t batch_read_rows = 0;
     bool has_eof = false;
     int col_idx = 0;
@@ -86,6 +87,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_
         has_eof = col_eof;
         col_idx++;
     }
+    *read_rows = batch_read_rows;
     _read_rows += batch_read_rows;
     *_batch_eof = has_eof;
     // use data fill utils read column data to column ptr
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 27daffe6f7..e1b54bb529 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -31,7 +31,7 @@ public:
     ~RowGroupReader();
     Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                 std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
-    Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
+    Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof);
 
     ParquetColumnReader::Statistics statistics();
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6b8a01c83c..4252402ebd 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -177,7 +177,7 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
-Status ParquetReader::get_next_block(Block* block, bool* eof) {
+Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     int32_t num_of_readers = _row_group_readers.size();
     DCHECK(num_of_readers <= _read_row_groups.size());
     if (_read_row_groups.empty()) {
@@ -187,7 +187,8 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) {
     bool _batch_eof = false;
     {
         SCOPED_RAW_TIMER(&_statistics.column_read_time);
-        RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof));
+        RETURN_IF_ERROR(
+                _current_group_reader->next_batch(block, _batch_size, read_rows, &_batch_eof));
     }
     if (_batch_eof) {
         auto column_st = _current_group_reader->statistics();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index ab44c31517..e495f7089f 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -61,7 +61,7 @@ public:
     Status init_reader(
             std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
 
-    Status get_next_block(Block* block, bool* eof) override;
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
     void close();
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index f6f8127146..b37ca30290 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -99,7 +99,7 @@ Status VFileScanner::open(RuntimeState* state) {
 }
 
 // For query:
-//                              [exist cols]  [non-exist cols]  [col from path]  input  ouput
+//                              [exist cols]  [non-exist cols]  [col from path]  input  output
 //                              A     B    C  D                 E
 // _init_src_block              x     x    x  x                 x                -      x
 // get_next_block               x     x    x  -                 -                -      x
@@ -109,7 +109,7 @@ Status VFileScanner::open(RuntimeState* state) {
 // _convert_to_output_block     -     -    -  -                 -                -      -
 //
 // For load:
-//                              [exist cols]  [non-exist cols]  [col from path]  input  ouput
+//                              [exist cols]  [non-exist cols]  [col from path]  input  output
 //                              A     B    C  D                 E
 // _init_src_block              x     x    x  x                 x                x      -
 // get_next_block               x     x    x  -                 -                x      -
@@ -130,15 +130,17 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
 
         // Init src block for load job based on the data file schema (e.g. parquet)
         // For query job, simply set _src_block_ptr to block.
+        size_t read_rows = 0;
         RETURN_IF_ERROR(_init_src_block(block));
         {
             SCOPED_TIMER(_get_block_timer);
             // Read next block.
             // Some of column in block may not be filled (column not exist in file)
-            RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
+            RETURN_IF_ERROR(
+                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
         }
 
-        if (_src_block_ptr->rows() > 0) {
+        if (read_rows > 0) {
             // Convert the src block columns type to string in-place.
             RETURN_IF_ERROR(_cast_to_input_block(block));
             // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 68a2043d66..b2288338b3 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -122,7 +122,8 @@ TEST_F(ParquetReaderTest, normal) {
                 ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
     }
     bool eof = false;
-    p_reader->get_next_block(block, &eof);
+    size_t read_row = 0;
+    p_reader->get_next_block(block, &read_row, &eof);
     for (auto& col : block->get_columns_with_type_and_name()) {
         ASSERT_EQ(col.column->size(), 10);
     }
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c18d3099d7..1e530d4410 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -411,7 +411,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
                 ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
     }
     bool batch_eof = false;
-    auto stb = row_group_reader->next_batch(&block, 1024, &batch_eof);
+    size_t read_rows = 0;
+    auto stb = row_group_reader->next_batch(&block, 1024, &read_rows, &batch_eof);
     EXPECT_TRUE(stb.ok());
 
     LocalFileReader result("./be/test/exec/test_data/parquet_scanner/group-reader.txt", 0);
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index 170ae8cca5..506cf89bf7 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -63,3 +63,5 @@ brokerName = "broker_name"
 
 // broker load test config
 enableBrokerLoad=false
+ak=""
+sk=""
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
index 8cfc912a40..0a9a6739df 100644
--- a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
@@ -60,6 +60,20 @@ suite("test_broker_load", "p0") {
     ]
     def where_exprs = ["", "", "", "", "", "", "", "", "", "", "", "where p_partkey>10", ""]
 
+    def etl_info = ["unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=163703; dpp.abnorm.ALL=0; dpp.norm.ALL=36294",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000"]
+
     String ak = getS3AK()
     String sk = getS3SK()
     String enabled = context.config.otherConfigs.get("enableBrokerLoad")
@@ -126,12 +140,14 @@ suite("test_broker_load", "p0") {
                 i++
             }
 
+            i = 0
             for (String label in uuids) {
                 max_try_milli_secs = 600000
                 while (max_try_milli_secs > 0) {
-                    String[][] result = sql """ show load where label="$label"; """
+                    String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
                     if (result[0][2].equals("FINISHED")) {
                         logger.info("Load FINISHED " + label)
+                        assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5])
                         break;
                     }
                     if (result[0][2].equals("CANCELLED")) {
@@ -143,6 +159,7 @@ suite("test_broker_load", "p0") {
                         assertTrue(1 == 2, "Load Timeout.")
                     }
                 }
+                i++
             }
         } finally {
             for (String table in tables) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org