Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/04/26 07:48:35 UTC

[GitHub] [incubator-doris] yinzhijian opened a new pull request, #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

yinzhijian opened a new pull request, #9231:
URL: https://github.com/apache/incubator-doris/pull/9231

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   ## Optimization Todo list:
   1. FE generates the corresponding source slot descriptors and Expr from the parquet schema.
   2. BE supports converting an arrow type directly into a similar destination primitive type. For example, when the arrow type is INT32 and the dest type is TYPE_BIGINT (int64), convert INT32 => TYPE_BIGINT in one step, instead of the current path: INT32 => TYPE_INT => TYPE_BIGINT.
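
   As a rough illustration of item 2, the sketch below widens a source buffer straight into the destination integer type in a single pass, with no intermediate column. `convert_direct` and its signature are hypothetical; they are not part of this PR or of Arrow, and the sketch assumes a contiguous buffer of non-null values.

```cpp
#include <cstddef>
#include <cstdint>
#include <type_traits>
#include <vector>

// Hypothetical helper (not from this PR): copy each source value directly
// into the destination type, e.g. int32_t => int64_t, in one step.
template <typename Src, typename Dst>
std::vector<Dst> convert_direct(const Src* values, std::size_t num_elements) {
    static_assert(std::is_integral<Src>::value && std::is_integral<Dst>::value,
                  "this sketch only covers integer types");
    static_assert(std::is_signed<Src>::value == std::is_signed<Dst>::value &&
                          sizeof(Dst) >= sizeof(Src),
                  "this sketch only covers same-signedness, non-narrowing conversions");
    std::vector<Dst> dest;
    dest.reserve(num_elements);
    for (std::size_t i = 0; i < num_elements; ++i) {
        dest.push_back(static_cast<Dst>(values[i]));
    }
    return dest;
}
```

   Calling `convert_direct<int32_t, int64_t>(data, n)` would then stand in for the INT32 => TYPE_BIGINT path; null handling and non-integer types are left out on purpose.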
   
   ## Performance Testing:
   Loading a parquet file with the vec version is almost 1x faster than with the rowset version.
   Number of rows: 300k
   Test table schema:
   CREATE TABLE `parquet` (
     `id` int(11) NOT NULL COMMENT "",
     `email` varchar(26) NOT NULL COMMENT "",
     `c_date32` DATE NOT NULL COMMENT "",
     `c_date64` DATETIME NOT NULL COMMENT "",
     `c_timestamp` DATETIME NOT NULL COMMENT "",
     `c_decimal128` DECIMAL(27, 9) NULL COMMENT "",
     `c_bool` BOOLEAN NULL COMMENT "",
     `c_float` FLOAT NULL COMMENT "",
     `c_double` DOUBLE NULL COMMENT "",
     `c_fixed_size_binary` CHAR(20) NULL COMMENT "",
     `c_binary` VARCHAR(32) NULL COMMENT "",
     `c_uint64` BIGINT NULL COMMENT ""
   )
   DISTRIBUTED BY HASH(`id`) BUCKETS 1
   PROPERTIES (
   "replication_num" = "1"
   );
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (No)
   2. Has unit tests been added: (No)
   4. Has document been added or modified: (No Need)
   5. Does it need to update dependencies: (No)
   6. Are there any changes that cannot be rolled back: (No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r865516492


##########
be/src/vec/utils/arrow_column_to_doris_column.cpp:
##########
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+#include "vec/columns/column_nullable.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/runtime/vdatetime_value.h"
+
+#include <arrow/record_batch.h>
+#include <arrow/array.h>
+#include <arrow/status.h>
+
+#include "arrow/type.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/type_traits.h"
+
+#define FOR_ARROW_TYPES(M) \
+    M(::arrow::Type::BOOL, TYPE_BOOLEAN) \
+    M(::arrow::Type::INT8, TYPE_TINYINT) \
+    M(::arrow::Type::UINT8, TYPE_TINYINT) \
+    M(::arrow::Type::INT16, TYPE_SMALLINT) \
+    M(::arrow::Type::UINT16, TYPE_SMALLINT) \
+    M(::arrow::Type::INT32, TYPE_INT) \
+    M(::arrow::Type::UINT32, TYPE_INT) \
+    M(::arrow::Type::INT64, TYPE_BIGINT) \
+    M(::arrow::Type::UINT64, TYPE_BIGINT) \
+    M(::arrow::Type::HALF_FLOAT, TYPE_FLOAT) \
+    M(::arrow::Type::FLOAT, TYPE_FLOAT) \
+    M(::arrow::Type::DOUBLE, TYPE_DOUBLE) \
+    M(::arrow::Type::BINARY, TYPE_VARCHAR) \
+    M(::arrow::Type::FIXED_SIZE_BINARY, TYPE_VARCHAR) \
+    M(::arrow::Type::STRING, TYPE_VARCHAR) \
+    M(::arrow::Type::TIMESTAMP, TYPE_DATETIME) \
+    M(::arrow::Type::DATE32, TYPE_DATE) \
+    M(::arrow::Type::DATE64, TYPE_DATETIME) \
+    M(::arrow::Type::DECIMAL, TYPE_DECIMALV2)
+
+#define FOR_ARROW_NUMERIC_TYPES(M) \
+        M(arrow::Type::UINT8, UInt8) \
+        M(arrow::Type::INT8, Int8) \
+        M(arrow::Type::INT16, Int16) \
+        M(arrow::Type::INT32, Int32) \
+        M(arrow::Type::UINT64, UInt64) \
+        M(arrow::Type::INT64, Int64) \
+        M(arrow::Type::HALF_FLOAT, Float32) \
+        M(arrow::Type::FLOAT, Float32) \
+        M(arrow::Type::DOUBLE, Float64)
+
+namespace doris::vectorized {
+
+const PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type) {
+    switch(type) {
+#    define DISPATCH(ARROW_TYPE, CPP_TYPE) \
+        case ARROW_TYPE: \
+            return CPP_TYPE;
+        FOR_ARROW_TYPES(DISPATCH)
+#    undef DISPATCH
+        default:
+            break;
+    }
+    return INVALID_TYPE;
+}
+
+static size_t fill_nullable_column(const arrow::Array* array, size_t array_idx, vectorized::ColumnNullable* nullable_column,
+                        size_t num_elements) {
+    size_t null_elements_count = 0;
+    NullMap& map_data = nullable_column->get_null_map_data();
+    for (size_t i = 0; i < num_elements; ++i) {
+        auto is_null = array->IsNull(array_idx + i);
+        map_data.emplace_back(is_null);
+        null_elements_count += is_null;
+    }
+    return null_elements_count;
+}
+
+/// Inserts chars and offsets right into internal column data to reduce an overhead.
+/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
+/// Also internal strings are null terminated.
+static Status convert_column_with_string_data(const arrow::Array* array, size_t array_idx,
+                                        MutableColumnPtr& data_column, size_t num_elements) {
+    PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnString &>(*data_column).get_chars();
+    PaddedPODArray<UInt32> & column_offsets = assert_cast<ColumnString &>(*data_column).get_offsets();
+
+    const auto & concrete_array = dynamic_cast<const arrow::BinaryArray &>(*array);

Review Comment:
   Code format: use `auto&`. The same applies to the other occurrences.



##########
be/src/vec/utils/arrow_column_to_doris_column.cpp:
##########
+    const auto & concrete_array = dynamic_cast<const arrow::BinaryArray &>(*array);

Review Comment:
   Please reconsider whether `dynamic_cast` is really needed here.
   1. `dynamic_cast` usually points to a design problem in the class or the code; consider replacing it with a virtual function.
   2. This code may throw an exception if the cast fails, which is dangerous here.
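
   To illustrate the second point, here is a minimal sketch using hypothetical stand-in classes (they are not the Arrow classes). The reference form of `dynamic_cast` throws `std::bad_cast` on a type mismatch, while the pointer form returns `nullptr`, which can be checked and turned into an error return.

```cpp
#include <string>

// Hypothetical stand-ins for an array class hierarchy; not Arrow's classes.
struct Array {
    virtual ~Array() = default;
};
struct BinaryArray : Array {
    std::string value = "abc";
};
struct Int32Array : Array {
    int value = 7;
};

// Returns false instead of throwing when `array` is not a BinaryArray.
bool read_binary(const Array* array, std::string* out) {
    // Pointer form: yields nullptr on mismatch. The reference form,
    // dynamic_cast<const BinaryArray&>(*array), would throw std::bad_cast.
    const auto* concrete = dynamic_cast<const BinaryArray*>(array);
    if (concrete == nullptr) {
        return false;
    }
    *out = concrete->value;
    return true;
}
```

   In the real code the `false` branch would presumably map to an error `Status`; that mapping is an assumption and is not shown here.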





[GitHub] [incubator-doris] yangzhg commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yangzhg commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r858471563


##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vparquet_scanner.h"
+#include "exec/parquet_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                               const TBrokerScanRangeParams& params,
+                               const std::vector<TBrokerRangeDesc>& ranges,
+                               const std::vector<TNetworkAddress>& broker_addresses,
+                               const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0),
+          _num_of_columns_from_file(0) {}
+VParquetScanner::~VParquetScanner() {
+}
+
+Status VParquetScanner::open() {
+    RETURN_IF_ERROR(ParquetScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    auto range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status VParquetScanner::next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VParquetScanner::init_arrow_batch_if_necessary() {
+    // 1. init batch if first time 
+    // 2. reset reader if end of file
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        while (!_scanner_eof) {
+            Status status = next_arrow_batch();
+            if (_scanner_eof) {
+                return status;
+            }
+            if (status.is_end_of_file()) {
+                // try next file
+                continue;
+            }
+            return status;
+        }
+    }
+    return Status::OK();
+}
+
+Status VParquetScanner::init_src_block(Block* block) {
+    size_t batch_pos = 0;
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto* array = _batch->column(batch_pos++).get();
+        auto pt = arrow_type_to_pt(array->type()->id());
+        if (pt == INVALID_TYPE) {
+            return Status::NotSupported(fmt::format(
+                "Not support arrow type:{}", array->type()->name()));
+        }
+        auto is_nullable = true;
+        // let src column be nullable for simplify converting
+        DataTypePtr data_type = DataTypeFactory::instance().create_data_type(pt, is_nullable);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+    }
+    return Status::OK();
+}
+
+Status VParquetScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eof) {
+    // overall of type converting: 
+    // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
+    // primitive type(PT1) ==materialize_block==> dest primitive type
+    SCOPED_TIMER(_read_timer);
+    // init arrow batch
+    {
+        Status st = init_arrow_batch_if_necessary();
+        if (!st.ok()) {
+            if (!st.is_end_of_file()) {
+                return st;
+            }
+            return Status::OK();
+        }
+    }
+    Block src_block;
+    RETURN_IF_ERROR(init_src_block(&src_block));
+    // convert arrow batch to block until reach the batch_size
+    while (!_scanner_eof) {
+        // cast arrow type to PT0 and append it to src block
+        // for example: arrow::Type::INT16 => TYPE_SMALLINT
+        RETURN_IF_ERROR(append_batch_to_src_block(&src_block));
+        // finalize the src block if full
+        if (src_block.rows() >= _state->batch_size()) {
+            break;
+        }
+        auto status = next_arrow_batch();
+        // if ok, append the batch to the src columns
+        if (status.ok()) {
+            continue;
+        }
+        // return error if not EOF
+        if (!status.is_end_of_file()) {
+            return status;
+        }
+        // if src block is not empty, then finalize the block
+        if (src_block.rows() > 0) {
+            break;
+        }
+        _cur_file_eof = true;
+        RETURN_IF_ERROR(next_arrow_batch());
+        // there may be different arrow file, so reinit block here
+        RETURN_IF_ERROR(init_src_block(&src_block));
+    }
+    COUNTER_UPDATE(_rows_read_counter, src_block.rows());
+    SCOPED_TIMER(_materialize_timer);
+    // cast PT0 => PT1
+    // for example: TYPE_SMALLINT => TYPE_VARCHAR
+    RETURN_IF_ERROR(cast_src_block(&src_block));
+    // range of current file
+    fill_columns_from_path(&src_block);
+    RETURN_IF_ERROR(eval_conjunts(&src_block));
+    // materialize, src block => dest columns
+    RETURN_IF_ERROR(materialize_block(&src_block, columns));
+    if (_scanner_eof) {
+        *eof = true;
+    } else {
+        *eof = false;
+    }

Review Comment:
   This can be simplified to `*eof = _scanner_eof;`





[GitHub] [incubator-doris] yangzhg commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yangzhg commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r858453645


##########
be/src/vec/utils/arrow_column_to_doris_column.h:
##########
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <iostream>
+#include <memory>
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+
+#include "common/status.h"
+#include "runtime/primitive_type.h"
+#include "vec/core/column_with_type_and_name.h"
+
+// This files contains some utilities to convert Doris internal
+// data format from Apache Arrow format. 
+namespace doris::vectorized {
+
+const PrimitiveType arrow_type_to_pt(::arrow::Type::type type);
+
+Status arrow_column_to_doris_column(const arrow::Array* arrow_column,
+                                    size_t arrow_batch_cur_idx,
+                                    ColumnWithTypeAndName& doirs_column,
+                                    size_t num_elements,
+                                    const std::string& timezone);

Review Comment:
   Why do all types need the timezone parameter?





[GitHub] [incubator-doris] yangzhg commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yangzhg commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r858460946


##########
be/src/http/action/stream_load.cpp:
##########
@@ -98,6 +98,8 @@ static TFileFormatType::type parse_format(const std::string& format_str,
         if (compress_type.empty()) {
             format_type = TFileFormatType::FORMAT_JSON;
         }
+    } else if (boost::iequals(format_str, "PARQUET")) {

Review Comment:
   It is better to use `iequals` from `be/src/util/string_util.h` and avoid using functions from boost.
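
   For reference, a case-insensitive comparison needs nothing beyond the standard library. The sketch below is a hypothetical helper with the same intent; the actual `iequals` in `be/src/util/string_util.h` may be implemented differently.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Hypothetical helper: ASCII case-insensitive equality without boost.
bool iequals_sketch(const std::string& lhs, const std::string& rhs) {
    return lhs.size() == rhs.size() &&
           std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                      [](unsigned char a, unsigned char b) {
                          // Take unsigned char so std::tolower never sees a negative value.
                          return std::tolower(a) == std::tolower(b);
                      });
}
```

   With such a helper the new branch would read `iequals_sketch(format_str, "PARQUET")`; in the PR itself the existing `iequals` should be used instead.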







[GitHub] [incubator-doris] yinzhijian closed pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian closed pull request #9231:  [feature-wip](parquet-vec) Support parquet scanner in vectorized engine
URL: https://github.com/apache/incubator-doris/pull/9231




[GitHub] [incubator-doris] yinzhijian commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r859307192


##########
be/src/vec/utils/arrow_column_to_doris_column.h:
##########
+Status arrow_column_to_doris_column(const arrow::Array* arrow_column,
+                                    size_t arrow_batch_cur_idx,
+                                    ColumnWithTypeAndName& doirs_column,
+                                    size_t num_elements,
+                                    const std::string& timezone);

Review Comment:
   There is no better way than this.





[GitHub] [incubator-doris] yangzhg commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yangzhg commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r858465100


##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vparquet_scanner.h"
+#include "exec/parquet_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                               const TBrokerScanRangeParams& params,
+                               const std::vector<TBrokerRangeDesc>& ranges,
+                               const std::vector<TNetworkAddress>& broker_addresses,
+                               const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0),
+          _num_of_columns_from_file(0) {}
+VParquetScanner::~VParquetScanner() {
+}
+
+Status VParquetScanner::open() {
+    RETURN_IF_ERROR(ParquetScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    auto range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status VParquetScanner::next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VParquetScanner::init_arrow_batch_if_necessary() {
+    // 1. init batch if first time 
+    // 2. reset reader if end of file
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        while (!_scanner_eof) {
+            Status status = next_arrow_batch();

Review Comment:
   Declare `Status status` outside of the `while` loop.
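
   A minimal sketch of this suggestion, assuming a simplified stand-in for `doris::Status` and a hypothetical `FakeScanner` that replays canned results (neither is part of the PR). `status` is declared once before the loop and reassigned on each iteration:

```cpp
#include <cstddef>
#include <vector>

// Simplified stand-in for doris::Status (assumption, for illustration only).
struct Status {
    int code = 0;  // 0 = OK, 1 = end of file
    bool ok() const { return code == 0; }
    bool is_end_of_file() const { return code == 1; }
};

// Hypothetical scanner that replays a fixed list of results.
struct FakeScanner {
    std::vector<Status> results;
    std::size_t pos = 0;
    bool scanner_eof = false;

    Status next_arrow_batch() {
        if (pos >= results.size()) {
            scanner_eof = true;  // nothing left to read at all
            return Status{1};
        }
        return results[pos++];
    }

    Status init_arrow_batch() {
        Status status;  // declared once, outside the loop
        while (!scanner_eof) {
            status = next_arrow_batch();
            if (status.is_end_of_file() && !scanner_eof) {
                continue;  // current file is exhausted, try the next one
            }
            break;
        }
        return status;
    }
};
```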





[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r865524101


##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vparquet_scanner.h"
+#include "exec/parquet_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                               const TBrokerScanRangeParams& params,
+                               const std::vector<TBrokerRangeDesc>& ranges,
+                               const std::vector<TNetworkAddress>& broker_addresses,
+                               const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0),
+          _num_of_columns_from_file(0) {}
+VParquetScanner::~VParquetScanner() {
+}
+
+Status VParquetScanner::open() {
+    RETURN_IF_ERROR(ParquetScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    auto range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status VParquetScanner::next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VParquetScanner::init_arrow_batch_if_necessary() {
+    // 1. init batch if first time 
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        while (!_scanner_eof) {
+            status = next_arrow_batch();
+            if (_scanner_eof) {
+                return status;
+            }
+            if (status.is_end_of_file()) {
+                // try next file
+                continue;
+            }
+            return status;
+        }
+    }
+    return status;
+}
+
+Status VParquetScanner::init_src_block(Block* block) {
+    size_t batch_pos = 0;
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto* array = _batch->column(batch_pos++).get();
+        auto pt = arrow_type_to_primitive_type(array->type()->id());
+        if (pt == INVALID_TYPE) {
+            return Status::NotSupported(fmt::format(
+                "Not support arrow type:{}", array->type()->name()));
+        }
+        auto is_nullable = true;
+        // let src column be nullable for simplify converting
+        DataTypePtr data_type = DataTypeFactory::instance().create_data_type(pt, is_nullable);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+    }
+    return Status::OK();
+}
+
+Status VParquetScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eof) {
+    // overall of type converting: 
+    // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
+    // primitive type(PT1) ==materialize_block==> dest primitive type
+    SCOPED_TIMER(_read_timer);
+    // init arrow batch
+    {
+        Status st = init_arrow_batch_if_necessary();
+        if (!st.ok()) {

Review Comment:
   This code looks odd. Could you rethink the logic?
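
   One possible way to flatten this, assuming the concern is the nested end-of-file check that follows this line in the PR. `Status` below is a simplified stand-in for `doris::Status`, and `finish_early` is a hypothetical helper, not something the PR defines:

```cpp
// Simplified stand-in for doris::Status (assumption, for illustration only).
struct Status {
    int code = 0;  // 0 = OK, 1 = end of file, anything else = error
    bool ok() const { return code == 0; }
    bool is_end_of_file() const { return code == 1; }
};

// Hypothetical helper: decides what get_next() should do right after
// init_arrow_batch_if_necessary(). Returns true when the caller should
// return *result immediately, false when it should go on and build the block.
inline bool finish_early(const Status& st, Status* result) {
    if (st.ok()) {
        return false;
    }
    // End of file is not an error for the caller, so it is reported as OK.
    // Any other failure is propagated unchanged.
    *result = st.is_end_of_file() ? Status{} : st;
    return true;
}
```

   This keeps the same outcomes as the nested `if` (error propagates, EOF becomes OK) but states the decision in one place.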





[GitHub] [incubator-doris] yangzhg commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yangzhg commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r858451897


##########
be/src/vec/utils/arrow_column_to_doris_column.h:
##########
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <iostream>
+#include <memory>
+#include <arrow/type.h>
+#include <arrow/type_fwd.h>
+
+#include "common/status.h"
+#include "runtime/primitive_type.h"
+#include "vec/core/column_with_type_and_name.h"
+
+// This files contains some utilities to convert Doris internal
+// data format from Apache Arrow format. 
+namespace doris::vectorized {
+
+const PrimitiveType arrow_type_to_pt(::arrow::Type::type type);

Review Comment:
   `pt` feels a little odd as a name; `primitive_type` may be better.





[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r865515473


##########
be/src/vec/utils/arrow_column_to_doris_column.cpp:
##########
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+#include "vec/columns/column_nullable.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/runtime/vdatetime_value.h"
+
+#include <arrow/record_batch.h>
+#include <arrow/array.h>
+#include <arrow/status.h>
+
+#include "arrow/type.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/type_traits.h"
+
+#define FOR_ARROW_TYPES(M) \
+    M(::arrow::Type::BOOL, TYPE_BOOLEAN) \
+    M(::arrow::Type::INT8, TYPE_TINYINT) \
+    M(::arrow::Type::UINT8, TYPE_TINYINT) \

Review Comment:
   UINT8 has a larger positive range than TINYINT (0 to 255 versus -128 to 127), so this mapping may overflow.
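
   A small illustration of the concern, using two hypothetical helpers that are not in the PR. Reinterpreting an unsigned 8-bit value as a signed 8-bit value wraps for anything above 127 (modular by definition since C++20, and in practice on two's-complement platforms before that), while widening to 16 bits preserves every UINT8 value:

```cpp
#include <cstdint>

// Hypothetical helper: what storing UINT8 data in an 8-bit signed
// (TINYINT-sized) slot amounts to.
inline int8_t as_tinyint(uint8_t v) {
    return static_cast<int8_t>(v);
}

// Hypothetical alternative: widen to a 16-bit signed (SMALLINT-sized)
// slot so that the full 0..255 range fits.
inline int16_t as_smallint(uint8_t v) {
    return static_cast<int16_t>(v);
}
```

   The same reasoning would apply to UINT16, UINT32, and UINT64 mapped onto signed types of the same width.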



##########
be/src/vec/CMakeLists.txt:
##########
@@ -189,6 +190,11 @@ set(VEC_FILES
   runtime/vdata_stream_recvr.cpp
   runtime/vdata_stream_mgr.cpp
   runtime/vpartition_info.cpp
+  runtime/vsorted_run_merger.cpp
+  runtime/vload_channel.cpp

Review Comment:
   Is this code not added in this PR?





[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r865521312


##########
be/src/vec/utils/arrow_column_to_doris_column.cpp:
##########
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+#include "vec/columns/column_nullable.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/runtime/vdatetime_value.h"
+
+#include <arrow/record_batch.h>
+#include <arrow/array.h>
+#include <arrow/status.h>
+
+#include "arrow/type.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/type_traits.h"
+
+#define FOR_ARROW_TYPES(M) \
+    M(::arrow::Type::BOOL, TYPE_BOOLEAN) \
+    M(::arrow::Type::INT8, TYPE_TINYINT) \
+    M(::arrow::Type::UINT8, TYPE_TINYINT) \
+    M(::arrow::Type::INT16, TYPE_SMALLINT) \
+    M(::arrow::Type::UINT16, TYPE_SMALLINT) \
+    M(::arrow::Type::INT32, TYPE_INT) \
+    M(::arrow::Type::UINT32, TYPE_INT) \
+    M(::arrow::Type::INT64, TYPE_BIGINT) \
+    M(::arrow::Type::UINT64, TYPE_BIGINT) \
+    M(::arrow::Type::HALF_FLOAT, TYPE_FLOAT) \
+    M(::arrow::Type::FLOAT, TYPE_FLOAT) \
+    M(::arrow::Type::DOUBLE, TYPE_DOUBLE) \
+    M(::arrow::Type::BINARY, TYPE_VARCHAR) \
+    M(::arrow::Type::FIXED_SIZE_BINARY, TYPE_VARCHAR) \
+    M(::arrow::Type::STRING, TYPE_VARCHAR) \
+    M(::arrow::Type::TIMESTAMP, TYPE_DATETIME) \
+    M(::arrow::Type::DATE32, TYPE_DATE) \
+    M(::arrow::Type::DATE64, TYPE_DATETIME) \
+    M(::arrow::Type::DECIMAL, TYPE_DECIMALV2)
+
+#define FOR_ARROW_NUMERIC_TYPES(M) \
+        M(arrow::Type::UINT8, UInt8) \
+        M(arrow::Type::INT8, Int8) \
+        M(arrow::Type::INT16, Int16) \
+        M(arrow::Type::INT32, Int32) \
+        M(arrow::Type::UINT64, UInt64) \
+        M(arrow::Type::INT64, Int64) \
+        M(arrow::Type::HALF_FLOAT, Float32) \
+        M(arrow::Type::FLOAT, Float32) \
+        M(arrow::Type::DOUBLE, Float64)
+
+namespace doris::vectorized {
+
+const PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type) {
+    switch(type) {
+#    define DISPATCH(ARROW_TYPE, CPP_TYPE) \
+        case ARROW_TYPE: \
+            return CPP_TYPE;
+        FOR_ARROW_TYPES(DISPATCH)
+#    undef DISPATCH
+        default:
+            break;
+    }
+    return INVALID_TYPE;
+}
+
+static size_t fill_nullable_column(const arrow::Array* array, size_t array_idx, vectorized::ColumnNullable* nullable_column,
+                        size_t num_elements) {
+    size_t null_elements_count = 0;
+    NullMap& map_data = nullable_column->get_null_map_data();
+    for (size_t i = 0; i < num_elements; ++i) {
+        auto is_null = array->IsNull(array_idx + i);
+        map_data.emplace_back(is_null);
+        null_elements_count += is_null;
+    }
+    return null_elements_count;
+}
+
+/// Inserts chars and offsets right into internal column data to reduce an overhead.
+/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
+/// Also internal strings are null terminated.
+static Status convert_column_with_string_data(const arrow::Array* array, size_t array_idx,
+                                        MutableColumnPtr& data_column, size_t num_elements) {
+    PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnString &>(*data_column).get_chars();
+    PaddedPODArray<UInt32> & column_offsets = assert_cast<ColumnString &>(*data_column).get_offsets();
+
+    const auto & concrete_array = dynamic_cast<const arrow::BinaryArray &>(*array);
+    std::shared_ptr<arrow::Buffer> buffer = concrete_array.value_data();
+
+    for (size_t offset_i = array_idx; offset_i < array_idx + num_elements; ++offset_i) {
+        if (!concrete_array.IsNull(offset_i) && buffer) {
+            const auto * raw_data = buffer->data() + concrete_array.value_offset(offset_i);
+            column_chars_t.insert(raw_data, raw_data + concrete_array.value_length(offset_i));
+        }
+        column_chars_t.emplace_back('\0');
+
+        column_offsets.emplace_back(column_chars_t.size());
+    }
+    return Status::OK();
+}
+
+static Status convert_column_with_fixed_size_data(const arrow::Array* array, size_t array_idx,
+                                        MutableColumnPtr& data_column, size_t num_elements) {
+    PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnString &>(*data_column).get_chars();
+    PaddedPODArray<UInt32> & column_offsets = assert_cast<ColumnString &>(*data_column).get_offsets();
+
+    const auto & concrete_array = dynamic_cast<const arrow::FixedSizeBinaryArray &>(*array);
+    uint32_t width = concrete_array.byte_width();
+    const auto* array_data = concrete_array.GetValue(array_idx);
+
+    for (size_t offset_i = 0; offset_i < num_elements; ++offset_i) {
+        if (!concrete_array.IsNull(offset_i)) {
+            const auto * raw_data = array_data + (offset_i * width);
+            column_chars_t.insert(raw_data, raw_data + width);
+        }
+        column_chars_t.emplace_back('\0');
+        column_offsets.emplace_back(column_chars_t.size());
+    }
+    return Status::OK();
+}
+
+/// Inserts numeric data right into internal column data to reduce an overhead
+template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
+static Status convert_column_with_numeric_data(const arrow::Array* array, size_t array_idx,
+                                        MutableColumnPtr& data_column, size_t num_elements){
+    auto & column_data = static_cast<VectorType &>(*data_column).get_data();
+    /// buffers[0] is a null bitmap and buffers[1] are actual values
+    std::shared_ptr<arrow::Buffer> buffer = array->data()->buffers[1];
+    const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data()) + array_idx;
+    column_data.insert(raw_data, raw_data + num_elements);
+    return Status::OK();
+}
+
+static Status convert_column_with_boolean_data(const arrow::Array* array, size_t array_idx,
+                                        MutableColumnPtr& data_column, size_t num_elements){
+    auto & column_data = static_cast<ColumnVector<UInt8> &>(*data_column).get_data();
+    const auto & concrete_array = dynamic_cast<const arrow::BooleanArray &>(*array);
+    for (size_t bool_i = array_idx; bool_i < array_idx + num_elements; ++bool_i) {
+        column_data.emplace_back(concrete_array.Value(bool_i));
+    }
+    return Status::OK();
+}
+
+static int64_t time_unit_divisor(arrow::TimeUnit::type unit) {
+    // Doris only supports seconds
+    switch (unit) {
+    case arrow::TimeUnit::type::SECOND: {
+        return 1L;
+    }
+    case arrow::TimeUnit::type::MILLI: {
+        return 1000L;
+    }
+    case arrow::TimeUnit::type::MICRO: {
+        return 1000000L;
+    }
+    case arrow::TimeUnit::type::NANO: {
+        return 1000000000L;
+    }
+    default:
+        return 0L;
+    }
+}
+
+template <typename ArrowType>
+static Status convert_column_with_timestamp_data(const arrow::Array* array, size_t array_idx,
+                                        MutableColumnPtr& data_column, size_t num_elements,
+                                        const std::string& timezone){
+    auto & column_data = static_cast<ColumnVector<Int64> &>(*data_column).get_data();
+    const auto & concrete_array = dynamic_cast<const ArrowType &>(*array);
+    int64_t divisor = 1;
+    int64_t multiplier = 1;
+    if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) {
+        const auto type = std::static_pointer_cast<arrow::TimestampType>(array->type());
+        divisor = time_unit_divisor(type->unit());
+        if (divisor == 0L) {
+            return Status::InternalError(fmt::format("Invalid Time Type:{}", type->name()));
+        }
+    } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
+        multiplier = 24 * 60 * 60; // day => secs
+    } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) {
+        divisor = 1000; //ms => secs
+    }
+
+    for (size_t value_i = array_idx; value_i < array_idx + num_elements; ++value_i) {
+        VecDateTimeValue v;
+        v.from_unixtime(static_cast<Int64>(concrete_array.Value(value_i)) / divisor * multiplier, timezone) ;
+        if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) {
+            v.cast_to_date();
+        }
+        column_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v));
+    }
+    return Status::OK();
+}
+
+static Status convert_column_with_decimal_data(const arrow::Array* array, size_t array_idx,
+                                        MutableColumnPtr& data_column, size_t num_elements){
+    auto & column_data = static_cast<ColumnDecimal<vectorized::Decimal128> &>(*data_column).get_data();
+    const auto & concrete_array = dynamic_cast<const arrow::DecimalArray &>(*array);
+    const auto * arrow_decimal_type = static_cast<arrow::DecimalType *>(array->type().get());
+    // TODO check precision
+    //size_t precision = arrow_decimal_type->precision();
+    const auto scale = arrow_decimal_type->scale();
+
+    for (size_t value_i = array_idx; value_i < array_idx + num_elements; ++value_i) {
+        auto value = *reinterpret_cast<const vectorized::Decimal128 *>(concrete_array.Value(value_i));
+        // convert scale to 9
+        if (scale != 9) {
+            value = convert_decimals<vectorized::DataTypeDecimal<vectorized::Decimal128>, 
+                                vectorized::DataTypeDecimal<vectorized::Decimal128>>(
+                                value, scale, 9);
+        }
+        column_data.emplace_back(value);
+    }
+    return Status::OK();
+}
+
+Status arrow_column_to_doris_column(const arrow::Array* arrow_column,

Review Comment:
   Please add unit tests (UT) for this class.
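
   As a rough idea of what such a test could pin down, here is a sketch of the timestamp arithmetic only. It assumes a stand-in `TimeUnit` enum in place of `arrow::TimeUnit::type` so that it compiles without Arrow, and a hypothetical `to_unix_seconds` helper that mirrors the `value / divisor * multiplier` expression in `convert_column_with_timestamp_data`:

```cpp
#include <cstdint>

// Stand-in for arrow::TimeUnit::type (assumption, for illustration only).
enum class TimeUnit { SECOND, MILLI, MICRO, NANO };

// Same mapping as time_unit_divisor() in the diff above.
inline int64_t time_unit_divisor(TimeUnit unit) {
    switch (unit) {
    case TimeUnit::SECOND:
        return 1;
    case TimeUnit::MILLI:
        return 1000;
    case TimeUnit::MICRO:
        return 1000000;
    case TimeUnit::NANO:
        return 1000000000;
    }
    return 0;  // unknown unit
}

// Hypothetical helper mirroring the conversion expression:
// raw value -> seconds since the Unix epoch.
inline int64_t to_unix_seconds(int64_t raw, int64_t divisor, int64_t multiplier) {
    return raw / divisor * multiplier;
}
```

   A real test would additionally build Arrow arrays and check the resulting Doris columns, including null handling and a non-zero `array_idx`.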





[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#discussion_r865524515


##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vparquet_scanner.h"
+#include "exec/parquet_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                               const TBrokerScanRangeParams& params,
+                               const std::vector<TBrokerRangeDesc>& ranges,
+                               const std::vector<TNetworkAddress>& broker_addresses,
+                               const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0),
+          _num_of_columns_from_file(0) {}
+VParquetScanner::~VParquetScanner() {
+}
+
+Status VParquetScanner::open() {
+    RETURN_IF_ERROR(ParquetScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    auto range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status VParquetScanner::next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VParquetScanner::init_arrow_batch_if_necessary() {
+    // 1. init batch if first time 
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        while (!_scanner_eof) {
+            status = next_arrow_batch();
+            if (_scanner_eof) {
+                return status;
+            }
+            if (status.is_end_of_file()) {
+                // try next file
+                continue;
+            }
+            return status;
+        }
+    }
+    return status;
+}
+
+Status VParquetScanner::init_src_block(Block* block) {
+    size_t batch_pos = 0;
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto* array = _batch->column(batch_pos++).get();
+        auto pt = arrow_type_to_primitive_type(array->type()->id());
+        if (pt == INVALID_TYPE) {
+            return Status::NotSupported(fmt::format(
+                "Not support arrow type:{}", array->type()->name()));
+        }
+        auto is_nullable = true;
+        // let src column be nullable for simplify converting
+        DataTypePtr data_type = DataTypeFactory::instance().create_data_type(pt, is_nullable);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+    }
+    return Status::OK();
+}
+
+Status VParquetScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eof) {
+    // overall of type converting: 
+    // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
+    // primitive type(PT1) ==materialize_block==> dest primitive type
+    SCOPED_TIMER(_read_timer);
+    // init arrow batch
+    {
+        Status st = init_arrow_batch_if_necessary();
+        if (!st.ok()) {
+            if (!st.is_end_of_file()) {
+                return st;
+            }
+            return Status::OK();
+        }
+    }
+    Block src_block;
+    RETURN_IF_ERROR(init_src_block(&src_block));
+    // convert arrow batch to block until reach the batch_size
+    while (!_scanner_eof) {
+        // cast arrow type to PT0 and append it to src block
+        // for example: arrow::Type::INT16 => TYPE_SMALLINT
+        RETURN_IF_ERROR(append_batch_to_src_block(&src_block));
+        // finalize the src block if full
+        if (src_block.rows() >= _state->batch_size()) {
+            break;
+        }
+        auto status = next_arrow_batch();
+        // if ok, append the batch to the src columns
+        if (status.ok()) {
+            continue;
+        }
+        // return error if not EOF
+        if (!status.is_end_of_file()) {
+            return status;
+        }
+        // if src block is not empty, then finalize the block
+        if (src_block.rows() > 0) {
+            break;
+        }
+        _cur_file_eof = true;
+        RETURN_IF_ERROR(next_arrow_batch());
+        // the next file may have a different arrow schema, so reinit the block here
+        RETURN_IF_ERROR(init_src_block(&src_block));
+    }
+    COUNTER_UPDATE(_rows_read_counter, src_block.rows());
+    SCOPED_TIMER(_materialize_timer);
+    // cast PT0 => PT1
+    // for example: TYPE_SMALLINT => TYPE_VARCHAR
+    RETURN_IF_ERROR(cast_src_block(&src_block));
+    // range of current file
+    fill_columns_from_path(&src_block);
+    RETURN_IF_ERROR(eval_conjunts(&src_block));
+    // materialize, src block => dest columns
+    RETURN_IF_ERROR(materialize_block(&src_block, columns));
+    *eof = _scanner_eof;
+    return Status::OK();
+}
+
+// eval conjuncts, for example: t1 > 1
+Status VParquetScanner::eval_conjunts(Block* block) {
+    for (auto& vctx : _pre_filter_vctxs) {

Review Comment:
   Maybe `_pre_filter_vctxs` should be changed to a tree, to avoid the copy operation that happens each time.
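
One possible reading of this suggestion, as a minimal sketch only: if "tree" means a single compound AND expression over all pre-filter conjuncts, the block can be filtered (and its rows copied) once instead of once per conjunct. The types and functions below (`Column`, `Mask`, `Conjunct`, `make_conjunct`, `apply_mask`, `filter_per_conjunct`, `filter_combined`) are hypothetical stand-ins for illustration and are not the Doris `Block` or expression-context API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-ins, not Doris types: a "column" is a vector of ints and
// a "conjunct" writes a 0/1 mask with one entry per row.
using Column = std::vector<int>;
using Mask = std::vector<uint8_t>;
using Conjunct = std::function<void(const Column&, Mask&)>;

// Wrap a per-value predicate as a conjunct that overwrites the whole mask.
Conjunct make_conjunct(std::function<bool(int)> pred) {
    return [pred](const Column& col, Mask& mask) {
        mask.resize(col.size());
        for (size_t i = 0; i < col.size(); ++i) mask[i] = pred(col[i]) ? 1 : 0;
    };
}

// Copy only the rows whose mask entry is non-zero.
Column apply_mask(const Column& col, const Mask& mask) {
    assert(col.size() == mask.size());
    Column out;
    for (size_t i = 0; i < col.size(); ++i) {
        if (mask[i]) out.push_back(col[i]);
    }
    return out;
}

// Per-conjunct filtering: one filter pass, and one copy, per conjunct.
Column filter_per_conjunct(Column col, const std::vector<Conjunct>& conjuncts) {
    Mask mask;
    for (const auto& c : conjuncts) {
        c(col, mask);
        col = apply_mask(col, mask);
    }
    return col;
}

// Combined filtering: AND all conjunct masks together, then copy once.
Column filter_combined(const Column& col, const std::vector<Conjunct>& conjuncts) {
    Mask combined(col.size(), 1);
    Mask mask;
    for (const auto& c : conjuncts) {
        c(col, mask);
        for (size_t i = 0; i < col.size(); ++i) combined[i] &= mask[i];
    }
    return apply_mask(col, combined);
}
```

Both functions keep the same rows. The combined form evaluates every conjunct on every row but copies the surviving rows only once, which is the trade-off this sketch is meant to show.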



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yinzhijian commented on pull request #9231: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian commented on PR #9231:
URL: https://github.com/apache/incubator-doris/pull/9231#issuecomment-1120159918

   Closing this PR; please use https://github.com/apache/incubator-doris/pull/9433 instead.

