You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/10/14 02:46:53 UTC

[GitHub] [doris] morningman commented on a diff in pull request #13099: [improvement](spark-load) support parquet and orc file

morningman commented on code in PR #13099:
URL: https://github.com/apache/doris/pull/13099#discussion_r995288455


##########
fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java:
##########
@@ -620,13 +627,131 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
         if (fileGroup.columnsFromPath != null) {
             srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
         }
-        StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
-        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
+        StructType srcSchema = createSrcSchema(srcColumnsWithColumnsFromPath);
         int columnSize = dataSrcColumns.size();
         List<ColumnParser> parsers = new ArrayList<>();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
             parsers.add(ColumnParser.create(column));
         }
+
+        String fileFormat = fileGroup.fileFormat;
+        if (Strings.isNullOrEmpty(fileFormat)) {
+            fileFormat = "csv";
+        }
+
+        JavaRDD<Row> rowRDD = null;
+        switch (fileFormat) {
+            case "csv":
+                rowRDD = getRowsFromText(spark, fileGroup, fileUrl, baseIndex,
+                        columns, columnValueFromPath, dstColumnNameToIndex, srcSchema, columnSize, parsers);
+                break;
+            case "parquet":
+            case "orc":
+                rowRDD = getRowsFromParquetOrOrc(fileFormat, spark, fileGroup, fileUrl, baseIndex,
+                        columns, columnValueFromPath, dstColumnNameToIndex, srcSchema, columnSize, parsers);
+                break;
+            default:
+                throw new SparkDppException("Unsupport file format: " + fileFormat);
+        }
+
+        Dataset<Row> dataframe = spark.createDataFrame(rowRDD, srcSchema);
+        return dataframe;
+    }
+
+    private JavaRDD<Row> getRowsFromParquetOrOrc(String fileFormat, SparkSession spark, EtlFileGroup fileGroup,
+            String fileUrl,
+            EtlIndex baseIndex, List<EtlColumn> columns, List<String> columnValueFromPath,
+            Map<String, Integer> dstColumnNameToIndex, StructType srcSchema,
+            int columnSize, List<ColumnParser> parsers) throws SparkDppException {
+        JavaRDD<Row> rows = null;
+        if (fileFormat.equalsIgnoreCase("parquet")) {
+            rows = spark.read().parquet(fileUrl).toJavaRDD();
+        } else if (fileFormat.equalsIgnoreCase("orc")) {
+            rows = spark.read().orc(fileUrl).toJavaRDD();
+        } else {
+            throw new SparkDppException("Unknown file format: " + fileFormat);
+        }
+
+        JavaRDD<Row> rowRDD = rows.flatMap(
+                record -> {
+                    scannedRowsAcc.add(1);
+                    List<Row> result = new ArrayList<>();
+                    List<String> resStringCols = Lists.newArrayList();
+                    boolean validRow = true;
+                    if (record.length() != columnSize) {
+                        LOG.warn("invalid src schema, data columns:"
+                                + record.length() + ", file group columns:"
+                                + columnSize + ", row:" + record);
+                        validRow = false;
+                    } else {
+                        for (int i = 0; i < record.length(); ++i) {
+                            StructField field = srcSchema.apply(i);
+                            String srcColumnName = field.name();
+                            Object col = record.get(i);
+                            String strCol = null;
+                            if (col == null && dstColumnNameToIndex.containsKey(srcColumnName)) {
+                                if (!baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName)).isAllowNull) {
+                                    LOG.warn("column name:" + srcColumnName + ", attribute: " + i
+                                            + " can not be null. row:" + record);
+                                    validRow = false;
+                                    break;
+                                }
+                            } else {
+                                strCol = col.toString();

Review Comment:
   I don't know what is the result of this `to_string()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org