Posted to commits@hudi.apache.org by "lamber-ken (Jira)" <ji...@apache.org> on 2020/01/04 23:32:00 UTC

[jira] [Comment Edited] (HUDI-494) [DEBUGGING] Huge amount of tasks when writing files into HDFS

    [ https://issues.apache.org/jira/browse/HUDI-494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008189#comment-17008189 ] 

lamber-ken edited comment on HUDI-494 at 1/4/20 11:31 PM:
----------------------------------------------------------

hi [~garyli1019], I guess your dataset contains many partitions; each distinct partition path value fans out into its own write tasks and files, so you can check the *PARTITIONPATH_FIELD_OPT_KEY* option (see the sketch just below).
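
To illustrate the idea, a minimal sketch (not a verified fix): pointing the partition path option at a low-cardinality field keeps the fan-out small. {{df}}, {{tableName}} and {{basePath}} are reused from the repro steps below, and "age" only stands in for whatever coarse field fits the real dataset:
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode

// Every distinct value of the partition path field becomes a storage
// partition; a coarse field such as "age" keeps the partition count low.
df.write.format("org.apache.hudi").
    option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "age").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    mode(SaveMode.Append).
    save(basePath)
{code}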

hi [~vinoth], WDYT?

 

*Reproduce steps*

1, set up hudi

 
{code:java}
${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
 

2, insert data
{code:java}
import org.apache.spark.sql.SaveMode._

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"
var datas = List("""{ "name": "kenken", "ts": 1574297893836, "age": "12a", "location": "latitude"}""")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    mode(Overwrite).
    save(basePath)

var datas = List.tabulate(30000)(i => s"""{ "name": "kenken", "ts": "zasz", "age": 123, "location": "latitude${i}"}""")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", "hudi_mor_table").
    mode(Append).
    save("file:///tmp/hudi_mor_table")
{code}
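The second write above is what fans out. A quick sanity check (a sketch, run in the same spark-shell) is to count the distinct partition path values in the input; each one becomes a separate storage partition:
{code:java}
// 30000 distinct "location" values => 30000 partition paths, which is
// where the task and file explosion comes from
val partitionCount = df.select("location").distinct.count
println(s"distinct partition paths: $partitionCount")
{code}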
3, many tasks && temp files are generated under
{code:java}
/tmp/hudi_mor_table/.hoodie/.temp/20200105071257/*
{code}
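To put a number on the temp-file blowup, a small sketch (paths assumed from the steps above; the instant directory name differs per run):
{code:java}
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// count everything Hudi left under .hoodie/.temp for this commit
val tempRoot = Paths.get("/tmp/hudi_mor_table/.hoodie/.temp")
val tempFiles = Files.walk(tempRoot).iterator().asScala.count(Files.isRegularFile(_))
println(s"temp files: $tempFiles")
{code}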
4, check the Spark UI

!image-2020-01-05-07-30-53-567.png!  

 


> [DEBUGGING] Huge amount of tasks when writing files into HDFS
> -------------------------------------------------------------
>
>                 Key: HUDI-494
>                 URL: https://issues.apache.org/jira/browse/HUDI-494
>             Project: Apache Hudi (incubating)
>          Issue Type: Test
>            Reporter: Yanjia Gary Li
>            Assignee: Vinoth Chandar
>            Priority: Major
>         Attachments: Screen Shot 2020-01-02 at 8.53.24 PM.png, Screen Shot 2020-01-02 at 8.53.44 PM.png, image-2020-01-05-07-30-53-567.png
>
>
> I am using a manually built master after commit [https://github.com/apache/incubator-hudi/commit/36b3b6f5dd913d3f1c9aa116aff8daf6540fed65]. EDIT: tried the latest master and got the same result.
> I am seeing 3 million tasks when the Hudi Spark job writes files into HDFS. It seems related to the input size: 7.7 GB of input produced 3.2 million tasks, and 9 GB produced 3.7 million, both with parallelism set to 10.
> I am seeing a huge number of 0-byte files being written into the .hoodie/.temp/ folder in my HDFS. In the Spark UI, each task writes fewer than 10 records in
> {code:java}
> count at HoodieSparkSqlWriter{code}
> All the stages before this one look normal. Any idea what happened here? My first guess would be something related to the bloom filter index. Maybe something triggers a repartition during the bloom filter index lookup? But I am not really familiar with that part of the code.
> Thanks
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)