Posted to dev@spark.apache.org by Jeremy Davis <je...@speakeasy.net> on 2016/10/22 21:14:05 UTC

Ran into a bug in Broadcast Hash Join

Hello, I ran into a bug with Broadcast Hash Join in Spark 2.0 (running on EMR).
If I set spark.sql.autoBroadcastJoinThreshold=-1, the join works; if I leave it at the default, it does not.
When it doesn’t work, one of the joined columns (vix) is filled with very small Doubles.
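The two runs produce different physical joins: BroadcastHashJoin (BuildRight) with the default threshold, and SortMergeJoin with the threshold at -1. The sketch below is a minimal, simplified model of that choice. It assumes a side is broadcast only when its estimated size is at or below the threshold, which defaults to 10485760 bytes (10 MB). It ignores broadcast hints and build-side selection, and it is not Spark's planner code. The class name and the table size estimate are hypothetical.

```java
// Simplified, hypothetical model of the decision controlled by
// spark.sql.autoBroadcastJoinThreshold. Not Spark's actual planner code.
class JoinStrategySketch {

    // Default value of spark.sql.autoBroadcastJoinThreshold: 10 MB.
    static final long DEFAULT_THRESHOLD_BYTES = 10L * 1024 * 1024;

    // Broadcast the build side only if its size estimate is at or below the
    // threshold. Size estimates are never negative, so -1 disables broadcasting.
    static String chooseJoin(long estimatedBuildSideBytes, long thresholdBytes) {
        if (estimatedBuildSideBytes < 0) {
            throw new IllegalArgumentException("size estimates are non-negative");
        }
        return estimatedBuildSideBytes <= thresholdBytes ? "BroadcastHashJoin" : "SortMergeJoin";
    }

    public static void main(String[] args) {
        long vixTableBytes = 200_000L; // hypothetical estimate, not measured
        System.out.println(chooseJoin(vixTableBytes, DEFAULT_THRESHOLD_BYTES)); // BroadcastHashJoin
        System.out.println(chooseJoin(vixTableBytes, -1L));                     // SortMergeJoin
    }
}
```

Under this model, setting the threshold to -1 does not fix the broadcast path. It only avoids that path, which is why the workaround behaves as it does.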

I’m joining two small tables: (datetime, spx) and (datetime, vix).
Attached are the plans and debug output.

================================================================
The broken (default) case:

+-------------+-----------+
|     datetime|        spx|
+-------------+-----------+
|1476907200000|2144.290039|
|1476820800000|2139.600098|
|1476734400000|     2126.5|
|1476475200000| 2132.97998|
|1476388800000|2132.550049|
|1476302400000|2139.179932|
|1476216000000| 2136.72998|
|1476129600000|2163.659912|
|1475870400000| 2153.73999|
|1475784000000| 2160.77002|
|1475697600000| 2159.72998|
|1475611200000| 2150.48999|
|1475524800000|2161.199951|
|1475265600000| 2168.27002|
|1475179200000|2151.129883|
|1475092800000|2171.370117|
|1475006400000|2159.929932|
|1474920000000|2146.100098|
|1474660800000|2164.689941|
|1474574400000|2177.179932|
+-------------+-----------+
only showing top 20 rows

+-------------+---------+
|     datetime|      vix|
+-------------+---------+
|1476907200000|    14.41|
|1476820800000|    15.28|
|1476734400000|16.209999|
|1476475200000|16.120001|
|1476388800000|16.690001|
|1476302400000|    15.91|
|1476216000000|    15.36|
|1476129600000|    13.38|
|1475870400000|    13.48|
|1475784000000|    12.84|
|1475697600000|    12.99|
|1475611200000|    13.63|
|1475524800000|    13.57|
|1475265600000|    13.29|
|1475179200000|    14.02|
|1475092800000|    12.39|
|1475006400000|     13.1|
|1474920000000|     14.5|
|1474660800000|    12.29|
|1474574400000|    12.02|
+-------------+---------+
only showing top 20 rows
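In both tables, the datetime keys are bigint epoch milliseconds. According to the plans, they are built by a UDF from substring(date, 1, 4), substring(date, 6, 2) and substring(date, 9, 2). The UDF body is not in the post. The sketch below is a hypothetical stand-in that reproduces the printed keys by taking 16:00 America/New_York on each date. For example, 2016-10-19 gives 1476907200000 and 2004-12-08 gives 1102539600000. The class and method names, the time of day and the zone are all assumptions.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;

// Hypothetical reconstruction of the date-to-key step visible in the plans.
// The real UDF is not shown; 16:00 America/New_York is an assumption that
// happens to reproduce the printed datetime values.
class DatetimeKeySketch {

    static final ZoneId NEW_YORK = ZoneId.of("America/New_York");

    // Mirrors substring(date, 1, 4), substring(date, 6, 2), substring(date, 9, 2):
    // Spark's substring is 1-based, Java's is 0-based and end-exclusive.
    static Long toKey(String date) {
        try {
            int year = Integer.parseInt(date.substring(0, 4));
            int month = Integer.parseInt(date.substring(5, 7));
            int day = Integer.parseInt(date.substring(8, 10));
            return LocalDate.of(year, month, day)
                    .atTime(LocalTime.of(16, 0))
                    .atZone(NEW_YORK)
                    .toInstant()
                    .toEpochMilli();
        } catch (RuntimeException e) {
            // Roughly like the plan's isnull(...) guard: unparseable input gives a null key.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(toKey("2016-10-19")); // 1476907200000
        System.out.println(toKey("2004-12-08")); // 1102539600000
        System.out.println(toKey("Date"));       // null (the CSV header row)
    }
}
```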

2016-10-22T20:50:31.345+0000: [GC (Allocation Failure) [PSYoungGen: 704134K->79382K(945664K)] 823872K->199145K(3089408K), 0.0285894 secs] [Times: user=0.29 sys=0.04, real=0.03 secs]
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
   :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :     +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
         +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
            +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
:  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
:     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
:        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
:           +- Filter NOT (Date#0 = Date)
:              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [datetime#81L, vix#72]
   +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
      +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
         +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
            +- Filter NOT (Date#47 = Date)
               +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Analyzed Logical Plan ==
datetime: bigint, spx: double, vix: double
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
   :- Project [datetime#34L, spx#25]
   :  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
   :     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
   :        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
   :           +- Filter NOT (Date#0 = Date)
   :              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
   +- Project [datetime#81L, vix#72]
      +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
         +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
            +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
               +- Filter NOT (Date#47 = Date)
                  +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Optimized Logical Plan ==
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
   :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :     +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
   +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
      +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
         +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
   :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :     +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
         +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
            +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
+-------------+-----------+-------------------+
|     datetime|        spx|                vix|
+-------------+-----------+-------------------+
|1476907200000|2144.290039|7.296891096156E-312|
|1476820800000|2139.600098| 7.29646422344E-312|
|1476734400000|     2126.5| 7.29603735072E-312|
|1476475200000| 2132.97998|7.294756732566E-312|
|1476388800000|2132.550049| 7.29432985985E-312|
|1476302400000|2139.179932| 7.29390298713E-312|
|1476216000000| 2136.72998| 7.29347611441E-312|
|1476129600000|2163.659912|7.293049241694E-312|
|1475870400000| 2153.73999| 7.29176862354E-312|
|1475784000000| 2160.77002| 7.29134175082E-312|
|1475697600000| 2159.72998|7.290914878104E-312|
|1475611200000| 2150.48999|7.290488005386E-312|
|1475524800000|2161.199951| 7.29006113267E-312|
|1475265600000| 2168.27002|7.288780514514E-312|
|1475179200000|2151.129883|7.288353641796E-312|
|1475092800000|2171.370117| 7.28792676908E-312|
|1475006400000|2159.929932| 7.28749989636E-312|
|1474920000000|2146.100098| 7.28707302364E-312|
|1474660800000|2164.689941| 7.28579240549E-312|
|1474574400000|2177.179932| 7.28536553277E-312|
+-------------+-----------+-------------------+
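The tiny vix values look like the datetime key's 64 bits read back as a double. As a check on the first row, 1476907200000 × 2^-1074 ≈ 7.2969E-312. The sketch below decodes several of the printed vix values with Double.doubleToRawLongBits and compares each one with the datetime on the same row. If they agree, the vix slot of the broadcast join output is holding the key's bytes rather than the vix value. This is a diagnostic hypothesis, not a confirmed root cause. The class name is made up, and the values are copied from the output above.

```java
// Diagnostic sketch: are the broken vix doubles just the datetime key's bits?
// Sample values are copied from the broken BroadcastHashJoin output.
class VixBitsCheck {

    // Reinterpret the double's raw IEEE 754 bits as a long (no numeric conversion).
    static long rawBits(double d) {
        return Double.doubleToRawLongBits(d);
    }

    public static void main(String[] args) {
        long[] datetimes = {1476907200000L, 1476820800000L, 1476734400000L, 1474574400000L};
        double[] brokenVix = {7.296891096156E-312, 7.29646422344E-312, 7.29603735072E-312, 7.28536553277E-312};

        for (int i = 0; i < datetimes.length; i++) {
            long bits = rawBits(brokenVix[i]);
            String verdict = (bits == datetimes[i]) ? "same bits as datetime" : "different";
            System.out.println(datetimes[i] + " vs " + bits + ": " + verdict);
        }
    }
}
```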




=============================================================================
=============================================================================
=============================================================================
The working case, with --conf spark.sql.autoBroadcastJoinThreshold=-1:



+-------------+-----------+
|     datetime|        spx|
+-------------+-----------+
|1476907200000|2144.290039|
|1476820800000|2139.600098|
|1476734400000|     2126.5|
|1476475200000| 2132.97998|
|1476388800000|2132.550049|
|1476302400000|2139.179932|
|1476216000000| 2136.72998|
|1476129600000|2163.659912|
|1475870400000| 2153.73999|
|1475784000000| 2160.77002|
|1475697600000| 2159.72998|
|1475611200000| 2150.48999|
|1475524800000|2161.199951|
|1475265600000| 2168.27002|
|1475179200000|2151.129883|
|1475092800000|2171.370117|
|1475006400000|2159.929932|
|1474920000000|2146.100098|
|1474660800000|2164.689941|
|1474574400000|2177.179932|
+-------------+-----------+
only showing top 20 rows

+-------------+---------+
|     datetime|      vix|
+-------------+---------+
|1476907200000|    14.41|
|1476820800000|    15.28|
|1476734400000|16.209999|
|1476475200000|16.120001|
|1476388800000|16.690001|
|1476302400000|    15.91|
|1476216000000|    15.36|
|1476129600000|    13.38|
|1475870400000|    13.48|
|1475784000000|    12.84|
|1475697600000|    12.99|
|1475611200000|    13.63|
|1475524800000|    13.57|
|1475265600000|    13.29|
|1475179200000|    14.02|
|1475092800000|    12.39|
|1475006400000|     13.1|
|1474920000000|     14.5|
|1474660800000|    12.29|
|1474574400000|    12.02|
+-------------+---------+
only showing top 20 rows

== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
   :- *Sort [datetime#34L ASC], false, 0
   :  +- Exchange hashpartitioning(datetime#34L, 200)
   :     +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :        +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :           +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
   +- *Sort [datetime#81L ASC], false, 0
      +- Exchange hashpartitioning(datetime#81L, 200)
         +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
            +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
               +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
2016-10-22T20:58:15.994+0000: [GC (Allocation Failure) [PSYoungGen: 705910K->79079K(999936K)] 824644K->197829K(2974208K), 0.0294130 secs] [Times: user=0.26 sys=0.04, real=0.03 secs]
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
:  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
:     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
:        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
:           +- Filter NOT (Date#0 = Date)
:              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
+- Project [datetime#81L, vix#72]
   +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
      +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
         +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
            +- Filter NOT (Date#47 = Date)
               +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Analyzed Logical Plan ==
datetime: bigint, spx: double, vix: double
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
   :- Project [datetime#34L, spx#25]
   :  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
   :     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
   :        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
   :           +- Filter NOT (Date#0 = Date)
   :              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
   +- Project [datetime#81L, vix#72]
      +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
         +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
            +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
               +- Filter NOT (Date#47 = Date)
                  +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Optimized Logical Plan ==
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
   :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :     +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
   +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
      +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
         +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv

== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
   :- *Sort [datetime#34L ASC], false, 0
   :  +- Exchange hashpartitioning(datetime#34L, 200)
   :     +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
   :        +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
   :           +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
   +- *Sort [datetime#81L ASC], false, 0
      +- Exchange hashpartitioning(datetime#81L, 200)
         +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
            +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
               +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
+-------------+-----------+---------+
|     datetime|        spx|      vix|
+-------------+-----------+---------+
| 931550400000|1403.280029|17.959999|
| 955742400000|1356.560059|33.490002|
| 962308800000|1442.390015|19.700001|
| 967752000000|1517.680054|    16.84|
| 995054400000|1215.680054|21.139999|
|1028145600000| 911.619995|32.029999|
|1049832000000| 878.289978|27.129999|
|1088452800000|1133.349976|    16.07|
|1097265600000|1122.140015|    15.05|
|1102539600000|1182.810059|    13.19|
|1147809600000|1292.079956|    13.35|
|1162414800000|1367.810059|    11.51|
|1266526800000|    1106.75|20.629999|
|1314043200000|1123.819946|42.439999|
|1319227200000|    1238.25|    31.32|
|1331928000000|1404.170044|    14.43|
|1377201600000|1656.959961|    14.76|
|1378756800000|1671.709961|    15.63|
|1390597200000|1790.290039|17.879999|
|1400616000000|1872.829956|    12.96|
+-------------+-----------+---------+







Re: Ran into a bug in Broadcast Hash Join

Posted by Michael Armbrust <mi...@databricks.com>.

Are you on 2.0.0 or 2.0.1? There are several correctness fixes in the latter.

On Oct 22, 2016 2:14 PM, "Jeremy Davis" <je...@speakeasy.net> wrote:

>
> Hello, I ran in to a bug with Broadcast Hash Join in Spark 2.0. (Running
> on EMR)
> If I just toggle spark.sql.autoBroadcastJoinThreshold=-1 then the join
> works, if I leave it as default it does not work.
> When it doesn’t work, then one of my joined columns is filled with very
> small Doubles.
>
> I’m joining two small tables: (datetime,spx) and (datetime,vix)
> Attached are the plans and debug.
>
>
> ================================================================
>
> The (Default) Broken case:
>
>
> +-------------+-----------+
> |     datetime|        spx|
> +-------------+-----------+
> |1476907200000|2144.290039|
> |1476820800000|2139.600098|
> |1476734400000|     2126.5|
> |1476475200000| 2132.97998|
> |1476388800000|2132.550049|
> |1476302400000|2139.179932|
> |1476216000000| 2136.72998|
> |1476129600000|2163.659912|
> |1475870400000| 2153.73999|
> |1475784000000| 2160.77002|
> |1475697600000| 2159.72998|
> |1475611200000| 2150.48999|
> |1475524800000|2161.199951|
> |1475265600000| 2168.27002|
> |1475179200000|2151.129883|
> |1475092800000|2171.370117|
> |1475006400000|2159.929932|
> |1474920000000|2146.100098|
> |1474660800000|2164.689941|
> |1474574400000|2177.179932|
> +-------------+-----------+
> only showing top 20 rows
>
> +-------------+---------+
> |     datetime|      vix|
> +-------------+---------+
> |1476907200000|    14.41|
> |1476820800000|    15.28|
> |1476734400000|16.209999|
> |1476475200000|16.120001|
> |1476388800000|16.690001|
> |1476302400000|    15.91|
> |1476216000000|    15.36|
> |1476129600000|    13.38|
> |1475870400000|    13.48|
> |1475784000000|    12.84|
> |1475697600000|    12.99|
> |1475611200000|    13.63|
> |1475524800000|    13.57|
> |1475265600000|    13.29|
> |1475179200000|    14.02|
> |1475092800000|    12.39|
> |1475006400000|     13.1|
> |1474920000000|     14.5|
> |1474660800000|    12.29|
> |1474574400000|    12.02|
> +-------------+---------+
> only showing top 20 rows
>
> 2016-10-22T20:50:31.345+0000: [GC (Allocation Failure) [PSYoungGen: 704134K->79382K(945664K)] 823872K->199145K(3089408K), 0.0285894 secs] [Times: user=0.29 sys=0.04, real=0.03 secs]
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
>    :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
>    :  +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
>    :     +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
>       +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
>          +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
>             +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> ########
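In the plans, the join key `datetime` is built by cutting year, month and day out of the `Date` string with `substring`, casting each part to `int`, and passing the three to a UDF. The plans do not show that UDF's body. The sketch below is a hypothetical reconstruction, not the original job's code. It is based on the values in the tables: 1476907200000 is 2016-10-19 20:00 UTC, and winter rows such as 1102539600000 fall at 21:00 UTC, so the sketch assumes the UDF returns 16:00 America/New_York on the parsed date as epoch milliseconds. The class and method names are illustrative.

```java
import java.time.LocalDate;
import java.time.ZoneId;

// Hypothetical reconstruction of the plan's datetime#34L / datetime#81L column.
// ASSUMPTION: the real UDF body is not shown; here it is taken to return
// 16:00 America/New_York on the parsed date, as epoch milliseconds.
public class DatetimeKey {
    private static final ZoneId NEW_YORK = ZoneId.of("America/New_York");

    // Roughly mirrors cast(substring(...) as int): null when the text is not a number.
    static Integer parseIntOrNull(String s) {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Mirrors the plan's: if (isnull(y) || isnull(m) || isnull(d)) null else UDF(y, m, d)
    static Long datetimeKey(String date) {
        if (date == null || date.length() < 10) {
            return null;
        }
        // Spark's substring is 1-based: substring(date, 1, 4), (6, 2), (9, 2).
        Integer year = parseIntOrNull(date.substring(0, 4));
        Integer month = parseIntOrNull(date.substring(5, 7));
        Integer day = parseIntOrNull(date.substring(8, 10));
        if (year == null || month == null || day == null) {
            return null;
        }
        return LocalDate.of(year, month, day)
                .atTime(16, 0)
                .atZone(NEW_YORK)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(datetimeKey("2016-10-19")); // 1476907200000
        System.out.println(datetimeKey("Date"));       // null (the CSV header row)
    }
}
```

If the assumption holds, both sides of the join produce identical `bigint` keys for the same trading day. That matches the working sort-merge output, where the keys do line up.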
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List('datetime))
> :- Project [datetime#34L, spx#25]
> :  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
> :     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
> :        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
> :           +- Filter NOT (Date#0 = Date)
> :              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
> +- Project [datetime#81L, vix#72]
>    +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
>       +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
>          +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
>             +- Filter NOT (Date#47 = Date)
>                +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Analyzed Logical Plan ==
> datetime: bigint, spx: double, vix: double
> Project [datetime#34L, spx#25, vix#72]
> +- Join Inner, (datetime#34L = datetime#81L)
>    :- Project [datetime#34L, spx#25]
>    :  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
>    :     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
>    :        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
>    :           +- Filter NOT (Date#0 = Date)
>    :              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
>    +- Project [datetime#81L, vix#72]
>       +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
>          +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
>             +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
>                +- Filter NOT (Date#47 = Date)
>                   +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Optimized Logical Plan ==
> Project [datetime#34L, spx#25, vix#72]
> +- Join Inner, (datetime#34L = datetime#81L)
>    :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
>    :  +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
>    :     +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
>    +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
>       +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
>          +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
>    :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
>    :  +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
>    :     +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
>       +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
>          +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
>             +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> ########
> +-------------+-----------+-------------------+
> |     datetime|        spx|                vix|
> +-------------+-----------+-------------------+
> |1476907200000|2144.290039|7.296891096156E-312|
> |1476820800000|2139.600098| 7.29646422344E-312|
> |1476734400000|     2126.5| 7.29603735072E-312|
> |1476475200000| 2132.97998|7.294756732566E-312|
> |1476388800000|2132.550049| 7.29432985985E-312|
> |1476302400000|2139.179932| 7.29390298713E-312|
> |1476216000000| 2136.72998| 7.29347611441E-312|
> |1476129600000|2163.659912|7.293049241694E-312|
> |1475870400000| 2153.73999| 7.29176862354E-312|
> |1475784000000| 2160.77002| 7.29134175082E-312|
> |1475697600000| 2159.72998|7.290914878104E-312|
> |1475611200000| 2150.48999|7.290488005386E-312|
> |1475524800000|2161.199951| 7.29006113267E-312|
> |1475265600000| 2168.27002|7.288780514514E-312|
> |1475179200000|2151.129883|7.288353641796E-312|
> |1475092800000|2171.370117| 7.28792676908E-312|
> |1475006400000|2159.929932| 7.28749989636E-312|
> |1474920000000|2146.100098| 7.28707302364E-312|
> |1474660800000|2164.689941| 7.28579240549E-312|
> |1474574400000|2177.179932| 7.28536553277E-312|
> +-------------+-----------+-------------------+
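The tiny `vix` values above seem to follow a pattern. Each one appears to be that row's own `datetime` key with its 64-bit pattern read back as an IEEE 754 double. For example, 1476907200000 reinterpreted this way is about 7.2969E-312. This suggests the joined row picked up the key's bytes where the `vix` field should be. That is an observation from the printed numbers, not a confirmed diagnosis. `KeyBitsCheck` is an illustrative name. The check below compares a few rows copied from the broken output.

```java
// Checks whether the bogus vix values from the broken BroadcastHashJoin output
// equal the datetime key's raw bits reinterpreted as a double.
public class KeyBitsCheck {
    // Reinterpret a long's 64 bits as a double (no numeric conversion).
    static double keyBitsAsDouble(long key) {
        return Double.longBitsToDouble(key);
    }

    public static void main(String[] args) {
        // (datetime, vix) pairs copied from the broken output.
        long[] keys = {1476907200000L, 1476820800000L, 1474574400000L};
        String[] shown = {"7.296891096156E-312", "7.29646422344E-312", "7.28536553277E-312"};
        for (int i = 0; i < keys.length; i++) {
            double fromKey = keyBitsAsDouble(keys[i]);
            double displayed = Double.parseDouble(shown[i]);
            System.out.println(keys[i] + " -> " + fromKey + ", matches shown value: " + (fromKey == displayed));
        }
    }
}
```

If every row matches, the join matched rows on the correct keys but read the wrong bytes for the build-side column. The sort-merge plan in the working case does not show this problem.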
>
> ================================================================
> The working case (--conf spark.sql.autoBroadcastJoinThreshold=-1):
>
> +-------------+-----------+
> |     datetime|        spx|
> +-------------+-----------+
> |1476907200000|2144.290039|
> |1476820800000|2139.600098|
> |1476734400000|     2126.5|
> |1476475200000| 2132.97998|
> |1476388800000|2132.550049|
> |1476302400000|2139.179932|
> |1476216000000| 2136.72998|
> |1476129600000|2163.659912|
> |1475870400000| 2153.73999|
> |1475784000000| 2160.77002|
> |1475697600000| 2159.72998|
> |1475611200000| 2150.48999|
> |1475524800000|2161.199951|
> |1475265600000| 2168.27002|
> |1475179200000|2151.129883|
> |1475092800000|2171.370117|
> |1475006400000|2159.929932|
> |1474920000000|2146.100098|
> |1474660800000|2164.689941|
> |1474574400000|2177.179932|
> +-------------+-----------+
> only showing top 20 rows
>
> +-------------+---------+
> |     datetime|      vix|
> +-------------+---------+
> |1476907200000|    14.41|
> |1476820800000|    15.28|
> |1476734400000|16.209999|
> |1476475200000|16.120001|
> |1476388800000|16.690001|
> |1476302400000|    15.91|
> |1476216000000|    15.36|
> |1476129600000|    13.38|
> |1475870400000|    13.48|
> |1475784000000|    12.84|
> |1475697600000|    12.99|
> |1475611200000|    13.63|
> |1475524800000|    13.57|
> |1475265600000|    13.29|
> |1475179200000|    14.02|
> |1475092800000|    12.39|
> |1475006400000|     13.1|
> |1474920000000|     14.5|
> |1474660800000|    12.29|
> |1474574400000|    12.02|
> +-------------+---------+
> only showing top 20 rows
>
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
>    :- *Sort [datetime#34L ASC], false, 0
>    :  +- Exchange hashpartitioning(datetime#34L, 200)
>    :     +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
>    :        +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
>    :           +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
>    +- *Sort [datetime#81L ASC], false, 0
>       +- Exchange hashpartitioning(datetime#81L, 200)
>          +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
>             +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
>                +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> ########
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List('datetime))
> :- Project [datetime#34L, spx#25]
> :  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
> :     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
> :        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
> :           +- Filter NOT (Date#0 = Date)
> :              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
> +- Project [datetime#81L, vix#72]
>    +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
>       +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
>          +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
>             +- Filter NOT (Date#47 = Date)
>                +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Analyzed Logical Plan ==
> datetime: bigint, spx: double, vix: double
> Project [datetime#34L, spx#25, vix#72]
> +- Join Inner, (datetime#34L = datetime#81L)
>    :- Project [datetime#34L, spx#25]
>    :  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
>    :     +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS spx#25, Volume#5, Adj Close#6]
>    :        +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, Volume#5, Adj Close#6]
>    :           +- Filter NOT (Date#0 = Date)
>    :              +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
>    +- Project [datetime#81L, vix#72]
>       +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
>          +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS vix#72, Volume#52, Adj Close#53]
>             +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, Volume#52, Adj Close#53]
>                +- Filter NOT (Date#47 = Date)
>                   +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Optimized Logical Plan ==
> Project [datetime#34L, spx#25, vix#72]
> +- Join Inner, (datetime#34L = datetime#81L)
>    :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
>    :  +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
>    :     +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] csv
>    +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
>       +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
>          +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
>
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
>    :- *Sort [datetime#34L ASC], false, 0
>    :  +- Exchange hashpartitioning(datetime#34L, 200)
>    :     +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L, cast(Close#4 as double) AS spx#25]
>    :        +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
>    :           +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
>    +- *Sort [datetime#81L ASC], false, 0
>       +- Exchange hashpartitioning(datetime#81L, 200)
>          +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L, cast(Close#51 as double) AS vix#72]
>             +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
>                +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
> ########
> +-------------+-----------+---------+
> |     datetime|        spx|      vix|
> +-------------+-----------+---------+
> | 931550400000|1403.280029|17.959999|
> | 955742400000|1356.560059|33.490002|
> | 962308800000|1442.390015|19.700001|
> | 967752000000|1517.680054|    16.84|
> | 995054400000|1215.680054|21.139999|
> |1028145600000| 911.619995|32.029999|
> |1049832000000| 878.289978|27.129999|
> |1088452800000|1133.349976|    16.07|
> |1097265600000|1122.140015|    15.05|
> |1102539600000|1182.810059|    13.19|
> |1147809600000|1292.079956|    13.35|
> |1162414800000|1367.810059|    11.51|
> |1266526800000|    1106.75|20.629999|
> |1314043200000|1123.819946|42.439999|
> |1319227200000|    1238.25|    31.32|
> |1331928000000|1404.170044|    14.43|
> |1377201600000|1656.959961|    14.76|
> |1378756800000|1671.709961|    15.63|
> |1390597200000|1790.290039|17.879999|
> |1400616000000|1872.829956|    12.96|
> +-------------+-----------+---------+