Posted to commits@hudi.apache.org by "Lujun-WC (via GitHub)" <gi...@apache.org> on 2023/04/07 03:35:52 UTC

[GitHub] [hudi] Lujun-WC commented on issue #8391: [SUPPORT]hudi-0.13 Using spark to write into Hudi is too slow

Lujun-WC commented on issue #8391:
URL: https://github.com/apache/hudi/issues/8391#issuecomment-1499897104

   
    // Imports assumed for the snippet below (Spark 3.3 / Hudi 0.13):
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.common.model.DefaultHoodieRecordPayload
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
    import org.apache.spark.sql.streaming.Trigger

    object DwdFoo6Order {
      // topic, queryName, checkpointLocation, hudiTable, basePath and the
      // HudiTable / HudiWriteOpts constants are defined elsewhere in our code.
      def main(args: Array[String]): Unit = {
       val spark = SparkSession
         .builder()
         .config("spark.debug.maxToStringFields", "500")
         .config("spark.sql.debug.maxToStringFields", "500")
         .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
         .config("hive.exec.dynamic.partition", true)
         .config("hive.exec.dynamic.partition.mode", "nonstrict")
         .enableHiveSupport()
         .getOrCreate()
   
   
       import spark.implicits._
       val foo6KafkaDF: Dataset[Row] = spark
         .readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "10.10.10.1:9092")
         .option("subscribe", topic)
         .option("startingOffsets", "latest")
         .option("maxOffsetsPerTrigger", 1500000L)
         .option("failOnDataLoss", value = false)
         .load()
   
   
       val query = foo6KafkaDF
         .writeStream
         .queryName(queryName)
         .option("checkpointLocation", checkpointLocation)
         .trigger(Trigger.ProcessingTime(s"300 seconds"))
         .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
   
            // df processing
            val df = ...

            df.write.format("org.apache.hudi")
              .option(TBL_NAME.key(), hudiTable)
              .option(TABLE_TYPE.key(), HudiTable.COW)
              .option(OPERATION.key(), HudiWriteOpts.UPSERT)
              .option(RECORDKEY_FIELD.key(), "order_id")
              .option(PRECOMBINE_FIELD.key(), "sort_key")
              .option(PARTITIONPATH_FIELD.key(), "cdt,data_source")
              .option(WRITE_PAYLOAD_CLASS_NAME.key(), classOf[DefaultHoodieRecordPayload].getName)
              .option("hoodie.write.markers.type", "direct")
              .option("hoodie.index.type", "BLOOM")
              .option("hoodie.datasource.write.hive_style_partitioning", "true")
              .option("hoodie.insert.shuffle.parallelism", "150")
              .option("hoodie.bulkinsert.shuffle.parallelism", "150")
              .option("hoodie.upsert.shuffle.parallelism", "150")
              .option("hoodie.delete.shuffle.parallelism", "150")
              .mode(SaveMode.Append).save(basePath)
   
           batchDF.unpersist()
           ()
         }.start()
   
       query.awaitTermination()
     }
   
   }
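
    (Side note on measuring the slowness: a small timing wrapper around the per-batch Hudi write makes the per-micro-batch cost visible in the driver log. Sketch only; timedHudiWrite is a hypothetical helper, not part of the job above.)

    import org.apache.spark.sql.DataFrame

    // Sketch: time one micro-batch's Hudi write and log it on the driver.
    // `writeToHudi` stands in for the df.write ... save(basePath) call above;
    // the helper name is hypothetical, not part of the original job.
    def timedHudiWrite(df: DataFrame, batchId: Long)(writeToHudi: DataFrame => Unit): Unit = {
      val start = System.nanoTime()
      writeToHudi(df)
      val seconds = (System.nanoTime() - start) / 1e9
      println(f"batch $batchId: Hudi write took $seconds%.1f s")
    }

    Inside foreachBatch this would be called as timedHudiWrite(df, batchId)(d => d.write.format("org.apache.hudi") ... .save(basePath)).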
   
   
    spark-submit:
    spark-submit --class com.example.DwdFoo6Order \
   --master yarn --deploy-mode cluster \
   --num-executors 4 --executor-cores 2 --executor-memory 8G \
   --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.kafka:kafka-clients:2.8.0,com.alibaba:druid:1.2.6,mysql:mysql-connector-java:5.1.34 \
   ./hudi-test.jar
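
    (The Hudi classes are presumably shaded into ./hudi-test.jar or already on the cluster classpath; if they were instead pulled in via --packages, a minimal sketch for Spark 3.3 / Scala 2.12 with Hudi 0.13.0 would look like the following, with the Kryo serializer setting taken from the Hudi docs.)

    spark-submit --class com.example.DwdFoo6Order \
    --master yarn --deploy-mode cluster \
    --num-executors 4 --executor-cores 2 --executor-memory 8G \
    --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    ./hudi-test.jar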
   
   
   
    Existing row counts per partition:
   cdt                   data_source  count(1)
   2023-03-16      foo6_standard   2
   2023-03-17      foo6_standard   2
   2023-03-18      foo6_standard   3
   2023-03-19      foo6_standard   3
   2023-03-20      foo6_standard   1
   2023-03-23      foo6_standard   3
   2023-03-24      foo6_standard   2
   2023-03-25      foo6_standard   1
   2023-03-26      foo6_standard   12
   2023-03-27      foo6_standard   12
   2023-03-28      foo6_standard   41
   2023-03-29      foo6_standard   70
   2023-03-30      foo6_standard   88
   2023-03-31      foo6_standard   301
   2023-04-01      foo6_standard   1613
   2023-04-02      foo6_standard   1828818
   2023-04-03      foo6_standard   1567815
   2023-04-04      foo6_standard   2269541
   2023-04-05      foo6_standard   2884449
   2023-04-06      foo6_standard   1933243
   2023-04-07      foo6_standard   304502
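
    (For reference, the per-partition counts above can be reproduced with a simple group-by over the Hudi table; a sketch, assuming the table is read back from the same basePath with the same spark session as in the job above:)

    // Sketch: reproduce the per-partition counts by reading the Hudi table
    // back from basePath (same spark session and basePath as above).
    spark.read.format("org.apache.hudi").load(basePath)
      .groupBy("cdt", "data_source")
      .count()
      .orderBy("cdt")
      .show(100, truncate = false)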
   
    The RECORDKEY_FIELD is order_id, an auto-incrementing long ID.
    cdt is the order's creation date and never changes for a given order_id.
    Each micro-batch contains fewer than 100,000 records; roughly 90% of them fall into the current day's partition, while the remaining 10% are updates to older partitions.
   
   The same data takes just 0.1 minutes to write to Hive, but writing to Hudi is unexpectedly slow. What could be the reason for this?

