Posted to users@hudi.apache.org by vtygoss <vt...@126.com> on 2022/09/01 08:31:50 UTC

Driver stuck for over an hour when transferring multiple tables using a thread pool

Hi, community!


I ran into a problem when transferring several tables with Spark using an ExecutorService. The code logic looks like this:


```
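import java.util.concurrent.Executors

val kafkaCDCStream = ...
// Fixed pool of 20 worker threads shared by all tables.
val executorService = Executors.newFixedThreadPool(20)

for (tableName <- tables) {
  // One task per table; the work goes in run() so it executes on a pool thread.
  executorService.submit(new Runnable {
    override def run(): Unit = {
      // Keep only the CDC records that belong to this table.
      val rdd = kafkaCDCStream.filter(_.table == tableName)
      rdd.filter(....).upsert(hudi)
      ....
    }
  })
}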
```


Over the last two days, the driver of the Spark application got stuck twice. We took a thread dump of the driver; the original jstack file is in the attachment.


I found that all 20 threads (pool-20-thread-1 ~ pool-20-thread-20) in the thread pool were at the same stack, shown below, and I noticed that:


1. The SimpleDateFormat class is not thread-safe.
2. Each worker thread in the pool creates a new instant for a different table, but all of them go through one GLOBAL SimpleDateFormat instance, as in the picture below.
3. All I know is that SimpleDateFormat is not thread-safe and that sharing it across threads can produce unintended results.
4. In newer versions of Hudi, SimpleDateFormat has been replaced with DateTimeFormatter, which is thread-safe.
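
To make point 4 concrete, here is a minimal Java sketch of many pool threads formatting timestamps through one shared DateTimeFormatter, which the JDK documents as immutable and thread-safe. This is not Hudi's code: the class name, the method names, and the "yyyyMMddHHmmss" pattern are assumptions made only for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedFormatterSketch {
    // Hypothetical pattern, chosen only for illustration.
    static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // DateTimeFormatter is immutable, so one shared instance can be used from any thread.
    static String formatInstant(LocalDateTime time) {
        return FORMATTER.format(time);
    }

    // Formats the same timestamp from many pool threads and collects the results.
    static List<String> formatConcurrently(LocalDateTime time, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(20);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                Callable<String> task = () -> formatInstant(time);
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        LocalDateTime fixed = LocalDateTime.of(2022, 9, 1, 8, 31, 50);
        List<String> results = formatConcurrently(fixed, 1000);
        // With a thread-safe formatter every task should yield the same string.
        System.out.println(results.stream().distinct().count() + " distinct value(s)");
    }
}
```

If upgrading is not possible, the same structure should also work with SimpleDateFormat as long as each thread gets its own instance (for example through a ThreadLocal) instead of sharing a global one.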




So, I have these questions:


1. Besides jstack, are there any other ways to diagnose or solve this stuck-driver problem?
2. Based on the jstack file, what are the possible causes?
3. If SimpleDateFormat can make the JVM get stuck when used from multiple threads, what is the mechanism behind it?


Thanks in advance for any replies.


Best Regards! 




```
"pool-20-thread-20" #676 prio=5 os_prio=0 tid=0x00007f893c128000 nid=0x1998 runnable [0x00007f871f0ed000]
   java.lang.Thread.State: RUNNABLE
    at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
    at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
    at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
    at java.text.DateFormat.format(DateFormat.java:345)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$createNewInstantTime$0(HoodieActiveTimeline.java:92)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline$$Lambda$111/537139208.apply(Unknown Source)
    at java.util.concurrent.atomic.AtomicReference.updateAndGet(AtomicReference.java:179)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createNewInstantTime(HoodieActiveTimeline.java:89)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createNewInstantTime(HoodieActiveTimeline.java:81)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:108)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    - locked <0x00000004da62beb0> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    - locked <0x00000004da6251e8> (a org.apache.spark.sql.execution.QueryExecution)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
```