You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/01/10 05:14:58 UTC

[jira] [Commented] (SPARK-12076) countDistinct behaves inconsistently

    [ https://issues.apache.org/jira/browse/SPARK-12076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813923#comment-15813923 ] 

Hyukjin Kwon commented on SPARK-12076:
--------------------------------------

Could I ask to narrow down the problem or self-contained reproducer? I am willing to help verify this.

> countDistinct behaves inconsistently
> ------------------------------------
>
>                 Key: SPARK-12076
>                 URL: https://issues.apache.org/jira/browse/SPARK-12076
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: Paul Zaczkieiwcz
>            Priority: Minor
>
> Assume:
> {code:java}
> val slicePlayed:DataFrame = _
> val joinKeys:DataFrame = _
> {code}
> Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} and all columns beginning with "join_" are from {{joinKeys}}.  The following queries can return different values for slice_count_distinct:
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number")
> ).show(false)
> {code}
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   min($"cdnt_event_time").as("slice_start_time"),
>   min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
>   min($"cdnt_user_ip").as("slice_played_user_ip"),
>   min($"cdnt_user_agent").as("slice_played_user_agent"),
>   min($"cdnt_referer").as("slice_played_referer"),
>   max($"cdnt_event_time").as("slice_end_time"),
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number"),
>   min($"cdnt_is_live").as("is_live")
> ).show(false)
> {code}
> The +only+ difference between the two queries are that I'm adding more columns to the {{agg}} method.
> I can't reproduce by manually creating a dataFrame from {{DataFrame.parallelize}}. The original sources of the dataFrames are parquet files.
> The explain plans for the two queries are slightly different.
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
>  TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>   TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>    TungstenProject [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
>     SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
>      TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
>       TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>        ConvertToUnsafe
>         Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
>      TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
>       TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
>        ConvertToUnsafe
>         Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> {code}
> == Physical Plan ==
> SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(max(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_is_live#18),mode=Final,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_user_ip#31),mode=Final,isDistinct=false),(min(cdnt_user_agent#30),mode=Final,isDistinct=false),(min(cdnt_referer#22),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#721,slice_played_asset_id#722,slice_played_euid#723,slice_start_time#724,slice_played_playing_owner_id#725,slice_played_user_ip#726,slice_played_user_agent#727,slice_played_referer#728,slice_end_time#729,slice_count_distinct#730L,slice_count_total#731L,min_slice_number#732L,max_slice_number#733L,is_live#734])
>  SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_is_live#18),mode=PartialMerge,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_user_ip#31),mode=PartialMerge,isDistinct=false),(min(cdnt_user_agent#30),mode=PartialMerge,isDistinct=false),(min(cdnt_referer#22),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
>   SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_is_live#18),mode=Partial,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_user_ip#31),mode=Partial,isDistinct=false),(min(cdnt_user_agent#30),mode=Partial,isDistinct=false),(min(cdnt_referer#22),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
>    ConvertToSafe
>     TungstenProject [cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
>      SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
>       TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
>        TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>         ConvertToUnsafe
>          Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
>       TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
>        TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
>         ConvertToUnsafe
>          Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> The biggest difference betwen the two plans is whether TungstenAggregate is used or whether SortBasedAggregate+ConvertToSafe is used. The SortBasedAggregate+ConvertToSafe method gives the inaccurate results. I've been able to get around this issue by adding a {{sortBy}} call before the {{groupBy}} clause, but it strikes me that this particular calculation shouldn't change by adding a manual sort in an intermediate step.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org