Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2016/09/23 11:36:22 UTC
[jira] [Comment Edited] (SPARK-17621) Accumulator value is doubled when using DataFrame.orderBy()
[ https://issues.apache.org/jira/browse/SPARK-17621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516190#comment-15516190 ]
Hyukjin Kwon edited comment on SPARK-17621 at 9/23/16 11:35 AM:
----------------------------------------------------------------
Hi [~srowen] and [~sreelalsl], I just happened to look at this JIRA.
At first I also thought you were evaluating this twice. After manually testing more cases, though, I think it is actually an issue.
For example, running the code below:
{code}
val acc = spark.sparkContext.longAccumulator("acc")
val df = spark.range(1).as[Long]
df.map { r =>
acc.add(1)
r
}.sort("value").collect()
println(acc.value)
{code}
prints {{2}}, but the code below:
{code}
val acc = spark.sparkContext.longAccumulator("acc")
val df = spark.range(1).as[Long]
df.map { r =>
acc.add(1)
r
}.collect()
println(acc.value)
{code}
prints {{1}}.
I took a deeper look and debugged this in an IDE. It seems that when a sort field is set explicitly, Spark shuffles the data with {{RangePartitioner}}. This partitioner first collects a sample and then computes the range boundaries for each partition. So a {{collect()}} happens here first, which launches a separate Spark job, and then the data is evaluated again for the final results.
In more detail, [{{RangePartitioner.sketch}}|https://github.com/apache/spark/blob/4efcdb7feae24e41d8120b59430f8b77cc2106a6/core/src/main/scala/org/apache/spark/Partitioner.scala#L259-L266] is called, which actually collects the data.
Launching another job is also explained here: https://github.com/apache/spark/blob/511f52f8423e151b0d0133baf040d34a0af3d422/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala#L213-L214
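If the sampling job and the final job both re-run the {{map}}, one possible way to avoid the double count is to cache the mapped Dataset before sorting, so that the second job can read the rows computed by the first. The code below is only a sketch under that assumption and has not been verified against this ticket. The object name {{AccumulatorSortSketch}} and the value name {{counted}} are made up for illustration, and the approach relies on the cached partitions not being evicted or recomputed.
{code}
import org.apache.spark.sql.SparkSession

// Hypothetical standalone driver; the object name is for illustration only.
object AccumulatorSortSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("accumulator-sort-sketch")
      .getOrCreate()
    import spark.implicits._

    val acc = spark.sparkContext.longAccumulator("acc")

    // Assumption: caching here lets the sampling job materialize the rows,
    // and the job that produces the final sorted result reuse them.
    val counted = spark.range(1).as[Long].map { r =>
      acc.add(1)
      r
    }.cache()

    counted.sort("value").collect()

    // Compare this against the value printed without the cache() call.
    println(acc.value)

    spark.stop()
  }
}
{code}
Even with caching, accumulator updates made inside transformations can be applied more than once when tasks or stages are re-executed, so such a counter is better treated as approximate.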
> Accumulator value is doubled when using DataFrame.orderBy()
> -----------------------------------------------------------
>
> Key: SPARK-17621
> URL: https://issues.apache.org/jira/browse/SPARK-17621
> Project: Spark
> Issue Type: Bug
> Components: Scheduler, SQL
> Affects Versions: 2.0.0
> Environment: Development environment. (Eclipse . Single process)
> Reporter: Sreelal S L
> Priority: Minor
>
> We are tracking the records read by our source using an accumulator. We do an orderBy on the DataFrame before the output operation. When the job completes, the accumulator value is double the expected value.
> Below is the sample code I ran.
> {code}
> val sqlContext = SparkSession.builder()
> .config("spark.sql.retainGroupColumns", false).config("spark.sql.warehouse.dir", "file:///C:/Test").master("local[*]")
> .getOrCreate()
> val sc = sqlContext.sparkContext
> val accumulator1 = sc.accumulator(0, "accumulator1")
> val usersDF = sqlContext.read.json("C:\\users.json") // single row {"name":"sreelal" ,"country":"IND"}
> val usersDFwithCount = usersDF.rdd.map(x => { accumulator1 += 1; x });
> val counterDF = sqlContext.createDataFrame(usersDFwithCount, usersDF.schema);
> val orderedDF = counterDF.orderBy("name")
> val collected = orderedDF.collect()
> collected.foreach { x => println(x) }
> println("accumulator1 : " + accumulator1.value)
> println("Done");
> {code}
> I have only one row in the users.json file. I expect accumulator1 to have the value 1, but it comes out as 2.
> In the Spark SQL UI, I see two jobs being generated for this.