Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/20 00:59:27 UTC
[doris] branch master updated: [improvement](spark-load) support parquet and orc file (#13438)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 60d5e4dfce [improvement](spark-load) support parquet and orc file (#13438)
60d5e4dfce is described below
commit 60d5e4dfce1e0bb03023d632c8ded0b50e2be386
Author: liujinhui <96...@qq.com>
AuthorDate: Thu Oct 20 08:59:22 2022 +0800
[improvement](spark-load) support parquet and orc file (#13438)
Add support for parquet/orc in SparkDpp.java
Fix SparkDpp checkstyle issues
---
.../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 28 ++++++++++++++++++++--
1 file changed, 26 insertions(+), 2 deletions(-)
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 74357a9afe..d64c1080c8 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -22,6 +22,7 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import com.google.common.base.Strings;
import com.google.gson.Gson;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
@@ -75,7 +76,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-
// This class is a Spark-based data preprocessing program. It uses
// Spark's distributed compute framework to run the ETL, sort,
// and pre-aggregation jobs.
@@ -87,6 +87,7 @@ import java.util.Set;
// 2. repartition data by using doris data model (partition and bucket)
// 3. process aggregation if needed
// 4. write data to parquet file
+
public final class SparkDpp implements java.io.Serializable {
private static final Logger LOG = LoggerFactory.getLogger(SparkDpp.class);
@@ -212,7 +213,6 @@ public final class SparkDpp implements java.io.Serializable {
continue;
}
-
String curBucketKey = keyColumns.get(0).toString();
List<Object> columnObjects = new ArrayList<>();
for (int i = 1; i < keyColumns.size(); ++i) {
@@ -620,6 +620,30 @@ public final class SparkDpp implements java.io.Serializable {
if (fileGroup.columnsFromPath != null) {
srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
}
+
+ if (fileGroup.fileFormat.equalsIgnoreCase("parquet")) {
+ // parquet has its own schema, so just use it; we may add some validation in the future.
+ Dataset<Row> dataFrame = spark.read().parquet(fileUrl);
+ if (!CollectionUtils.isEmpty(columnValueFromPath)) {
+ for (int k = 0; k < columnValueFromPath.size(); k++) {
+ dataFrame = dataFrame.withColumn(
+ fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
+ }
+ }
+ return dataFrame;
+ }
+
+ if (fileGroup.fileFormat.equalsIgnoreCase("orc")) {
+ Dataset<Row> dataFrame = spark.read().orc(fileUrl);
+ if (!CollectionUtils.isEmpty(columnValueFromPath)) {
+ for (int k = 0; k < columnValueFromPath.size(); k++) {
+ dataFrame = dataFrame.withColumn(
+ fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
+ }
+ }
+ return dataFrame;
+ }
+
StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
int columnSize = dataSrcColumns.size();