You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Jeremy Davis <je...@speakeasy.net> on 2016/10/22 21:14:05 UTC
Ran in to a bug in Broadcast Hash Join
Hello, I ran in to a bug with Broadcast Hash Join in Spark 2.0. (Running on EMR)
If I just toggle spark.sql.autoBroadcastJoinThreshold=-1 then the join works, if I leave it as default it does not work.
When it doesn’t work, then one of my joined columns is filled with very small Doubles.
I’m joining two small tables: (datetime,spx) and (datetime,vix)
Attached are the plans and debug.
================================================================
The (Default) Broken case:
+-------------+-----------+
| datetime| spx|
+-------------+-----------+
|1476907200000|2144.290039|
|1476820800000|2139.600098|
|1476734400000| 2126.5|
|1476475200000| 2132.97998|
|1476388800000|2132.550049|
|1476302400000|2139.179932|
|1476216000000| 2136.72998|
|1476129600000|2163.659912|
|1475870400000| 2153.73999|
|1475784000000| 2160.77002|
|1475697600000| 2159.72998|
|1475611200000| 2150.48999|
|1475524800000|2161.199951|
|1475265600000| 2168.27002|
|1475179200000|2151.129883|
|1475092800000|2171.370117|
|1475006400000|2159.929932|
|1474920000000|2146.100098|
|1474660800000|2164.689941|
|1474574400000|2177.179932|
+-------------+-----------+
only showing top 20 rows
+-------------+---------+
| datetime| vix|
+-------------+---------+
|1476907200000| 14.41|
|1476820800000| 15.28|
|1476734400000|16.209999|
|1476475200000|16.120001|
|1476388800000|16.690001|
|1476302400000| 15.91|
|1476216000000| 15.36|
|1476129600000| 13.38|
|1475870400000| 13.48|
|1475784000000| 12.84|
|1475697600000| 12.99|
|1475611200000| 13.63|
|1475524800000| 13.57|
|1475265600000| 13.29|
|1475179200000| 14.02|
|1475092800000| 12.39|
|1475006400000| 13.1|
|1474920000000| 14.5|
|1474660800000| 12.29|
|1474574400000| 12.02|
+-------------+---------+
only showing top 20 rows
2016-10-22T20:50:31.345+0000: [GC (Allocation Failure) [PSYoungGen: 704134K->79382K(945664K)] 823872K->199145K(3089408K), 0.0285894 secs] [Times: user=0.29 sys=0.04, real=0.03 secs]
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
:- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
: +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
+- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
: +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
: +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
: +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
: +- Filter NOT (Date#0 = Date)
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [datetime#81L, vix#72]
+- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
+- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
+- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
+- Filter NOT (Date#47 = Date)
+- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Analyzed Logical Plan ==
datetime: bigint, spx: double, vix: double
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
:- Project [datetime#34L, spx#25]
: +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
: +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
: +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
: +- Filter NOT (Date#0 = Date)
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [datetime#81L, vix#72]
+- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
+- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
+- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
+- Filter NOT (Date#47 = Date)
+- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Optimized Logical Plan ==
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
:- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
: +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
+- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
:- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
: +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
+- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
+-------------+-----------+-------------------+
| datetime| spx| vix|
+-------------+-----------+-------------------+
|1476907200000|2144.290039|7.296891096156E-312|
|1476820800000|2139.600098| 7.29646422344E-312|
|1476734400000| 2126.5| 7.29603735072E-312|
|1476475200000| 2132.97998|7.294756732566E-312|
|1476388800000|2132.550049| 7.29432985985E-312|
|1476302400000|2139.179932| 7.29390298713E-312|
|1476216000000| 2136.72998| 7.29347611441E-312|
|1476129600000|2163.659912|7.293049241694E-312|
|1475870400000| 2153.73999| 7.29176862354E-312|
|1475784000000| 2160.77002| 7.29134175082E-312|
|1475697600000| 2159.72998|7.290914878104E-312|
|1475611200000| 2150.48999|7.290488005386E-312|
|1475524800000|2161.199951| 7.29006113267E-312|
|1475265600000| 2168.27002|7.288780514514E-312|
|1475179200000|2151.129883|7.288353641796E-312|
|1475092800000|2171.370117| 7.28792676908E-312|
|1475006400000|2159.929932| 7.28749989636E-312|
|1474920000000|2146.100098| 7.28707302364E-312|
|1474660800000|2164.689941| 7.28579240549E-312|
|1474574400000|2177.179932| 7.28536553277E-312|
+-------------+-----------+-------------------+
=============================================================================
=============================================================================
=============================================================================
--conf,spark.sql.autoBroadcastJoinThreshold=-1,
+-------------+-----------+
| datetime| spx|
+-------------+-----------+
|1476907200000|2144.290039|
|1476820800000|2139.600098|
|1476734400000| 2126.5|
|1476475200000| 2132.97998|
|1476388800000|2132.550049|
|1476302400000|2139.179932|
|1476216000000| 2136.72998|
|1476129600000|2163.659912|
|1475870400000| 2153.73999|
|1475784000000| 2160.77002|
|1475697600000| 2159.72998|
|1475611200000| 2150.48999|
|1475524800000|2161.199951|
|1475265600000| 2168.27002|
|1475179200000|2151.129883|
|1475092800000|2171.370117|
|1475006400000|2159.929932|
|1474920000000|2146.100098|
|1474660800000|2164.689941|
|1474574400000|2177.179932|
+-------------+-----------+
only showing top 20 rows
+-------------+---------+
| datetime| vix|
+-------------+---------+
|1476907200000| 14.41|
|1476820800000| 15.28|
|1476734400000|16.209999|
|1476475200000|16.120001|
|1476388800000|16.690001|
|1476302400000| 15.91|
|1476216000000| 15.36|
|1476129600000| 13.38|
|1475870400000| 13.48|
|1475784000000| 12.84|
|1475697600000| 12.99|
|1475611200000| 13.63|
|1475524800000| 13.57|
|1475265600000| 13.29|
|1475179200000| 14.02|
|1475092800000| 12.39|
|1475006400000| 13.1|
|1474920000000| 14.5|
|1474660800000| 12.29|
|1474574400000| 12.02|
+-------------+---------+
only showing top 20 rows
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
:- *Sort [datetime#34L ASC], false, 0
: +- Exchange hashpartitioning(datetime#34L, 200)
: +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
: +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
+- *Sort [datetime#81L ASC], false, 0
+- Exchange hashpartitioning(datetime#81L, 200)
+- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
+- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
2016-10-22T20:58:15.994+0000: [GC (Allocation Failure) [PSYoungGen: 705910K->79079K(999936K)] 824644K->197829K(2974208K), 0.0294130 secs] [Times: user=0.26 sys=0.04, real=0.03 secs]
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
: +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
: +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
: +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
: +- Filter NOT (Date#0 = Date)
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [datetime#81L, vix#72]
+- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
+- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
+- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
+- Filter NOT (Date#47 = Date)
+- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Analyzed Logical Plan ==
datetime: bigint, spx: double, vix: double
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
:- Project [datetime#34L, spx#25]
: +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
: +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
: +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
: +- Filter NOT (Date#0 = Date)
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [datetime#81L, vix#72]
+- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
+- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
+- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
+- Filter NOT (Date#47 = Date)
+- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Optimized Logical Plan ==
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
:- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
: +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
+- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
:- *Sort [datetime#34L ASC], false, 0
: +- Exchange hashpartitioning(datetime#34L, 200)
: +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
: +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
+- *Sort [datetime#81L ASC], false, 0
+- Exchange hashpartitioning(datetime#81L, 200)
+- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
+- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
+-------------+-----------+---------+
| datetime| spx| vix|
+-------------+-----------+---------+
| 931550400000|1403.280029|17.959999|
| 955742400000|1356.560059|33.490002|
| 962308800000|1442.390015|19.700001|
| 967752000000|1517.680054| 16.84|
| 995054400000|1215.680054|21.139999|
|1028145600000| 911.619995|32.029999|
|1049832000000| 878.289978|27.129999|
|1088452800000|1133.349976| 16.07|
|1097265600000|1122.140015| 15.05|
|1102539600000|1182.810059| 13.19|
|1147809600000|1292.079956| 13.35|
|1162414800000|1367.810059| 11.51|
|1266526800000| 1106.75|20.629999|
|1314043200000|1123.819946|42.439999|
|1319227200000| 1238.25| 31.32|
|1331928000000|1404.170044| 14.43|
|1377201600000|1656.959961| 14.76|
|1378756800000|1671.709961| 15.63|
|1390597200000|1790.290039|17.879999|
|1400616000000|1872.829956| 12.96|
+-------------+-----------+---------+
Re: Ran in to a bug in Broadcast Hash Join
Posted by Michael Armbrust <mi...@databricks.com>.
2.0.0 or 2.0.1? There are several correctness fixes in the latter.
On Oct 22, 2016 2:14 PM, "Jeremy Davis" <je...@speakeasy.net> wrote:
>
> Hello, I ran in to a bug with Broadcast Hash Join in Spark 2.0. (Running
> on EMR)
> If I just toggle spark.sql.autoBroadcastJoinThreshold=-1 then the join
> works, if I leave it as default it does not work.
> When it doesn’t work, then one of my joined columns is filled with very
> small Doubles.
>
> I’m joining two small tables: (datetime,spx) and (datetime,vix)
> Attached are the plans and debug.
>
>
> ================================================================
>
> The (Default) Broken case:
>
>
> +-------------+-----------+
> | datetime| spx|
> +-------------+-----------+
> |1476907200000|2144.290039|
> |1476820800000|2139.600098|
> |1476734400000| 2126.5|
> |1476475200000| 2132.97998|
> |1476388800000|2132.550049|
> |1476302400000|2139.179932|
> |1476216000000| 2136.72998|
> |1476129600000|2163.659912|
> |1475870400000| 2153.73999|
> |1475784000000| 2160.77002|
> |1475697600000| 2159.72998|
> |1475611200000| 2150.48999|
> |1475524800000|2161.199951|
> |1475265600000| 2168.27002|
> |1475179200000|2151.129883|
> |1475092800000|2171.370117|
> |1475006400000|2159.929932|
> |1474920000000|2146.100098|
> |1474660800000|2164.689941|
> |1474574400000|2177.179932|
> +-------------+-----------+
> only showing top 20 rows
>
> +-------------+---------+
> | datetime| vix|
> +-------------+---------+
> |1476907200000| 14.41|
> |1476820800000| 15.28|
> |1476734400000|16.209999|
> |1476475200000|16.120001|
> |1476388800000|16.690001|
> |1476302400000| 15.91|
> |1476216000000| 15.36|
> |1476129600000| 13.38|
> |1475870400000| 13.48|
> |1475784000000| 12.84|
> |1475697600000| 12.99|
> |1475611200000| 13.63|
> |1475524800000| 13.57|
> |1475265600000| 13.29|
> |1475179200000| 14.02|
> |1475092800000| 12.39|
> |1475006400000| 13.1|
> |1474920000000| 14.5|
> |1474660800000| 12.29|
> |1474574400000| 12.02|
> +-------------+---------+
> only showing top 20 rows
>
> 2016-10-22T20:50:31.345+0000: [GC (Allocation Failure) [PSYoungGen: 704134K->79382K(945664K)] 823872K->199145K(3089408K), 0.0285894 secs] [Times: user=0.29 sys=0.04, real=0.03 secs]
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
> :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
> : +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
> : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
> +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
> +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
> +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> ########
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List('datetime))
> :- Project [datetime#34L, spx#25]
> : +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
> : +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
> : +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
> : +- Filter NOT (Date#0 = Date)
> : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
> +- Project [datetime#81L, vix#72]
> +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
> +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
> +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
> +- Filter NOT (Date#47 = Date)
> +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Analyzed Logical Plan ==
> datetime: bigint, spx: double, vix: double
> Project [datetime#34L, spx#25, vix#72]
> +- Join Inner, (datetime#34L = datetime#81L)
> :- Project [datetime#34L, spx#25]
> : +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
> : +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
> : +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
> : +- Filter NOT (Date#0 = Date)
> : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
> +- Project [datetime#81L, vix#72]
> +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
> +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
> +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
> +- Filter NOT (Date#47 = Date)
> +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Optimized Logical Plan ==
> Project [datetime#34L, spx#25, vix#72]
> +- Join Inner, (datetime#34L = datetime#81L)
> :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
> : +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
> : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
> +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
> +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
> +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
> :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
> : +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
> : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
> +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
> +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
> +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> ########
> +-------------+-----------+-------------------+
> | datetime| spx| vix|
> +-------------+-----------+-------------------+
> |1476907200000|2144.290039|7.296891096156E-312|
> |1476820800000|2139.600098| 7.29646422344E-312|
> |1476734400000| 2126.5| 7.29603735072E-312|
> |1476475200000| 2132.97998|7.294756732566E-312|
> |1476388800000|2132.550049| 7.29432985985E-312|
> |1476302400000|2139.179932| 7.29390298713E-312|
> |1476216000000| 2136.72998| 7.29347611441E-312|
> |1476129600000|2163.659912|7.293049241694E-312|
> |1475870400000| 2153.73999| 7.29176862354E-312|
> |1475784000000| 2160.77002| 7.29134175082E-312|
> |1475697600000| 2159.72998|7.290914878104E-312|
> |1475611200000| 2150.48999|7.290488005386E-312|
> |1475524800000|2161.199951| 7.29006113267E-312|
> |1475265600000| 2168.27002|7.288780514514E-312|
> |1475179200000|2151.129883|7.288353641796E-312|
> |1475092800000|2171.370117| 7.28792676908E-312|
> |1475006400000|2159.929932| 7.28749989636E-312|
> |1474920000000|2146.100098| 7.28707302364E-312|
> |1474660800000|2164.689941| 7.28579240549E-312|
> |1474574400000|2177.179932| 7.28536553277E-312|
> +-------------+-----------+-------------------+
>
>
>
>
>
> ============================================================
> =================
> ============================================================
> =================
> ============================================================
> =================
>
> --conf,spark.sql.autoBroadcastJoinThreshold=-1,
>
>
>
>
> +-------------+-----------+
> | datetime| spx|
> +-------------+-----------+
> |1476907200000|2144.290039|
> |1476820800000|2139.600098|
> |1476734400000| 2126.5|
> |1476475200000| 2132.97998|
> |1476388800000|2132.550049|
> |1476302400000|2139.179932|
> |1476216000000| 2136.72998|
> |1476129600000|2163.659912|
> |1475870400000| 2153.73999|
> |1475784000000| 2160.77002|
> |1475697600000| 2159.72998|
> |1475611200000| 2150.48999|
> |1475524800000|2161.199951|
> |1475265600000| 2168.27002|
> |1475179200000|2151.129883|
> |1475092800000|2171.370117|
> |1475006400000|2159.929932|
> |1474920000000|2146.100098|
> |1474660800000|2164.689941|
> |1474574400000|2177.179932|
> +-------------+-----------+
> only showing top 20 rows
>
> +-------------+---------+
> | datetime| vix|
> +-------------+---------+
> |1476907200000| 14.41|
> |1476820800000| 15.28|
> |1476734400000|16.209999|
> |1476475200000|16.120001|
> |1476388800000|16.690001|
> |1476302400000| 15.91|
> |1476216000000| 15.36|
> |1476129600000| 13.38|
> |1475870400000| 13.48|
> |1475784000000| 12.84|
> |1475697600000| 12.99|
> |1475611200000| 13.63|
> |1475524800000| 13.57|
> |1475265600000| 13.29|
> |1475179200000| 14.02|
> |1475092800000| 12.39|
> |1475006400000| 13.1|
> |1474920000000| 14.5|
> |1474660800000| 12.29|
> |1474574400000| 12.02|
> +-------------+---------+
> only showing top 20 rows
>
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
> :- *Sort [datetime#34L ASC], false, 0
> : +- Exchange hashpartitioning(datetime#34L, 200)
> : +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
> : +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
> : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> +- *Sort [datetime#81L ASC], false, 0
> +- Exchange hashpartitioning(datetime#81L, 200)
> +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
> +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
> +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> ########
> 2016-10-22T20:58:15.994+0000: [GC (Allocation Failure) [PSYoungGen: 705910K->79079K(999936K)] 824644K->197829K(2974208K), 0.0294130 secs] [Times: user=0.26 sys=0.04, real=0.03 secs]
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List('datetime))
> :- Project [datetime#34L, spx#25]
> : +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
> : +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
> : +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
> : +- Filter NOT (Date#0 = Date)
> : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
> +- Project [datetime#81L, vix#72]
> +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
> +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
> +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
> +- Filter NOT (Date#47 = Date)
> +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Analyzed Logical Plan ==
> datetime: bigint, spx: double, vix: double
> Project [datetime#34L, spx#25, vix#72]
> +- Join Inner, (datetime#34L = datetime#81L)
> :- Project [datetime#34L, spx#25]
> : +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
> : +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
> : +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
> : +- Filter NOT (Date#0 = Date)
> : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
> +- Project [datetime#81L, vix#72]
> +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
> +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
> +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
> +- Filter NOT (Date#47 = Date)
> +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Optimized Logical Plan ==
> Project [datetime#34L, spx#25, vix#72]
> +- Join Inner, (datetime#34L = datetime#81L)
> :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
> : +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
> : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
> +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
> +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
> +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
> :- *Sort [datetime#34L ASC], false, 0
> : +- Exchange hashpartitioning(datetime#34L, 200)
> : +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
> : +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
> : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> +- *Sort [datetime#81L ASC], false, 0
> +- Exchange hashpartitioning(datetime#81L, 200)
> +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
> +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
> +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> ########
> +-------------+-----------+---------+
> | datetime| spx| vix|
> +-------------+-----------+---------+
> | 931550400000|1403.280029|17.959999|
> | 955742400000|1356.560059|33.490002|
> | 962308800000|1442.390015|19.700001|
> | 967752000000|1517.680054| 16.84|
> | 995054400000|1215.680054|21.139999|
> |1028145600000| 911.619995|32.029999|
> |1049832000000| 878.289978|27.129999|
> |1088452800000|1133.349976| 16.07|
> |1097265600000|1122.140015| 15.05|
> |1102539600000|1182.810059| 13.19|
> |1147809600000|1292.079956| 13.35|
> |1162414800000|1367.810059| 11.51|
> |1266526800000| 1106.75|20.629999|
> |1314043200000|1123.819946|42.439999|
> |1319227200000| 1238.25| 31.32|
> |1331928000000|1404.170044| 14.43|
> |1377201600000|1656.959961| 14.76|
> |1378756800000|1671.709961| 15.63|
> |1390597200000|1790.290039|17.879999|
> |1400616000000|1872.829956| 12.96|
> +-------------+-----------+---------+
>
>
>
>
>
>
>
>