You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Utkarsh Sharma (JIRA)" <ji...@apache.org> on 2019/02/22 17:16:00 UTC

[jira] [Created] (SPARK-26974) Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset

Utkarsh Sharma created SPARK-26974:
--------------------------------------

             Summary: Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset
                 Key: SPARK-26974
                 URL: https://issues.apache.org/jira/browse/SPARK-26974
             Project: Spark
          Issue Type: Bug
          Components: Java API, Spark Core, SQL
    Affects Versions: 2.2.0
            Reporter: Utkarsh Sharma


The initial datasets are derived from hive tables using the spark.table() functions.

Dataset descriptions:

*+Sales+* dataset (close to 10 billion rows) with the following columns (and sample rows) : 
||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
|1|1|20|
|1|2|30|
|2|1|40|

 

+*Customer*+ Dataset (close to 50000 rows) with the following columns (and sample rows):
||CustomerId (bigint)||CustomerGrpNbr (smallint)||
|1|1|
|2|2|
|3|1|

 

I am doing the following steps:
 # Caching sales dataset with close to 10 billion rows.
 # Doing an inner join of 'sales' with 'customer' dataset
 
 # Doing group by on the resultant dataset, based on CustomerGrpNbr column to get sum(qty_sold) and stddev(qty_sold) vales in the customer groups.
 # Caching the resultant grouped dataset.
 # Doing a .count() on the grouped dataset.

The step 5 count is supposed to return only 20, because when you do a customer.select("CustomerGroupNbr").distinct().count you get 20 values. However, you get a value of around 65,000 in step 5.

Following are the commands I am running in spark-shell:
{code:java}
var sales = spark.table("sales_table")
var customer = spark.table(“customer_table”)
var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
sales.cache()
finalDf.cache()
finalDf.count() // returns around 65k rows and the count keeps on varying each // run
customer.select("CustomerGrpNbr").distinct().count() //returns 20{code}
I have been able to replicate the same behavior using the java api as well. This anamolous behavior disappears however, when I remove the caching statements. I.e. if i run the following in spark-shell, it works as expected:
{code:java}
var sales = spark.table("sales_table")
var customer = spark.table(“customer_table”)
var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold")) 
finalDf.count() // returns 20 customer.select("CustomerGrpNbr").distinct().count() //returns 20
{code}
The tables in hive from which the datasets are built do not change during this entire process. So why does the caching cause this problem?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org