Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/21 00:25:14 UTC

[doris] branch branch-1.1-lts updated: [improvement](spark-load) support parquet and orc file (#13524)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 82f44db4e2 [improvement](spark-load) support parquet and orc file (#13524)
82f44db4e2 is described below

commit 82f44db4e20576d4545b9eaf2e50d00ffb9d475d
Author: liujinhui <96...@qq.com>
AuthorDate: Fri Oct 21 08:25:03 2022 +0800

    [improvement](spark-load) support parquet and orc file (#13524)
---
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 29 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index df79a7338e..5e0fbe2086 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -21,7 +21,7 @@ import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import com.google.common.base.Strings;
 import com.google.gson.Gson;
-
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
@@ -88,6 +88,7 @@ import scala.Tuple2;
 // 2. repartition data by using doris data model(partition and bucket)
 // 3. process aggregation if needed
 // 4. write data to parquet file
+
 public final class SparkDpp implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(SparkDpp.class);
 
@@ -219,7 +220,6 @@ public final class SparkDpp implements java.io.Serializable {
                         continue;
                     }
 
-
                     String curBucketKey = keyColumns.get(0).toString();
                     List<Object> columnObjects = new ArrayList<>();
                     for (int i = 1; i < keyColumns.size(); ++i) {
@@ -228,7 +228,6 @@ public final class SparkDpp implements java.io.Serializable {
                     for (int i = 0; i < valueColumns.length; ++i) {
                         columnObjects.add(sparkRDDAggregators[i].finalize(valueColumns[i]));
                     }
-
                     Row rowWithoutBucketKey = RowFactory.create(columnObjects.toArray());
                     // if the bucket key is new, it will belong to a new tablet
                     if (lastBucketKey == null || !curBucketKey.equals(lastBucketKey)) {
@@ -622,6 +621,30 @@ public final class SparkDpp implements java.io.Serializable {
         if (fileGroup.columnsFromPath != null) {
             srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
         }
+
+        if (fileGroup.fileFormat.equalsIgnoreCase("parquet")) {
+            // parquet has its own schema, so just use it; perhaps we could add some validation in the future.
+            Dataset<Row> dataFrame = spark.read().parquet(fileUrl);
+            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
+                for (int k = 0; k < columnValueFromPath.size(); k++) {
+                    dataFrame = dataFrame.withColumn(
+                        fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
+                }
+            }
+            return dataFrame;
+        }
+
+        if (fileGroup.fileFormat.equalsIgnoreCase("orc")) {
+            Dataset<Row> dataFrame = spark.read().orc(fileUrl);
+            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
+                for (int k = 0; k < columnValueFromPath.size(); k++) {
+                    dataFrame = dataFrame.withColumn(
+                        fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
+                }
+            }
+            return dataFrame;
+        }
+
         StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
         JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
         int columnSize = dataSrcColumns.size();
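
For readers following the patch, here is a minimal, self-contained sketch (not part of the commit) of the technique used by the new parquet and orc branches: let Spark's built-in readers supply the schema, then append the "columns from path" as literal columns via withColumn and functions.lit. The file URL, column names, and path-derived values below are hypothetical placeholders, not taken from the Doris codebase.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;

    public class ReadWithColumnsFromPathExample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("read-with-columns-from-path")
                    .master("local[*]")
                    .getOrCreate();

            // Hypothetical source file with partition-style values encoded in its path,
            // e.g. .../dt=2022-10-21/city=beijing/part-0.parquet
            String fileUrl = "hdfs://ns1/user/doris/data/dt=2022-10-21/city=beijing/part-0.parquet";
            List<String> columnsFromPath = Arrays.asList("dt", "city");
            List<String> columnValueFromPath = Arrays.asList("2022-10-21", "beijing");

            // Parquet (and orc, via spark.read().orc(fileUrl)) is self-describing,
            // so no explicit StructType has to be built as in the text branch.
            Dataset<Row> dataFrame = spark.read().parquet(fileUrl);

            // Append each path-derived column as a constant literal, mirroring the loop in the patch.
            for (int k = 0; k < columnValueFromPath.size(); k++) {
                dataFrame = dataFrame.withColumn(columnsFromPath.get(k),
                        functions.lit(columnValueFromPath.get(k)));
            }

            dataFrame.printSchema();
            dataFrame.show();
            spark.stop();
        }
    }

Because parquet and orc files carry their own schema, the patch returns the DataFrame directly for those formats and only falls through to createScrSchema and the textFile/JavaRDD path for plain text sources.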


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org