Posted to issues@spark.apache.org by "Sujith Chacko (JIRA)" <ji...@apache.org> on 2019/03/05 15:50:00 UTC

[jira] [Comment Edited] (SPARK-27036) Even Broadcast thread is timed out, BroadCast Job is not aborted.

    [ https://issues.apache.org/jira/browse/SPARK-27036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782840#comment-16782840 ] 

Sujith Chacko edited comment on SPARK-27036 at 3/5/19 3:49 PM:
---------------------------------------------------------------

The problem area seems to be BroadcastExchangeExec on the driver, where a job is fired as part of a Future and the collected data is then broadcast.

The main problem is that the system submits the job and its stages/tasks through the DAGScheduler, whose scheduler thread schedules the corresponding events. When the Future in BroadcastExchangeExec times out, the timeout exception is thrown, but the jobs/tasks that the DAGScheduler already scheduled for the action called inside the Future are not cancelled. I think we should cancel the respective job so it does not keep running in the background after the Future has timed out; this terminates the job promptly when the TimeoutException happens and saves the resources that would otherwise still be consumed after the driver has thrown the exception.
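
A rough sketch of the idea (illustrative only, not the actual BroadcastExchangeExec code; the job-group id and the slow example action are made up): run the collect inside a job group on the Future's thread, and cancel that job group when the Future times out.

{code}
import java.util.UUID
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Assumes an active SparkSession named `spark` (e.g. in spark-shell).
val sc = spark.sparkContext
val jobGroupId = s"broadcast-exchange-${UUID.randomUUID()}"

val relationFuture = Future {
  // setJobGroup is thread-local, so it must be called on the Future's thread;
  // every job submitted from this thread then belongs to the group.
  sc.setJobGroup(jobGroupId, "broadcast exchange", interruptOnCancel = true)
  spark.range(100000000L).collect() // stands in for the slow broadcast collect
}

try {
  Await.result(relationFuture, 2.seconds)
} catch {
  case e: TimeoutException =>
    // Proposed behaviour: cancel the still-running job instead of letting it
    // run to completion in the background after the timeout.
    sc.cancelJobGroup(jobGroupId)
    throw e
}
{code}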

I would like to take an attempt at handling this issue. Any comments or suggestions are welcome.

cc [~blue@cloudera.com] [~hvanhovell] [~srowen]


> Even Broadcast thread is timed out, BroadCast Job is not aborted.
> -----------------------------------------------------------------
>
>                 Key: SPARK-27036
>                 URL: https://issues.apache.org/jira/browse/SPARK-27036
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2
>            Reporter: Babulal
>            Priority: Minor
>         Attachments: image-2019-03-04-00-38-52-401.png, image-2019-03-04-00-39-12-210.png, image-2019-03-04-00-39-38-779.png
>
>
> During execution of the broadcast-table job, if the broadcast timeout (spark.sql.broadcastTimeout) is hit, the broadcast job still continues to completion, whereas it should abort on the broadcast timeout.
> The exception is thrown on the console, but the Spark job still continues.
>  
> !image-2019-03-04-00-39-38-779.png!
> !image-2019-03-04-00-39-12-210.png!
>  
>  After waiting for some time:
> !image-2019-03-04-00-38-52-401.png!
> !image-2019-03-04-00-34-47-884.png!
>  
> How to reproduce the issue
> Option 1, using SQL:
>  Create table t1 (big table, 1M records):
>  val rdd1=spark.sparkContext.parallelize(1 to 1000000,100).map(x=> ("name_"+x,x%3,x))
>  val df=rdd1.toDF.selectExpr("_1 as name","_2 as age","_3 as sal","_1 as c1","_1 as c2","_1 as c3","_1 as c4","_1 as c5","_1 as c6","_1 as c7","_1 as c8","_1 as c9","_1 as c10","_1 as c11","_1 as c12","_1 as c13","_1 as c14","_1 as c15","_1 as c16","_1 as c17","_1 as c18","_1 as c19","_1 as c20","_1 as c21","_1 as c22","_1 as c23","_1 as c24","_1 as c25","_1 as c26","_1 as c27","_1 as c28","_1 as c29","_1 as c30")
>  df.write.csv("D:/data/par1/t4");
>  spark.sql("create table csv_2 using csv options('path'='D:/data/par1/t4')");
> Create table t2 (small table, 100K records):
>  val rdd1=spark.sparkContext.parallelize(1 to 100000,100).map(x=> ("name_"+x,x%3,x))
>  val df=rdd1.toDF.selectExpr("_1 as name","_2 as age","_3 as sal","_1 as c1","_1 as c2","_1 as c3","_1 as c4","_1 as c5","_1 as c6","_1 as c7","_1 as c8","_1 as c9","_1 as c10","_1 as c11","_1 as c12","_1 as c13","_1 as c14","_1 as c15","_1 as c16","_1 as c17","_1 as c18","_1 as c19","_1 as c20","_1 as c21","_1 as c22","_1 as c23","_1 as c24","_1 as c25","_1 as c26","_1 as c27","_1 as c28","_1 as c29","_1 as c30")
>  df.write.csv("D:/data/par1/t5");
>  spark.sql("create table csv_1 using csv options('path'='D:/data/par1/t5')");
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=73400320").show(false)
>  spark.sql("set spark.sql.broadcastTimeout=2").show(false)
>  Run the query below:
>  spark.sql("create table s using parquet as select t1.* from csv_2 as t1,csv_1 as t2 where t1._c3=t2._c3")
> Option 2: Use an external DataSource, add a delay in its #buildScan, and use that datasource in the query; a sketch follows.
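> A minimal sketch of Option 2, assuming the Spark 2.x DataSource V1 API (the class names SlowScanProvider and SlowRelation, the sleep duration, and the single-column schema are made up for illustration): a relation whose buildScan sleeps past spark.sql.broadcastTimeout, so the broadcast Future times out while the scan job is still running.
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.{Row, SQLContext}
> import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
> import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
> class SlowScanProvider extends RelationProvider {
>   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
>     new SlowRelation(sqlContext)
> }
> class SlowRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
>   override def schema: StructType = StructType(Seq(StructField("_c3", IntegerType)))
>   override def buildScan(): RDD[Row] = {
>     Thread.sleep(60000) // delay well past spark.sql.broadcastTimeout
>     sqlContext.sparkContext.parallelize(1 to 100).map(Row(_))
>   }
> }
> Put the classes on the classpath, load the relation with spark.read.format("SlowScanProvider").load(), and use it as the small (broadcast) side of the join.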


