Posted to issues@spark.apache.org by "Xiao Li (JIRA)" <ji...@apache.org> on 2019/05/15 21:49:00 UTC
[jira] [Resolved] (SPARK-27036) Even Broadcast thread is timed out,
BroadCast Job is not aborted.
[ https://issues.apache.org/jira/browse/SPARK-27036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li resolved SPARK-27036.
-----------------------------
Resolution: Fixed
Assignee: Xingbo Jiang
Fix Version/s: 3.0.0
> Even Broadcast thread is timed out, BroadCast Job is not aborted.
> -----------------------------------------------------------------
>
> Key: SPARK-27036
> URL: https://issues.apache.org/jira/browse/SPARK-27036
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.2
> Reporter: Babulal
> Assignee: Xingbo Jiang
> Priority: Minor
> Fix For: 3.0.0
>
> Attachments: image-2019-03-04-00-38-52-401.png, image-2019-03-04-00-39-12-210.png, image-2019-03-04-00-39-38-779.png
>
>
> While a broadcast join is executing, if the broadcast timeout (spark.sql.broadcastTimeout) expires, the broadcast job still continues to completion, whereas it should be aborted on timeout.
> A TimeoutException is printed to the console, but the Spark job keeps running.
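> The core of the behavior can be illustrated outside Spark: timing out a wait on a future does not stop the task backing it. A minimal standalone Scala sketch (not Spark's actual broadcast code, just an analogy):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// A long-running task standing in for the broadcast-build job.
val job = Future { Thread.sleep(5000); "broadcast data" }

try {
  // The wait gives up after 2 seconds and throws a TimeoutException...
  Await.result(job, 2.seconds)
} catch {
  case _: java.util.concurrent.TimeoutException =>
    // ...but the future's underlying thread is still running.
    println("timed out, but the underlying task keeps running")
}
```

> This mirrors the reported symptom: the caller sees the timeout exception, while the broadcast job itself is never cancelled.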
>
> !image-2019-03-04-00-39-38-779.png!
> !image-2019-03-04-00-39-12-210.png!
>
> wait for some time
> !image-2019-03-04-00-38-52-401.png!
> !image-2019-03-04-00-34-47-884.png!
>
> How to Reproduce the Issue
> Option 1, using SQL:
> Create table t1 (big table, 1M records):
> val rdd1=spark.sparkContext.parallelize(1 to 1000000,100).map(x=> ("name_"+x,x%3,x))
> val df=rdd1.toDF.selectExpr("_1 as name","_2 as age","_3 as sal","_1 as c1","_1 as c2","_1 as c3","_1 as c4","_1 as c5","_1 as c6","_1 as c7","_1 as c8","_1 as c9","_1 as c10","_1 as c11","_1 as c12","_1 as c13","_1 as c14","_1 as c15","_1 as c16","_1 as c17","_1 as c18","_1 as c19","_1 as c20","_1 as c21","_1 as c22","_1 as c23","_1 as c24","_1 as c25","_1 as c26","_1 as c27","_1 as c28","_1 as c29","_1 as c30")
> df.write.csv("D:/data/par1/t4");
> spark.sql("create table csv_2 using csv options('path'='D:/data/par1/t4')");
> Create table t2 (small table, 100K records):
> val rdd1=spark.sparkContext.parallelize(1 to 100000,100).map(x=> ("name_"+x,x%3,x))
> val df=rdd1.toDF.selectExpr("_1 as name","_2 as age","_3 as sal","_1 as c1","_1 as c2","_1 as c3","_1 as c4","_1 as c5","_1 as c6","_1 as c7","_1 as c8","_1 as c9","_1 as c10","_1 as c11","_1 as c12","_1 as c13","_1 as c14","_1 as c15","_1 as c16","_1 as c17","_1 as c18","_1 as c19","_1 as c20","_1 as c21","_1 as c22","_1 as c23","_1 as c24","_1 as c25","_1 as c26","_1 as c27","_1 as c28","_1 as c29","_1 as c30")
> df.write.csv("D:/data/par1/t5");
> spark.sql("create table csv_1 using csv options('path'='D:/data/par1/t5')");
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=73400320").show(false)
> spark.sql("set spark.sql.broadcastTimeout=2").show(false)
> Run the query below:
> spark.sql("create table s using parquet as select t1.* from csv_2 as t1,csv_1 as t2 where t1._c3=t2._c3")
> Option 2: implement an external data source, add a delay in its #buildScan method, and run the query against that data source.
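> The expected behavior, aborting the work when the wait times out, can be sketched in plain Scala with java.util.concurrent (a hypothetical illustration of cancellation on timeout, not the actual fix that shipped in 3.0.0):

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}

val pool = Executors.newSingleThreadExecutor()

// Stand-in for the broadcast-build work; responds to interruption.
val task = pool.submit(new Runnable {
  def run(): Unit =
    try Thread.sleep(5000)
    catch { case _: InterruptedException => println("work interrupted") }
})

try task.get(2, TimeUnit.SECONDS)
catch {
  case _: TimeoutException =>
    // Interrupt the worker so the underlying work actually stops,
    // instead of only surfacing the timeout to the caller.
    task.cancel(true)
}
pool.shutdown()
```

> Here the timeout both raises the exception and cancels the running task, which is the behavior the issue asks for from the broadcast job.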
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org