Posted to issues@spark.apache.org by "Utkarsh Sharma (JIRA)" <ji...@apache.org> on 2019/02/22 17:18:00 UTC
[jira] [Updated] (SPARK-26974) Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset
[ https://issues.apache.org/jira/browse/SPARK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Utkarsh Sharma updated SPARK-26974:
-----------------------------------
Description:
The initial datasets are derived from Hive tables using the spark.table() function.
Dataset descriptions:
*+Sales+* dataset (close to 10 billion rows) with the following columns (and sample rows):
||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
|1|1|20|
|1|2|30|
|2|1|40|
*+Customer+* dataset (close to 50,000 rows) with the following columns (and sample rows):
||CustomerId (bigint)||CustomerGrpNbr (smallint)||
|1|1|
|2|2|
|3|1|
I am doing the following steps:
# Caching the sales dataset with close to 10 billion rows.
# Doing an inner join of the 'sales' dataset with the 'customer' dataset.
# Doing a group-by on the resultant dataset, based on the CustomerGrpNbr column, to get sum(qty_sold) and stddev(qty_sold) values for the customer groups.
# Caching the resultant grouped dataset.
# Doing a .count() on the grouped dataset.
The count in step 5 should return only 20, because customer.select("CustomerGrpNbr").distinct().count() returns 20. However, step 5 returns a value of around 65,000, and that value varies between runs.
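As a sanity check, the expected result of the join and group-by in the steps above can be sketched in plain Scala on the sample rows from the tables. This is only a sketch of the same logic, not the Spark execution itself; the point is that the grouped row count can never exceed the number of distinct CustomerGrpNbr values that appear in the joined data.

```scala
// (ItemId, CustomerId, qty_sold) -- sample Sales rows from the table above
val sales = Seq((1L, 1L, 20L), (1L, 2L, 30L), (2L, 1L, 40L))

// CustomerId -> CustomerGrpNbr -- sample Customer rows from the table above
val customer = Map(1L -> 1, 2L -> 2, 3L -> 1)

// Inner join on CustomerId, then group by CustomerGrpNbr and sum qty_sold.
val grouped = sales
  .flatMap { case (_, custId, qty) => customer.get(custId).map(grp => (grp, qty)) }
  .groupBy(_._1)
  .map { case (grp, rows) => (grp, rows.map(_._2).sum) }

// The grouped row count equals the number of distinct group numbers that
// actually appear in the joined data: 2 for these sample rows.
println(grouped.size) // 2
```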
Following are the commands I am running in spark-shell:
{code:java}
var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
sales.cache()
finalDf.cache()
finalDf.count() // returns around 65,000 and the count varies on each run
customer.select("CustomerGrpNbr").distinct().count() // returns 20
{code}
I have been able to replicate the same behavior using the Java API as well. However, this anomalous behavior disappears when I remove the caching statements. That is, if I run the following in spark-shell, it works as expected:
{code:java}
var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
finalDf.count() // returns 20
customer.select("CustomerGrpNbr").distinct().count() // returns 20
{code}
The Hive tables from which the datasets are built do not change during this entire process. So why does caching cause this problem?
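For reference, the stddev in the aggregation above is Spark's sample standard deviation (stddev is an alias for stddev_samp), so the per-group value the query should produce can be checked by hand. A plain-Scala sketch of that formula, applied to the two sample rows of group 1 (qty_sold 20 and 40), illustrative only:

```scala
// Sample standard deviation: sqrt( sum((x - mean)^2) / (n - 1) ),
// matching Spark SQL's stddev / stddev_samp aggregate.
def sampleStdDev(xs: Seq[Double]): Double = {
  val mean = xs.sum / xs.size
  math.sqrt(xs.map(x => math.pow(x - mean, 2)).sum / (xs.size - 1))
}

// qty_sold values for sample group 1: mean 30, squared deviations 100 + 100,
// divided by n - 1 = 1 gives 200, so the result is sqrt(200).
val qtySoldGroup1 = Seq(20.0, 40.0)
println(sampleStdDev(qtySoldGroup1)) // ~14.1421
```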
> Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-26974
> URL: https://issues.apache.org/jira/browse/SPARK-26974
> Project: Spark
> Issue Type: Bug
> Components: Java API, Spark Core, SQL
> Affects Versions: 2.2.0
> Reporter: Utkarsh Sharma
> Priority: Major
> Labels: caching, data-corruption, data-integrity
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org