Posted to commits@hudi.apache.org by "Lujun-WC (via GitHub)" <gi...@apache.org> on 2023/04/07 03:35:52 UTC
[GitHub] [hudi] Lujun-WC commented on issue #8391: [SUPPORT]hudi-0.13 Using spark to write into Hudi is too slow
Lujun-WC commented on issue #8391:
URL: https://github.com/apache/hudi/issues/8391#issuecomment-1499897104
// Imports inferred from the identifiers used below; the original comment did not include them.
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.config.HoodieWriteConfig.{TBL_NAME, WRITE_PAYLOAD_CLASS_NAME}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.Trigger

// topic, queryName, checkpointLocation, hudiTable, basePath, HudiTable and HudiWriteOpts
// are defined elsewhere in the project and were not shown in the comment.
object DwdFoo6Order {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .config("spark.debug.maxToStringFields", "500")
      .config("spark.sql.debug.maxToStringFields", "500")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("hive.exec.dynamic.partition", true)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Read from Kafka, at most 1,500,000 offsets per micro-batch.
    val foo6KafkaDF: Dataset[Row] = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.10.10.1:9092")
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      .option("maxOffsetsPerTrigger", 1500000L)
      .option("failOnDataLoss", value = false)
      .load()

    // Every 300 seconds, upsert the micro-batch into a copy-on-write Hudi table.
    val query = foo6KafkaDF
      .writeStream
      .queryName(queryName)
      .option("checkpointLocation", checkpointLocation)
      .trigger(Trigger.ProcessingTime("300 seconds"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        //df processing
        val df = ...
        df.write.format("org.apache.hudi")
          .option(TBL_NAME.key(), hudiTable)
          .option(TABLE_TYPE.key(), HudiTable.COW)
          .option(OPERATION.key(), HudiWriteOpts.UPSERT)
          .option(RECORDKEY_FIELD.key(), "order_id")
          .option(PRECOMBINE_FIELD.key(), "sort_key")
          .option(PARTITIONPATH_FIELD.key(), "cdt,data_source")
          .option(WRITE_PAYLOAD_CLASS_NAME.key(), classOf[DefaultHoodieRecordPayload].getName)
          .option("hoodie.write.markers.type", "direct")
          .option("hoodie.index.type", "BLOOM")
          .option("hoodie.datasource.write.hive_style_partitioning", "true")
          .option("hoodie.insert.shuffle.parallelism", "150")
          .option("hoodie.bulkinsert.shuffle.parallelism", "150")
          .option("hoodie.upsert.shuffle.parallelism", "150")
          .option("hoodie.delete.shuffle.parallelism", "150")
          .mode(SaveMode.Append).save(basePath)
        // Has an effect only if batchDF was persisted during the elided processing step.
        batchDF.unpersist()
        ()
      }.start()
    query.awaitTermination()
  }
}
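To see how much of each 300-second trigger is spent in the Hudi write itself, the write call could be wrapped in a small timing helper. The following is only a sketch: `BatchTiming` and `timed` are hypothetical names that are not part of the original job, and the summation in `main` is a stand-in for the `df.write...save(basePath)` call.

```scala
object BatchTiming {
  // Hypothetical helper (not part of the original job): runs `block` and
  // returns its result together with the elapsed wall-clock milliseconds.
  def timed[A](block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    (result, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload; inside foreachBatch this would wrap the Hudi write.
    val (sum, ms) = timed { (1 to 1000).sum }
    println(s"result=$sum elapsedMs=$ms")
  }
}
```

Logging the elapsed time together with `batchId` would make it easier to compare batches against the stage timings in the Spark UI.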
spark submit:
spark-submit --class com.example.DwdFoo6Order \
  --master yarn --deploy-mode cluster \
  --num-executors 4 --executor-cores 2 --executor-memory 8G \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.kafka:kafka-clients:2.8.0,com.alibaba:druid:1.2.6,mysql:mysql-connector-java:5.1.34 \
  ./hudi-test.jar
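As a rough sanity check on these settings, the arithmetic below relates the submitted resources to the shuffle parallelism of 150 configured in the job. It is a sketch that assumes one CPU per task (the Spark default for `spark.task.cpus`) and that a stage really runs 150 tasks; `TaskWaves` and `waves` are illustrative names. Whether this contributes to the slowness is not established by anything in this comment.

```scala
object TaskWaves {
  // Number of sequential "waves" needed when `tasks` tasks share `slots` task slots.
  def waves(tasks: Int, slots: Int): Int = (tasks + slots - 1) / slots

  def main(args: Array[String]): Unit = {
    val numExecutors = 4          // --num-executors
    val executorCores = 2         // --executor-cores
    val shuffleParallelism = 150  // hoodie.upsert.shuffle.parallelism in the job
    val slots = numExecutors * executorCores // assumes spark.task.cpus = 1
    println(s"concurrent task slots: $slots")
    println(s"waves per $shuffleParallelism-task stage: ${waves(shuffleParallelism, slots)}")
  }
}
```

With 4 executors of 2 cores each there are 8 task slots, so a 150-task stage would run in 19 waves.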
existing data size:
cdt         data_source    count(1)
2023-03-16  foo6_standard  2
2023-03-17  foo6_standard  2
2023-03-18  foo6_standard  3
2023-03-19  foo6_standard  3
2023-03-20  foo6_standard  1
2023-03-23  foo6_standard  3
2023-03-24  foo6_standard  2
2023-03-25  foo6_standard  1
2023-03-26  foo6_standard  12
2023-03-27  foo6_standard  12
2023-03-28  foo6_standard  41
2023-03-29  foo6_standard  70
2023-03-30  foo6_standard  88
2023-03-31  foo6_standard  301
2023-04-01  foo6_standard  1613
2023-04-02  foo6_standard  1828818
2023-04-03  foo6_standard  1567815
2023-04-04  foo6_standard  2269541
2023-04-05  foo6_standard  2884449
2023-04-06  foo6_standard  1933243
2023-04-07  foo6_standard  304502
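To put the listing in perspective, the counts can be summed directly. The sketch below copies the figures from the listing; `PartitionCounts` and its members are illustrative names, and `data_source` is omitted because it is `foo6_standard` in every row.

```scala
object PartitionCounts {
  // (cdt, count(1)) pairs copied from the listing above.
  val counts: Seq[(String, Long)] = Seq(
    "2023-03-16" -> 2L, "2023-03-17" -> 2L, "2023-03-18" -> 3L,
    "2023-03-19" -> 3L, "2023-03-20" -> 1L, "2023-03-23" -> 3L,
    "2023-03-24" -> 2L, "2023-03-25" -> 1L, "2023-03-26" -> 12L,
    "2023-03-27" -> 12L, "2023-03-28" -> 41L, "2023-03-29" -> 70L,
    "2023-03-30" -> 88L, "2023-03-31" -> 301L, "2023-04-01" -> 1613L,
    "2023-04-02" -> 1828818L, "2023-04-03" -> 1567815L, "2023-04-04" -> 2269541L,
    "2023-04-05" -> 2884449L, "2023-04-06" -> 1933243L, "2023-04-07" -> 304502L
  )

  // Total number of records across all partitions.
  def total: Long = counts.map(_._2).sum

  // Records in partitions whose cdt is on or after `day` (ISO dates sort lexicographically).
  def totalSince(day: String): Long = counts.filter(_._1 >= day).map(_._2).sum

  def main(args: Array[String]): Unit = {
    println(s"partitions: ${counts.size}")
    println(s"total records: $total")
    println(s"records since 2023-04-02: ${totalSince("2023-04-02")}")
  }
}
```

The table holds 10,790,522 records in 21 partitions, and more than 99.9% of them sit in the six partitions from 2023-04-02 onward.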
The RECORDKEY_FIELD is order_id, an auto-incrementing ID of type long.
The cdt field is the creation time of the order, and it never changes for a given order_id.
Each batch contains fewer than 100,000 records: about 90% belong to the current day's partition, and the remaining 10% update older partitions.
Writing the same data to Hive takes just 0.1 minutes, but writing to Hudi is unexpectedly slow. What could be the reason?