You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/05/07 08:40:00 UTC

[GitHub] [incubator-doris] morningman commented on a diff in pull request #9433: [feature-wip](parquet-vec) Support parquet scanner in vectorized engine

morningman commented on code in PR #9433:
URL: https://github.com/apache/incubator-doris/pull/9433#discussion_r867320783


##########
be/src/vec/exec/vparquet_scanner.h:
##########
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <unordered_map>
+
+#include <arrow/array.h>
+#include "common/status.h"
+#include <exec/parquet_scanner.h>
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+#include "runtime/mem_pool.h"
+#include "util/runtime_profile.h"
+
+namespace doris::vectorized {
+
+// VParquet scanner convert the data read from Parquet to doris's columns.
+class VParquetScanner : public ParquetScanner {
+public:
+    VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                    const TBrokerScanRangeParams& params,
+                    const std::vector<TBrokerRangeDesc>& ranges,
+                    const std::vector<TNetworkAddress>& broker_addresses,
+                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+
+    virtual ~VParquetScanner();
+
+    // Open this scanner, will initialize information need to
+    Status open();
+
+    Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof);
+
+private:
+    Status next_arrow_batch();

Review Comment:
   ```suggestion
       Status _next_arrow_batch();
   ```
   Same as other private method



##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -1044,26 +1047,52 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
         if (!needInitSlotAndAnalyzeExprs) {
             return;
         }
-
+        Set<String> exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
+            if (importColumnDesc.isColumn()) {
+                continue;
+            }
+            List<SlotRef> slots = Lists.newArrayList();
+            importColumnDesc.getExpr().collect(SlotRef.class, slots);
+            for (SlotRef slot : slots) {
+                String slotColumnName = slot.getColumnName();
+                exprArgsColumns.add(slotColumnName);
+            }
+        }
+        Set<String> excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         // init slot desc add expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
             String columnName = importColumnDesc.getColumnName();
-            String realColName = tbl.getColumn(columnName) == null ? columnName
+            Column tblColumn = tbl.getColumn(columnName);
+            String realColName =  tblColumn == null ? columnName
                     : tbl.getColumn(columnName).getName();
             if (importColumnDesc.getExpr() != null) {
                 Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
                 exprsByName.put(realColName, expr);
             } else {
                 SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                // only support parquet format now
+                if (exprArgsColumns.contains(columnName) || formatType != TFileFormatType.FORMAT_PARQUET
+                    || !useVectorizedLoad) {
+                    // columns in expr args should be parsed as varchar type
+                    slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                    slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+                    excludedColumns.add(realColName);
+                    // ISSUE A: src slot should be nullable even if the column is not nullable.
+                    // because src slot is what we read from file, not represent to real column value.
+                    // If column is not nullable, error will be thrown when filling the dest slot,
+                    // which is not nullable.
+                    slotDesc.setIsNullable(true);
+                } else {
+                    // in vectorized load,
+                    // columns from files like parquet files can be parsed as the type in table schema
+                    slotDesc.setType(tblColumn.getType());
+                    slotDesc.setColumn(new Column(realColName, tblColumn.getType()));
+                    // non-nullable column is allowed in vectorized load with parquet format
+                    slotDesc.setIsNullable(tblColumn.isAllowNull());

Review Comment:
   Even if this column does not have expr, it may still be nullable in parquet file?
   So I think this should be set to true, too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org