Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2020/02/19 05:02:00 UTC

[jira] [Resolved] (SPARK-26974) Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset

     [ https://issues.apache.org/jira/browse/SPARK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-26974.
----------------------------------
    Resolution: Incomplete

> Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26974
>                 URL: https://issues.apache.org/jira/browse/SPARK-26974
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API, Spark Core, SQL
>    Affects Versions: 2.2.0
>            Reporter: Utkarsh Sharma
>            Priority: Major
>              Labels: caching, data-corruption, data-integrity
>
> The initial datasets are derived from Hive tables using the spark.table() function.
> Dataset descriptions:
> *+Sales+* dataset (close to 10 billion rows) with the following columns (and sample rows):
> ||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
> |1|1|20|
> |1|2|30|
> |2|1|40|
>  
> *+Customer+* dataset (close to 50000 rows) with the following columns (and sample rows):
> ||CustomerId (bigint)||CustomerGrpNbr (smallint)||
> |1|1|
> |2|2|
> |3|1|
>  
> I am doing the following steps:
>  # Caching sales dataset with close to 10 billion rows.
>  # Doing an inner join of the 'sales' dataset with the 'customer' dataset.
>  # Doing a group by on the resultant dataset, based on the CustomerGrpNbr column, to get the sum(qty_sold) and stddev(qty_sold) values for the customer groups.
>  # Caching the resultant grouped dataset.
>  # Doing a .count() on the grouped dataset.
> The count in step 5 is supposed to return only 20, because customer.select("CustomerGrpNbr").distinct().count returns 20. However, step 5 returns a value of around 65,000.
> Following are the commands I am running in spark-shell:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
> sales.cache()
> finalDf.cache()
> finalDf.count() // returns around 65k rows and the count keeps on varying each run
> customer.select("CustomerGrpNbr").distinct().count() // returns 20
> {code}
> I have been able to replicate the same behavior using the Java API as well. However, this anomalous behavior disappears when I remove the caching statements. That is, if I run the following in spark-shell, it works as expected:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold")) 
> finalDf.count() // returns 20 
> customer.select("CustomerGrpNbr").distinct().count() //returns 20
> {code}
> The Hive tables from which the datasets are built do not change during this entire process. So why does the caching cause this problem?


