Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/01/10 05:14:58 UTC
[jira] [Commented] (SPARK-12076) countDistinct behaves inconsistently
[ https://issues.apache.org/jira/browse/SPARK-12076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813923#comment-15813923 ]
Hyukjin Kwon commented on SPARK-12076:
--------------------------------------
Could you narrow down the problem or provide a self-contained reproducer? I am willing to help verify this.
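For illustration, a reproducer along the following lines would be small enough to verify. This is only an untested sketch and is not confirmed to trigger the problem: the sample rows, the object name CountDistinctRepro, the temporary Parquet path, and the output columns narrow_distinct and wide_distinct are all made up. It round-trips the data through Parquet because the report says in-memory DataFrames did not show the issue, and it uses SQLContext so that it should compile against the affected 1.5.x API.

{code:scala}
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{countDistinct, min}

object CountDistinctRepro {

  // Runs a narrow and a wide aggregation over the same join and returns the
  // groups whose distinct counts disagree. An empty result means no mismatch.
  def mismatches(sqlContext: SQLContext, parquetDir: String): DataFrame = {
    import sqlContext.implicits._
    val sc = sqlContext.sparkContext

    // Hypothetical rows (not the reporter's data), written to Parquet first.
    sc.parallelize(Seq(
      ("s1", "a1", "e1", 1L, "agent-a"),
      ("s1", "a1", "e1", 1L, "agent-b"),
      ("s1", "a1", "e1", 2L, "agent-a"),
      ("s2", "a1", "e2", 5L, "agent-c")
    )).toDF("cdnt_session_id", "cdnt_asset_id", "cdnt_euid",
      "cdnt_slice_number", "cdnt_user_agent")
      .write.mode("overwrite").parquet(parquetDir)

    val slicePlayed = sqlContext.read.parquet(parquetDir)
    val joinKeys = sc.parallelize(Seq(("s1", "a1", "e1"), ("s2", "a1", "e2")))
      .toDF("join_session_id", "join_asset_id", "join_euid")

    val grouped = slicePlayed.join(
      joinKeys,
      $"join_session_id" === $"cdnt_session_id" &&
        $"join_asset_id" === $"cdnt_asset_id" &&
        $"join_euid" === $"cdnt_euid",
      "inner"
    ).groupBy($"cdnt_session_id", $"cdnt_asset_id", $"cdnt_euid")

    // Narrow query: only the distinct count.
    val narrow = grouped.agg(
      countDistinct($"cdnt_slice_number").as("narrow_distinct"))

    // Wide query: adds a string-typed min. Assumption: an extra aggregate
    // like this is what changes the physical plan in the report.
    val wide = grouped.agg(
      min($"cdnt_user_agent").as("min_user_agent"),
      countDistinct($"cdnt_slice_number").as("wide_distinct"))

    // Join the two results on the group keys and keep disagreeing groups.
    narrow.join(wide, Seq("cdnt_session_id", "cdnt_asset_id", "cdnt_euid"))
      .filter("narrow_distinct <> wide_distinct")
      .select("cdnt_session_id", "cdnt_asset_id", "cdnt_euid",
        "narrow_distinct", "wide_distinct")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SPARK-12076-repro").setMaster("local[2]"))
    try {
      val dir = Files.createTempDirectory("spark-12076")
        .resolve("slice_played").toString
      val result = mismatches(new SQLContext(sc), dir)
      result.explain() // shows which aggregate operators were chosen
      result.show()    // any rows printed here are inconsistent groups
    } finally sc.stop()
  }
}
{code}

If this prints no rows on the affected version, the next step would be to bring the sample data closer to the real schema (for example more string columns or more partitions) until the two counts diverge.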
> countDistinct behaves inconsistently
> ------------------------------------
>
> Key: SPARK-12076
> URL: https://issues.apache.org/jira/browse/SPARK-12076
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.5.1
> Reporter: Paul Zaczkieiwcz
> Priority: Minor
>
> Assume:
> {code:java}
> val slicePlayed: DataFrame = ??? // loaded from Parquet files
> val joinKeys: DataFrame = ???
> {code}
> Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} and all columns beginning with "join_" are from {{joinKeys}}. The following queries can return different values for slice_count_distinct:
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   (
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number")
> ).show(false)
> {code}
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   (
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   min($"cdnt_event_time").as("slice_start_time"),
>   min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
>   min($"cdnt_user_ip").as("slice_played_user_ip"),
>   min($"cdnt_user_agent").as("slice_played_user_agent"),
>   min($"cdnt_referer").as("slice_played_referer"),
>   max($"cdnt_event_time").as("slice_end_time"),
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number"),
>   min($"cdnt_is_live").as("is_live")
> ).show(false)
> {code}
> The +only+ difference between the two queries is that I'm adding more columns to the {{agg}} method.
> I can't reproduce this by manually creating a DataFrame with {{sc.parallelize}}. The original sources of the DataFrames are Parquet files.
> The explain plans for the two queries are slightly different.
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
> TungstenProject [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
> SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
> TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
> TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
> ConvertToUnsafe
> Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
> TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
> TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
> ConvertToUnsafe
> Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> {code}
> == Physical Plan ==
> SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(max(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_is_live#18),mode=Final,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_user_ip#31),mode=Final,isDistinct=false),(min(cdnt_user_agent#30),mode=Final,isDistinct=false),(min(cdnt_referer#22),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)], output=[slice_played_session_id#721,slice_played_asset_id#722,slice_played_euid#723,slice_start_time#724,slice_played_playing_owner_id#725,slice_played_user_ip#726,slice_played_user_agent#727,slice_played_referer#728,slice_end_time#729,slice_count_distinct#730L,slice_count_total#731L,min_slice_number#732L,max_slice_number#733L,is_live#734])
> SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_is_live#18),mode=PartialMerge,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_user_ip#31),mode=PartialMerge,isDistinct=false),(min(cdnt_user_agent#30),mode=PartialMerge,isDistinct=false),(min(cdnt_referer#22),mode=PartialMerge,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
> SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L], functions=[(max(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_is_live#18),mode=Partial,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_user_ip#31),mode=Partial,isDistinct=false),(min(cdnt_user_agent#30),mode=Partial,isDistinct=false),(min(cdnt_referer#22),mode=Partial,isDistinct=false)], output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
> ConvertToSafe
> TungstenProject [cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
> SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
> TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false, 0
> TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
> ConvertToUnsafe
> Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
> TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false, 0
> TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
> ConvertToUnsafe
> Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> The biggest difference between the two plans is whether TungstenAggregate or SortBasedAggregate+ConvertToSafe is used. The SortBasedAggregate+ConvertToSafe plan gives the inaccurate results. I've been able to get around this issue by adding a {{sortBy}} call before the {{groupBy}} clause, but it strikes me that the result of this calculation shouldn't change when a manual sort is added in an intermediate step.