Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/01/11 02:43:58 UTC

[jira] [Comment Edited] (SPARK-12076) countDistinct behaves inconsistently

    [ https://issues.apache.org/jira/browse/SPARK-12076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15816938#comment-15816938 ] 

Hyukjin Kwon edited comment on SPARK-12076 at 1/11/17 2:43 AM:
---------------------------------------------------------------

Would you be able to try this in Spark 2.1?

It is hard to imagine and generate the data needed to reproduce this issue with such a complex query. Even if someone like me manages to do that, I can't say it is correctly reproduced, because strictly speaking it is unknown whether the data was correct. I also believe the SQL component has changed rapidly, so it might now produce different plans. It is even worse because someone like me can't be sure what is incorrect in the output.

So, I guess we could narrow down the problem so that someone can verify it is a problem.
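
As a rough illustration of what a narrowed-down check could look like, here is a minimal sketch in plain Scala (no Spark needed) that computes the expected per-group aggregates for a small sample and lists the groups whose reported slice_count_distinct disagrees. The names {{SliceRow}}, {{Expected}}, {{ReferenceAggregates}} and the sample rows are all hypothetical and are not taken from the reporter's data; the idea is only that rows pulled from the real parquet source could be checked this way.

```scala
// Hypothetical row type: only the grouping keys and the aggregated column.
final case class SliceRow(sessionId: String, assetId: String, euid: String, sliceNumber: Long)

// Expected aggregates for one group, computed without Spark.
final case class Expected(countDistinct: Long, countTotal: Long, min: Long, max: Long)

object ReferenceAggregates {
  type Key = (String, String, String)

  // Group by (session, asset, euid) and compute the reference values.
  def compute(rows: Seq[SliceRow]): Map[Key, Expected] =
    rows
      .groupBy(r => (r.sessionId, r.assetId, r.euid))
      .map { case (key, group) =>
        val slices = group.map(_.sliceNumber)
        (key, Expected(slices.distinct.size.toLong, slices.size.toLong, slices.min, slices.max))
      }

  // Keys whose reported distinct count differs, mapped to (expected, reported).
  def mismatches(rows: Seq[SliceRow], reported: Map[Key, Long]): Map[Key, (Long, Long)] =
    compute(rows).collect {
      case (key, exp) if reported.get(key).exists(_ != exp.countDistinct) =>
        (key, (exp.countDistinct, reported(key)))
    }

  // Made-up sample: group s1 has three rows but only two distinct slice numbers.
  val sample: Seq[SliceRow] = Seq(
    SliceRow("s1", "a1", "u1", 1L),
    SliceRow("s1", "a1", "u1", 1L),
    SliceRow("s1", "a1", "u1", 2L),
    SliceRow("s2", "a1", "u1", 5L)
  )
}
```

If the values collected from each of the two DataFrame queries were passed as {{reported}}, any non-empty result would point at concrete groups that someone else could then inspect.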

If you are not able to try this in the current master, we should resolve this either as {{Cannot Reproduce}} because I guess no one can reproduce this and verify it, or {{Not A Problem}} because this "applies to issues or components that have changed radically since it was opened".



was (Author: hyukjin.kwon):
Would you be able to try this in Spark 2.1?

It is hard to imagine and generate the data needed to reproduce this issue with such a complex query. Even if someone like me manages to do that, I can't say it is fixed, because strictly speaking it is unknown whether the data was correct. I also believe the SQL component has changed rapidly, so it might now produce different plans. It is even worse because someone like me can't be sure what is incorrect in the output.

So, I guess we could narrow down the problem so that someone can verify it is a problem.

If you are not able to try this in the current master, we should resolve this either as {{Cannot Reproduce}} because I guess no one can reproduce this and verify it, or {{Not A Problem}} because this "applies to issues or components that have changed radically since it was opened".


> countDistinct behaves inconsistently
> ------------------------------------
>
>                 Key: SPARK-12076
>                 URL: https://issues.apache.org/jira/browse/SPARK-12076
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: Paul Zaczkieiwcz
>            Priority: Minor
>
> Assume:
> {code:java}
> val slicePlayed: DataFrame = ???
> val joinKeys: DataFrame = ???
> {code}
> Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} and all columns beginning with "join_" are from {{joinKeys}}.  The following queries can return different values for slice_count_distinct:
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number")
> ).show(false)
> {code}
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   min($"cdnt_event_time").as("slice_start_time"),
>   min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
>   min($"cdnt_user_ip").as("slice_played_user_ip"),
>   min($"cdnt_user_agent").as("slice_played_user_agent"),
>   min($"cdnt_referer").as("slice_played_referer"),
>   max($"cdnt_event_time").as("slice_end_time"),
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number"),
>   min($"cdnt_is_live").as("is_live")
> ).show(false)
> {code}
> The +only+ difference between the two queries is that I'm adding more columns to the {{agg}} method.
> I can't reproduce this by manually creating a DataFrame with {{sc.parallelize}}. The original sources of the DataFrames are parquet files.
> The explain plans for the two queries are slightly different.
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
>  TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>   TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>    TungstenProject [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
>     SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
>      TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
>       TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>        ConvertToUnsafe
>         Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
>      TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
>       TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
>        ConvertToUnsafe
>         Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> {code}
> == Physical Plan ==
> SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(max(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_is_live#18),mode=Final,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_user_ip#31),mode=Final,isDistinct=false),(min(cdnt_user_agent#30),mode=Final,isDistinct=false),(min(cdnt_referer#22),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#721,slice_played_asset_id#722,slice_played_euid#723,slice_start_time#724,slice_played_playing_owner_id#725,slice_played_user_ip#726,slice_played_user_agent#727,slice_played_referer#728,slice_end_time#729,slice_count_distinct#730L,slice_count_total#731L,min_slice_number#732L,max_slice_number#733L,is_live#734])
>  SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_is_live#18),mode=PartialMerge,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_user_ip#31),mode=PartialMerge,isDistinct=false),(min(cdnt_user_agent#30),mode=PartialMerge,isDistinct=false),(min(cdnt_referer#22),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
>   SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_is_live#18),mode=Partial,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_user_ip#31),mode=Partial,isDistinct=false),(min(cdnt_user_agent#30),mode=Partial,isDistinct=false),(min(cdnt_referer#22),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
>    ConvertToSafe
>     TungstenProject [cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
>      SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
>       TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
>        TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>         ConvertToUnsafe
>          Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
>       TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
>        TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
>         ConvertToUnsafe
>          Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> The biggest difference between the two plans is whether TungstenAggregate or SortBasedAggregate+ConvertToSafe is used. The SortBasedAggregate+ConvertToSafe plan gives inaccurate results. I've been able to get around this issue by adding a {{sortBy}} call before the {{groupBy}} clause, but it strikes me that this particular calculation shouldn't change when a manual sort is added in an intermediate step.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
