You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/15 01:27:01 UTC

[incubator-doris] branch master updated: [SparkLoadk] Avoid to read whole hive table when we add a where (#5047)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 834834d  [SparkLoadk] Avoid to read whole hive table when we add a where (#5047)
834834d is described below

commit 834834dc44adc2e300ada3ad310b93ad3afab0b4
Author: Dam1029 <99...@qq.com>
AuthorDate: Tue Dec 15 09:26:42 2020 +0800

    [SparkLoadk] Avoid to read whole hive table when we add a where (#5047)
    
    When we use spark load from hive table, the function loadDataFromHiveTable
    will read whole hive table and then filter the data in process()
    if hive table have lots of partitions and history data,the load will be cost too much time and resource.
    So we can do filter work in loadDataFromHiveTable function when read from hive table.
    Co-authored-by: 杜安明 <an...@mihoyo.com>
---
 .../main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index e4c3a23..2af4de4 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -97,7 +97,6 @@ public final class SparkDpp implements java.io.Serializable {
     private SparkSession spark = null;
     private EtlJobConfig etlJobConfig = null;
     private LongAccumulator abnormalRowAcc = null;
-    private LongAccumulator unselectedRowAcc = null;
     private LongAccumulator scannedRowsAcc = null;
     private LongAccumulator fileNumberAcc = null;
     private LongAccumulator fileSizeAcc = null;
@@ -120,7 +119,6 @@ public final class SparkDpp implements java.io.Serializable {
 
     public void init() {
         abnormalRowAcc = spark.sparkContext().longAccumulator("abnormalRowAcc");
-        unselectedRowAcc = spark.sparkContext().longAccumulator("unselectedRowAcc");
         scannedRowsAcc = spark.sparkContext().longAccumulator("scannedRowsAcc");
         fileNumberAcc = spark.sparkContext().longAccumulator("fileNumberAcc");
         fileSizeAcc = spark.sparkContext().longAccumulator("fileSizeAcc");
@@ -854,6 +852,10 @@ public final class SparkDpp implements java.io.Serializable {
             sql.append(column.columnName).append(",");
         });
         sql.deleteCharAt(sql.length() - 1).append(" from ").append(hiveDbTableName);
+        if (!Strings.isNullOrEmpty(fileGroup.where)) {
+            sql.append(" where ").append(fileGroup.where);
+        }
+
         Dataset<Row> dataframe = spark.sql(sql.toString());
         dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
                     dstTableSchema);
@@ -988,12 +990,6 @@ public final class SparkDpp implements java.io.Serializable {
                         LOG.info("no data for file file group:" + fileGroup);
                         continue;
                     }
-                    if (!Strings.isNullOrEmpty(fileGroup.where)) {
-                        long originalSize = fileGroupDataframe.count();
-                        fileGroupDataframe = fileGroupDataframe.filter(fileGroup.where);
-                        long currentSize = fileGroupDataframe.count();
-                        unselectedRowAcc.add(currentSize - originalSize);
-                    }
 
                     JavaPairRDD<List<Object>, Object[]> ret = fillTupleWithPartitionColumn(
                             fileGroupDataframe,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org