Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/05/07 06:49:53 UTC

[GitHub] [incubator-doris] yinzhijian opened a new pull request, #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

yinzhijian opened a new pull request, #9433:
URL: https://github.com/apache/incubator-doris/pull/9433

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   ## Optimization Todo list:
   1. FE generates the corresponding src slot descs and Exprs from the Parquet schema.
   2. BE supports converting an Arrow type directly into a compatible dest primitive type. For example, if the Arrow type is INT32 and the dest type is TYPE_BIGINT (int64), convert INT32 => TYPE_BIGINT directly, instead of the current two-step path INT32 => TYPE_INT => TYPE_BIGINT.
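The second todo item can be illustrated with a minimal sketch. It assumes plain `std::vector<int32_t>` and `std::vector<int64_t>` as hypothetical stand-ins for an Arrow INT32 array and a TYPE_BIGINT column; it does not use the real Arrow or Doris column APIs. The current path materializes an intermediate TYPE_INT column before widening, while the proposed path widens each value once.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins: the real code works on arrow::Int32Array and Doris columns.
using ArrowInt32Values = std::vector<int32_t>;
using BigIntColumn = std::vector<int64_t>;

// Current path: INT32 => TYPE_INT => TYPE_BIGINT (one intermediate column).
BigIntColumn convert_two_step(const ArrowInt32Values& src) {
    std::vector<int32_t> int_column(src.begin(), src.end()); // intermediate TYPE_INT column
    BigIntColumn result;
    result.reserve(int_column.size());
    for (int32_t v : int_column) {
        result.push_back(static_cast<int64_t>(v)); // second conversion to TYPE_BIGINT
    }
    return result;
}

// Proposed path: INT32 => TYPE_BIGINT directly, widening each value exactly once.
BigIntColumn convert_direct(const ArrowInt32Values& src) {
    BigIntColumn result;
    result.reserve(src.size());
    for (int32_t v : src) {
        result.push_back(static_cast<int64_t>(v));
    }
    return result;
}
```

Both paths produce the same values because INT32 => int64 widening is lossless; the direct path only saves the intermediate copy.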
   
   ## Performance Testing:
   Loading a Parquet file with the vectorized engine is almost 1x faster (nearly twice as fast) than with the non-vectorized (rowset) version.
   Number of rows: 300k
   Test table schema:
   CREATE TABLE `parquet` (
     `id` int(11) NOT NULL COMMENT "",
     `email` varchar(26) NOT NULL COMMENT "",
     `c_date32` DATE NOT NULL COMMENT "",
     `c_date64` DATETIME NOT NULL COMMENT "",
     `c_timestamp` DATETIME NOT NULL COMMENT "",
     `c_decimal128` DECIMAL(27, 9) NULL COMMENT "",
     `c_bool` BOOLEAN NULL COMMENT "",
     `c_float` FLOAT NULL COMMENT "",
     `c_double` DOUBLE NULL COMMENT "",
     `c_fixed_size_binary` CHAR(20) NULL COMMENT "",
     `c_binary` VARCHAR(32) NULL COMMENT "",
     `c_uint64` BIGINT NULL COMMENT ""
   )
   DISTRIBUTED BY HASH(`id`) BUCKETS 1
   PROPERTIES (
   "replication_num" = "1"
   );
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (No)
   2. Have unit tests been added: (No)
   3. Has documentation been added or modified: (No Need)
   4. Does it need to update dependencies: (No)
   5. Are there any changes that cannot be rolled back: (No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#issuecomment-1124484744

   PR approved by anyone and no changes requested.


[GitHub] [incubator-doris] yinzhijian commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r871144364


##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,30 +1047,61 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
-
+        Set<String> exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
+            if (importColumnDesc.isColumn()) {
+                continue;
+            }
+            List<SlotRef> slots = Lists.newArrayList();
+            importColumnDesc.getExpr().collect(SlotRef.class, slots);
+            for (SlotRef slot : slots) {
+                String slotColumnName = slot.getColumnName();
+                exprArgsColumns.add(slotColumnName);
+            }
+        }
+        Set<String> excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         // init slot desc add expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
             String realColName;
-            if (tbl.getColumn(columnName) == null || importColumnDesc.getExpr() == null) {
+            if (tblColumn == null || importColumnDesc.getExpr() == null) {
                 realColName = columnName;
             } else {
-                realColName = tbl.getColumn(columnName).getName();
+                realColName = tblColumn.getName();
             }
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
             } else {
                 SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                // only support parquet format now
+                if (useVectorizedLoad  && formatType == TFileFormatType.FORMAT_PARQUET
+                        && tblColumn != null) {
+                    // in vectorized load
+                    if (exprArgsColumns.contains(columnName)) {

Review Comment:
   A is read as VARCHAR. Suppose A is DATETIME in the schema but used as INT in the expression: it is then unclear whether to use the schema type or the type inferred from the expression.
   
   B follows the exprsByName logic: assuming B is INT in the schema, its value becomes cast((a + 1) as int).
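The rule described above can be summarized in a small sketch. It is a simplified model, not the FE code: the hypothetical `decide_src_slot_type` helper returns VARCHAR for any column that appears as an argument of a column expression (like A), and the table-schema type otherwise when loading Parquet with the vectorized engine. A column that has its own expression (like B) gets no src slot at all; its value comes from the expression. Types are plain strings here for brevity.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <set>
#include <string>

// Case-insensitive ordering, imitating Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER) in Load.java.
struct CaseInsensitiveLess {
    bool operator()(const std::string& a, const std::string& b) const {
        return std::lexicographical_compare(
                a.begin(), a.end(), b.begin(), b.end(),
                [](unsigned char x, unsigned char y) { return std::tolower(x) < std::tolower(y); });
    }
};

using NameSet = std::set<std::string, CaseInsensitiveLess>;

// Hypothetical helper: src slot type for a column read directly from the file.
// - referenced by some expression (e.g. A in "b = a + 1"): VARCHAR, because the schema
//   type and the type the expression expects may disagree;
// - otherwise, in vectorized Parquet load: the table-schema type.
std::string decide_src_slot_type(const std::string& column, const std::string& schema_type,
                                 const NameSet& expr_arg_columns, bool vectorized_parquet) {
    if (!vectorized_parquet || expr_arg_columns.count(column) > 0) {
        return "VARCHAR";
    }
    return schema_type;
}
```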



[GitHub] [incubator-doris] morningman commented on pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
morningman commented on PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#issuecomment-1120202616

   And please fix the FE ut.


[GitHub] [incubator-doris] morningman commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r867320783


##########
be/src/vec/exec/vparquet_scanner.h:
##########
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <unordered_map>
+
+#include <arrow/array.h>
+#include "common/status.h"
+#include <exec/parquet_scanner.h>
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+#include "runtime/mem_pool.h"
+#include "util/runtime_profile.h"
+
+namespace doris::vectorized {
+
+// VParquet scanner convert the data read from Parquet to doris's columns.
+class VParquetScanner : public ParquetScanner {
+public:
+    VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                    const TBrokerScanRangeParams& params,
+                    const std::vector<TBrokerRangeDesc>& ranges,
+                    const std::vector<TNetworkAddress>& broker_addresses,
+                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+
+    virtual ~VParquetScanner();
+
+    // Open this scanner, will initialize information need to
+    Status open();
+
+    Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof);
+
+private:
+    Status next_arrow_batch();

Review Comment:
   ```suggestion
       Status _next_arrow_batch();
   ```
   Same as the other private methods.



##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,26 +1047,52 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
-
+        Set<String> exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
+            if (importColumnDesc.isColumn()) {
+                continue;
+            }
+            List<SlotRef> slots = Lists.newArrayList();
+            importColumnDesc.getExpr().collect(SlotRef.class, slots);
+            for (SlotRef slot : slots) {
+                String slotColumnName = slot.getColumnName();
+                exprArgsColumns.add(slotColumnName);
+            }
+        }
+        Set<String> excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         // init slot desc add expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
-            String realColName = tbl.getColumn(columnName) == null ? columnName
+            Column tblColumn = tbl.getColumn(columnName);
+            String realColName =  tblColumn == null ? columnName
                     : tbl.getColumn(columnName).getName();
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
             } else {
                 SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                // only support parquet format now
+                if (exprArgsColumns.contains(columnName) || formatType != TFileFormatType.FORMAT_PARQUET
+                    || !useVectorizedLoad) {
+                    // columns in expr args should be parsed as varchar type
+                    slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                    slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+                    excludedColumns.add(realColName);
+                    // ISSUE A: src slot should be nullable even if the column is not nullable.
+                    // because src slot is what we read from file, not represent to real column value.
+                    // If column is not nullable, error will be thrown when filling the dest slot,
+                    // which is not nullable.
+                    slotDesc.setIsNullable(true);
+                } else {
+                    // in vectorized load,
+                    // columns from files like parquet files can be parsed as the type in table schema
+                    slotDesc.setType(tblColumn.getType());
+                    slotDesc.setColumn(new Column(realColName, tblColumn.getType()));
+                    // non-nullable column is allowed in vectorized load with parquet format
+                    slotDesc.setIsNullable(tblColumn.isAllowNull());

Review Comment:
   Even if this column has no expr, it may still contain NULL values in the Parquet file, right?
   So I think this should be set to true, too.
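The behavior described by the "ISSUE A" comment in the diff can be modeled with a short sketch. Assumptions: `SrcColumn` (one optional per row) is a hypothetical stand-in for a nullable Doris column, and `fill_dest` is an illustrative helper, not the BE API. Because the src slot is always nullable, a NULL read from the file is carried through, and the NOT NULL constraint is enforced only when filling the dest column, where the offending row can be filtered instead of failing the read.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical nullable src column: std::nullopt marks a NULL read from the file.
using SrcColumn = std::vector<std::optional<int64_t>>;

struct FillResult {
    std::vector<std::optional<int64_t>> dest; // rows accepted into the dest column
    size_t filtered_rows = 0;                 // rows rejected by the NOT NULL check
};

// Copy src rows into the dest column, enforcing NOT NULL only at this point.
FillResult fill_dest(const SrcColumn& src, bool dest_nullable) {
    FillResult result;
    for (const auto& value : src) {
        if (!value.has_value() && !dest_nullable) {
            ++result.filtered_rows; // would be reported as an error row
            continue;
        }
        result.dest.push_back(value);
    }
    return result;
}
```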



[GitHub] [incubator-doris] yinzhijian commented on pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian commented on PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#issuecomment-1121098412

   > And please fix the FE ut.
   
   done


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#issuecomment-1125785804

   PR approved by at least one committer and no changes requested.


[GitHub] [incubator-doris] EmmyMiao87 commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r871026218


##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,30 +1047,61 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
-
+        Set<String> exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);

Review Comment:
   Maybe name it "exprSrcSlotName".



##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,26 +1047,57 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
-
+        Set<String> exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
+            if (importColumnDesc.isColumn()) {
+                continue;
+            }
+            List<SlotRef> slots = Lists.newArrayList();
+            importColumnDesc.getExpr().collect(SlotRef.class, slots);
+            for (SlotRef slot : slots) {
+                String slotColumnName = slot.getColumnName();
+                exprArgsColumns.add(slotColumnName);
+            }
+        }
+        Set<String> excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         // init slot desc add expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
-            String realColName = tbl.getColumn(columnName) == null ? columnName
-                    : tbl.getColumn(columnName).getName();
+            Column tblColumn = tbl.getColumn(columnName);
+            String realColName =  tblColumn == null ? columnName : tblColumn.getName();
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
             } else {
                 SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                // only support parquet format now
+                if (useVectorizedLoad  && formatType == TFileFormatType.FORMAT_PARQUET
+                        && tblColumn != null) {
+                    // in vectorized load
+                    if (exprArgsColumns.contains(columnName)) {
+                        // columns in expr args should be varchar type
+                        slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                        slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+                        excludedColumns.add(realColName);
+                    } else {
+                        // columns from files like parquet files can be parsed as the type in table schema
+                        slotDesc.setType(tblColumn.getType());
+                        slotDesc.setColumn(new Column(realColName, tblColumn.getType()));
+                    }
+                    // non-nullable column is allowed in vectorized load with parquet format

Review Comment:
   If the src slot is not nullable and the value read from the file is NULL, it will cause an error.
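A minimal sketch of the failure mode raised here, assuming a hypothetical `read_into_src_slot` helper and a simplified `Status` struct rather than Doris's own `Status` class (the exact BE behavior is not shown in this thread): if the src slot takes the table column's nullability and is therefore non-nullable, a single NULL in the Parquet file aborts the read, whereas a nullable src slot just records it in a null map.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Simplified stand-in for doris::Status.
struct Status {
    bool ok = true;
    std::string msg;
};

struct SrcSlotColumn {
    std::vector<int64_t> values;
    std::vector<uint8_t> null_map; // 1 == NULL; filled only when the slot is nullable
};

// Hypothetical helper: convert the values read from the file into the src slot column.
Status read_into_src_slot(const std::vector<std::optional<int64_t>>& file_values,
                          bool slot_nullable, SrcSlotColumn* column) {
    for (const auto& v : file_values) {
        if (!v.has_value()) {
            if (!slot_nullable) {
                return {false, "NULL value in non-nullable src slot"};
            }
            column->values.push_back(0); // placeholder hidden by the null map
            column->null_map.push_back(1);
            continue;
        }
        column->values.push_back(*v);
        if (slot_nullable) {
            column->null_map.push_back(0);
        }
    }
    return {};
}
```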



##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,30 +1047,61 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
-
+        Set<String> exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
+            if (importColumnDesc.isColumn()) {
+                continue;
+            }
+            List<SlotRef> slots = Lists.newArrayList();
+            importColumnDesc.getExpr().collect(SlotRef.class, slots);
+            for (SlotRef slot : slots) {
+                String slotColumnName = slot.getColumnName();
+                exprArgsColumns.add(slotColumnName);
+            }
+        }
+        Set<String> excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         // init slot desc add expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
             String realColName;
-            if (tbl.getColumn(columnName) == null || importColumnDesc.getExpr() == null) {
+            if (tblColumn == null || importColumnDesc.getExpr() == null) {
                 realColName = columnName;
             } else {
-                realColName = tbl.getColumn(columnName).getName();
+                realColName = tblColumn.getName();
             }
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
             } else {
                 SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                // only support parquet format now
+                if (useVectorizedLoad  && formatType == TFileFormatType.FORMAT_PARQUET
+                        && tblColumn != null) {
+                    // in vectorized load
+                    if (exprArgsColumns.contains(columnName)) {

Review Comment:
   What about the following situation?
   table schema: a, b
   src schema: a
   src expr: a, b = a + 1
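To make this situation concrete, here is a sketch of the first loop in the diff above (collecting the `SlotRef` names used by column expressions into `exprArgsColumns`) applied to this case. It models each `ImportColumnDesc` as a hypothetical `ColumnDesc` holding the column names its expression references; the real FE walks the `Expr` tree with `collect(SlotRef.class, slots)`. With `a, b = a + 1`, `a` ends up in the set, so under the rule in the diff it is read as VARCHAR even though it is also loaded directly into column `a`.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <set>
#include <string>
#include <vector>

// Hypothetical model of ImportColumnDesc: a plain column has no expression.
struct ColumnDesc {
    std::string name;
    bool is_column;                     // true for "a", false for "b = a + 1"
    std::vector<std::string> slot_refs; // column names referenced by the expression
};

static std::string to_lower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}

// Mirrors the first loop in the diff: gather every column used as an expression argument.
// Names are lower-cased to imitate the case-insensitive TreeSet.
std::set<std::string> collect_expr_args(const std::vector<ColumnDesc>& descs) {
    std::set<std::string> expr_args;
    for (const auto& desc : descs) {
        if (desc.is_column) {
            continue;
        }
        for (const auto& ref : desc.slot_refs) {
            expr_args.insert(to_lower(ref));
        }
    }
    return expr_args;
}
```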



##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,30 +1047,61 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
+            if (tblColumn == null || importColumnDesc.getExpr() == null) {

Review Comment:
   Where did you declare this variable?



[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#issuecomment-1124484720

   PR approved by at least one committer and no changes requested.


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r868793698


##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vparquet_scanner.h"
+#include "exec/parquet_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                                 const TBrokerScanRangeParams& params,
+                                 const std::vector<TBrokerRangeDesc>& ranges,
+                                 const std::vector<TNetworkAddress>& broker_addresses,
+                                 const std::vector<TExpr>& pre_filter_texprs,
+                                 ScannerCounter* counter)
+        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
+                         counter),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0),
+          _num_of_columns_from_file(0) {}
+VParquetScanner::~VParquetScanner() {}
+
+Status VParquetScanner::open() {
+    RETURN_IF_ERROR(ParquetScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    auto range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status VParquetScanner::_next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VParquetScanner::_init_arrow_batch_if_necessary() {
+    // 1. init batch if first time
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {

Review Comment:
   `if (!_scanner_eof)`
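The suggested guard can be illustrated with a simplified model of `_init_arrow_batch_if_necessary`. Assumptions: the hypothetical `MockScanner` stores each file as a list of batch row counts instead of using Parquet readers, and returns `bool` instead of `Status`. Checking `!scanner_eof` first means no reader is touched once every file has been consumed; empty batches and exhausted files are skipped as in `_next_arrow_batch`.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical model: each file is a list of batch row counts.
class MockScanner {
public:
    explicit MockScanner(std::vector<std::vector<size_t>> files) : _files(std::move(files)) {}

    // Returns true when a batch with unread rows is available, false at scanner EOF.
    bool init_batch_if_necessary() {
        if (!_scanner_eof && (!_has_batch || _cur_idx >= _batch_rows)) {
            _next_batch();
        }
        return !_scanner_eof;
    }

    void consume_row() { ++_cur_idx; }

private:
    // Advance to the next non-empty batch across files; set EOF when none is left.
    void _next_batch() {
        _cur_idx = 0;
        while (_file_idx < _files.size()) {
            const auto& batches = _files[_file_idx];
            if (_batch_idx >= batches.size()) { // current file exhausted: open the next one
                ++_file_idx;
                _batch_idx = 0;
                continue;
            }
            _batch_rows = batches[_batch_idx++];
            if (_batch_rows == 0) { // skip empty batches
                continue;
            }
            _has_batch = true;
            return;
        }
        _scanner_eof = true;
    }

    std::vector<std::vector<size_t>> _files;
    size_t _file_idx = 0;
    size_t _batch_idx = 0;
    size_t _batch_rows = 0;
    size_t _cur_idx = 0;
    bool _has_batch = false;
    bool _scanner_eof = false;
};
```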



##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,311 @@
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        while (!_scanner_eof) {
+            status = _next_arrow_batch();
+            if (_scanner_eof) {
+                return status;
+            }
+            if (status.is_end_of_file()) {
+                // try next file
+                continue;
+            }
+            return status;
+        }
+    }
+    return status;
+}
+
+Status VParquetScanner::_init_src_block(Block* block) {
+    size_t batch_pos = 0;
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto* array = _batch->column(batch_pos++).get();
+        // let src column be nullable for simplify converting
+        auto is_nullable = true;

Review Comment:
   Nullable columns make execution slower. Why not take the nullability from slot_desc?
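
A minimal sketch of this suggestion, assuming simplified stand-ins: `SrcSlot`, `Int32Column` and `make_src_column` are hypothetical names, not the Doris `SlotDescriptor` or column APIs. The source column takes its nullability from the slot, so a non-nullable slot gets no null map. A batch that still contains NULLs for such a slot is rejected.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for a source slot descriptor (only what this sketch needs).
struct SrcSlot {
    std::string col_name;
    bool is_nullable;
};

// Hypothetical column: values plus an optional null map (1 = NULL).
// No null map is allocated for a non-nullable slot, so later stages can skip null checks.
struct Int32Column {
    std::vector<int32_t> values;
    std::optional<std::vector<uint8_t>> null_map;
};

// Build the source column for one slot, taking nullability from the slot
// instead of forcing every source column to be nullable.
// `file_nulls` marks NULL entries read from the file (1 = NULL).
std::optional<Int32Column> make_src_column(const SrcSlot& slot,
                                           const std::vector<int32_t>& file_values,
                                           const std::vector<uint8_t>& file_nulls) {
    assert(file_values.size() == file_nulls.size());
    Int32Column col;
    col.values = file_values;
    if (slot.is_nullable) {
        col.null_map = file_nulls;
        return col;
    }
    // Non-nullable slot: the data must not contain NULLs.
    for (uint8_t is_null : file_nulls) {
        if (is_null) {
            return std::nullopt;
        }
    }
    return col;
}
```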



##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vparquet_scanner.h"
+#include "exec/parquet_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                                 const TBrokerScanRangeParams& params,
+                                 const std::vector<TBrokerRangeDesc>& ranges,
+                                 const std::vector<TNetworkAddress>& broker_addresses,
+                                 const std::vector<TExpr>& pre_filter_texprs,
+                                 ScannerCounter* counter)
+        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
+                         counter),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0),
+          _num_of_columns_from_file(0) {}
+VParquetScanner::~VParquetScanner() {}
+
+Status VParquetScanner::open() {
+    RETURN_IF_ERROR(ParquetScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    auto range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status VParquetScanner::_next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VParquetScanner::_init_arrow_batch_if_necessary() {
+    // 1. init batch if first time
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        while (!_scanner_eof) {
+            status = _next_arrow_batch();
+            if (_scanner_eof) {
+                return status;
+            }
+            if (status.is_end_of_file()) {
+                // try next file
+                continue;
+            }
+            return status;
+        }
+    }
+    return status;
+}
+
+Status VParquetScanner::_init_src_block(Block* block) {
+    size_t batch_pos = 0;
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto* array = _batch->column(batch_pos++).get();
+        // let src column be nullable for simplify converting
+        auto is_nullable = true;
+        DataTypePtr data_type =
+                DataTypeFactory::instance().create_data_type(array->type()->id(), is_nullable);
+        if (data_type == nullptr) {
+            return Status::NotSupported(
+                    fmt::format("Not support arrow type:{}", array->type()->name()));
+        }
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(
+                ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+    }
+    return Status::OK();
+}
+
+Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
+    // overall of type converting:
+    // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
+    // primitive type(PT1) ==materialize_block==> dest primitive type
+    SCOPED_TIMER(_read_timer);
+    // init arrow batch
+    {
+        Status st = _init_arrow_batch_if_necessary();
+        if (!st.ok()) {
+            if (!st.is_end_of_file()) {
+                return st;
+            }
+            *eof = true;
+            return Status::OK();
+        }
+    }
+    Block src_block;
+    RETURN_IF_ERROR(_init_src_block(&src_block));
+    // convert arrow batch to block until reach the batch_size
+    while (!_scanner_eof) {
+        // cast arrow type to PT0 and append it to src block
+        // for example: arrow::Type::INT16 => TYPE_SMALLINT
+        RETURN_IF_ERROR(_append_batch_to_src_block(&src_block));
+        // finalize the src block if full
+        if (src_block.rows() >= _state->batch_size()) {
+            break;
+        }
+        auto status = _next_arrow_batch();
+        // if ok, append the batch to the src columns
+        if (status.ok()) {
+            continue;
+        }
+        // return error if not EOF
+        if (!status.is_end_of_file()) {
+            return status;
+        }
+        // if src block is not empty, then finalize the block
+        if (src_block.rows() > 0) {
+            break;
+        }
+        _cur_file_eof = true;
+        RETURN_IF_ERROR(_next_arrow_batch());
+        // there may be different arrow file, so reinit block here
+        RETURN_IF_ERROR(_init_src_block(&src_block));
+    }
+    COUNTER_UPDATE(_rows_read_counter, src_block.rows());
+    SCOPED_TIMER(_materialize_timer);
+    // cast PT0 => PT1
+    // for example: TYPE_SMALLINT => TYPE_VARCHAR
+    RETURN_IF_ERROR(_cast_src_block(&src_block));
+    // range of current file
+    _fill_columns_from_path(&src_block);
+    RETURN_IF_ERROR(_eval_conjunts(&src_block));
+    // materialize, src block => dest columns
+    RETURN_IF_ERROR(_materialize_block(&src_block, block));
+    *eof = _scanner_eof;
+    return Status::OK();
+}
+
+// eval conjuncts, for example: t1 > 1
+Status VParquetScanner::_eval_conjunts(Block* block) {
+    for (auto& vctx : _vpre_filter_ctxs) {
+        size_t orig_rows = block->rows();
+        RETURN_IF_ERROR(VExprContext::filter_block(vctx, block, block->columns()));
+        _counter->num_rows_unselected += orig_rows - block->rows();
+    }
+    return Status::OK();
+}
+
+void VParquetScanner::_fill_columns_from_path(Block* block) {
+    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
+    if (range.__isset.num_of_columns_from_file) {
+        int start = range.num_of_columns_from_file;
+        int rows = block->rows();

Review Comment:
   Use auto or size_t here instead of int.
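
A sketch of the suggested type choice, assuming plain vectors in place of a `Block`: `fill_columns_from_path` below is a hypothetical simplification, not the scanner's `_fill_columns_from_path(Block*)`. Keeping the row count as `size_t`, the type a `size()`-style accessor returns, avoids a narrowing conversion and signed/unsigned comparisons.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical simplification: append one value parsed from the file path
// (columns_from_path) to each path column, repeated for every row of the block.
void fill_columns_from_path(std::vector<std::vector<std::string>>& path_columns,
                            const std::vector<std::string>& values_from_path,
                            size_t rows) {  // size_t, not int: no narrowing from size()
    assert(path_columns.size() == values_from_path.size());
    for (size_t col = 0; col < path_columns.size(); ++col) {
        // Every row receives the same value for a given path column.
        path_columns[col].insert(path_columns[col].end(), rows, values_from_path[col]);
    }
}
```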



##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vparquet_scanner.h"
+#include "exec/parquet_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                                 const TBrokerScanRangeParams& params,
+                                 const std::vector<TBrokerRangeDesc>& ranges,
+                                 const std::vector<TNetworkAddress>& broker_addresses,
+                                 const std::vector<TExpr>& pre_filter_texprs,
+                                 ScannerCounter* counter)
+        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
+                         counter),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0),
+          _num_of_columns_from_file(0) {}
+VParquetScanner::~VParquetScanner() {}
+
+Status VParquetScanner::open() {
+    RETURN_IF_ERROR(ParquetScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    auto range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status VParquetScanner::_next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VParquetScanner::_init_arrow_batch_if_necessary() {
+    // 1. init batch if first time
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        while (!_scanner_eof) {
+            status = _next_arrow_batch();
+            if (_scanner_eof) {
+                return status;
+            }
+            if (status.is_end_of_file()) {
+                // try next file
+                continue;
+            }
+            return status;
+        }
+    }
+    return status;
+}
+
+Status VParquetScanner::_init_src_block(Block* block) {
+    size_t batch_pos = 0;
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto* array = _batch->column(batch_pos++).get();
+        // let src column be nullable for simplify converting
+        auto is_nullable = true;
+        DataTypePtr data_type =
+                DataTypeFactory::instance().create_data_type(array->type()->id(), is_nullable);
+        if (data_type == nullptr) {
+            return Status::NotSupported(
+                    fmt::format("Not support arrow type:{}", array->type()->name()));
+        }
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(
+                ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+    }
+    return Status::OK();
+}
+
+Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
+    // overall of type converting:
+    // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>

Review Comment:
   We need more detailed comments on the reading process here. Why are there two stages, PT0 and PT1?
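
A simplified model of the two stages, assuming plain vectors in place of Arrow arrays and Doris columns. The function names are hypothetical and only echo the steps named in the `get_next` comment (`arrow_column_to_doris_column`, `_cast_src_block`). Stage 1 maps each Arrow type to a fixed Doris type (PT0) without looking at the table. Stage 2 casts PT0 to the src slot type chosen by the FE (PT1): the table column type for plain Parquet columns in vectorized load, or VARCHAR when the column is an argument of a load expression. The PR's todo list describes skipping the intermediate step for similar types (INT32 => TYPE_BIGINT directly, instead of INT32 => TYPE_INT => TYPE_BIGINT).

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Stage 1 (PT0): the Doris type depends only on the Arrow type,
// e.g. Arrow INT32 -> TYPE_INT. Modelled here as a 1:1 copy.
std::vector<int32_t> arrow_int32_to_pt0(const std::vector<int32_t>& arrow_values) {
    return arrow_values;
}

// Stage 2 (PT1): cast PT0 to the src slot type chosen by the FE.
// For a plain column whose table type is BIGINT: widening, lossless.
std::vector<int64_t> cast_pt0_to_bigint(const std::vector<int32_t>& pt0) {
    return std::vector<int64_t>(pt0.begin(), pt0.end());
}

// For a column used as a load-expression argument, the FE declares a VARCHAR slot.
std::vector<std::string> cast_pt0_to_varchar(const std::vector<int32_t>& pt0) {
    std::vector<std::string> out;
    out.reserve(pt0.size());
    for (int32_t v : pt0) {
        out.push_back(std::to_string(v));
    }
    return out;
}
```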



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r867790723


##########
fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java:
##########
@@ -263,6 +263,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws
             case FILE_STREAM:

Review Comment:
   case FILE_LOCAL:




[GitHub] [incubator-doris] HappenLee merged pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee merged PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433



[GitHub] [incubator-doris] yinzhijian commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r867502807


##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,26 +1047,52 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
-
+        Set<String> exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
+            if (importColumnDesc.isColumn()) {
+                continue;
+            }
+            List<SlotRef> slots = Lists.newArrayList();
+            importColumnDesc.getExpr().collect(SlotRef.class, slots);
+            for (SlotRef slot : slots) {
+                String slotColumnName = slot.getColumnName();
+                exprArgsColumns.add(slotColumnName);
+            }
+        }
+        Set<String> excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         // init slot desc add expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
-            String realColName = tbl.getColumn(columnName) == null ? columnName
+            Column tblColumn = tbl.getColumn(columnName);
+            String realColName =  tblColumn == null ? columnName
                     : tbl.getColumn(columnName).getName();
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
             } else {
                 SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                // only support parquet format now
+                if (exprArgsColumns.contains(columnName) || formatType != TFileFormatType.FORMAT_PARQUET
+                    || !useVectorizedLoad) {
+                    // columns in expr args should be parsed as varchar type
+                    slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                    slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+                    excludedColumns.add(realColName);
+                    // ISSUE A: src slot should be nullable even if the column is not nullable.
+                    // because src slot is what we read from file, not represent to real column value.
+                    // If column is not nullable, error will be thrown when filling the dest slot,
+                    // which is not nullable.
+                    slotDesc.setIsNullable(true);
+                } else {
+                    // in vectorized load,
+                    // columns from files like parquet files can be parsed as the type in table schema
+                    slotDesc.setType(tblColumn.getType());
+                    slotDesc.setColumn(new Column(realColName, tblColumn.getType()));
+                    // non-nullable column is allowed in vectorized load with parquet format
+                    slotDesc.setIsNullable(tblColumn.isAllowNull());

Review Comment:
   This is for future versions: when the Doris column is not nullable but the Parquet column is nullable, we can filter out unwanted values before executing expressions.
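
A minimal sketch of that future filtering step, assuming a single `int64_t` column with a byte null map (1 = NULL). `filter_null_rows` is hypothetical. In a real block the same row filter would have to be applied to every column so that rows stay aligned.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical: the Parquet column is nullable but the Doris column is NOT NULL,
// so rows holding NULL can never be loaded. Dropping them before evaluating
// load expressions means the expressions only see loadable rows.
// Returns the number of rows removed.
size_t filter_null_rows(std::vector<int64_t>& values, std::vector<uint8_t>& null_map) {
    assert(values.size() == null_map.size());
    size_t kept = 0;
    for (size_t i = 0; i < values.size(); ++i) {
        if (!null_map[i]) {  // 0 = not NULL: keep the row
            values[kept] = values[i];
            ++kept;
        }
    }
    size_t removed = values.size() - kept;
    values.resize(kept);
    null_map.assign(kept, 0);  // every surviving row is non-NULL
    return removed;
}
```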




[GitHub] [incubator-doris] yinzhijian commented on pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian commented on PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#issuecomment-1121097884

   > **Code Quality Analysis / Build (pull_request) ** Failing after 3m — Build
   
   done



[GitHub] [incubator-doris] yinzhijian commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r868946549


##########
be/src/vec/exec/vparquet_scanner.cpp:
##########
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vparquet_scanner.h"
+#include "exec/parquet_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                                 const TBrokerScanRangeParams& params,
+                                 const std::vector<TBrokerRangeDesc>& ranges,
+                                 const std::vector<TNetworkAddress>& broker_addresses,
+                                 const std::vector<TExpr>& pre_filter_texprs,
+                                 ScannerCounter* counter)
+        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
+                         counter),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0),
+          _num_of_columns_from_file(0) {}
+VParquetScanner::~VParquetScanner() {}
+
+Status VParquetScanner::open() {
+    RETURN_IF_ERROR(ParquetScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    auto range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status VParquetScanner::_next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VParquetScanner::_init_arrow_batch_if_necessary() {
+    // 1. init batch if first time
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        while (!_scanner_eof) {
+            status = _next_arrow_batch();
+            if (_scanner_eof) {
+                return status;
+            }
+            if (status.is_end_of_file()) {
+                // try next file
+                continue;
+            }
+            return status;
+        }
+    }
+    return status;
+}
+
+Status VParquetScanner::_init_src_block(Block* block) {
+    size_t batch_pos = 0;
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto* array = _batch->column(batch_pos++).get();
+        // let src column be nullable for simplify converting
+        auto is_nullable = true;

Review Comment:
   We don't support non-nullable source columns yet, but we will in the future.




[GitHub] [incubator-doris] yinzhijian commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
yinzhijian commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r867502807


##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,26 +1047,52 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
-
+        Set<String> exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
+            if (importColumnDesc.isColumn()) {
+                continue;
+            }
+            List<SlotRef> slots = Lists.newArrayList();
+            importColumnDesc.getExpr().collect(SlotRef.class, slots);
+            for (SlotRef slot : slots) {
+                String slotColumnName = slot.getColumnName();
+                exprArgsColumns.add(slotColumnName);
+            }
+        }
+        Set<String> excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         // init slot desc add expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
-            String realColName = tbl.getColumn(columnName) == null ? columnName
+            Column tblColumn = tbl.getColumn(columnName);
+            String realColName =  tblColumn == null ? columnName
                     : tbl.getColumn(columnName).getName();
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
             } else {
                 SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                // only support parquet format now
+                if (exprArgsColumns.contains(columnName) || formatType != TFileFormatType.FORMAT_PARQUET
+                    || !useVectorizedLoad) {
+                    // columns in expr args should be parsed as varchar type
+                    slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                    slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+                    excludedColumns.add(realColName);
+                    // ISSUE A: src slot should be nullable even if the column is not nullable.
+                    // because src slot is what we read from file, not represent to real column value.
+                    // If column is not nullable, error will be thrown when filling the dest slot,
+                    // which is not nullable.
+                    slotDesc.setIsNullable(true);
+                } else {
+                    // in vectorized load,
+                    // columns from files like parquet files can be parsed as the type in table schema
+                    slotDesc.setType(tblColumn.getType());
+                    slotDesc.setColumn(new Column(realColName, tblColumn.getType()));
+                    // non-nullable column is allowed in vectorized load with parquet format
+                    slotDesc.setIsNullable(tblColumn.isAllowNull());

Review Comment:
   This is for future versions: when the Doris column is not nullable but the Parquet column is nullable, we can filter out unwanted values before executing expressions.
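
The slot-typing rule in this `Load.java` hunk, restated as a small C++ sketch for illustration (the FE code itself is Java). `decide_src_slot`, `SlotType`, `SrcSlotDecision` and `CaseInsensitiveLess` are hypothetical names; the comparator only approximates `String.CASE_INSENSITIVE_ORDER`, and only for ASCII column names.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <set>
#include <string>

// Approximates Java's String.CASE_INSENSITIVE_ORDER for ASCII names.
struct CaseInsensitiveLess {
    bool operator()(const std::string& a, const std::string& b) const {
        return std::lexicographical_compare(
                a.begin(), a.end(), b.begin(), b.end(),
                [](unsigned char x, unsigned char y) { return std::tolower(x) < std::tolower(y); });
    }
};
using ColumnSet = std::set<std::string, CaseInsensitiveLess>;

enum class SlotType { VARCHAR, TABLE_TYPE };

struct SrcSlotDecision {
    SlotType type;
    bool nullable;
};

// Decide type and nullability of the src slot for a column read directly from the file.
SrcSlotDecision decide_src_slot(const std::string& column_name, const ColumnSet& expr_arg_columns,
                                bool is_parquet, bool use_vectorized_load,
                                bool table_column_allows_null) {
    if (expr_arg_columns.count(column_name) > 0 || !is_parquet || !use_vectorized_load) {
        // Read as VARCHAR and always nullable: the src slot holds what was read
        // from the file; NOT NULL is enforced later when filling the dest slot.
        return {SlotType::VARCHAR, true};
    }
    // Vectorized Parquet load: reuse the table column's type and nullability.
    return {SlotType::TABLE_TYPE, table_column_allows_null};
}
```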



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r867791905


##########
be/test/vec/utils/arrow_column_to_doris_column_test.cpp:
##########
@@ -0,0 +1,613 @@
+

Review Comment:
   No need for this blank line.




[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#issuecomment-1127677524

   PR approved by at least one committer and no changes requested.

