Posted to issues@spark.apache.org by "Mateusz Jukiewicz (JIRA)" <ji...@apache.org> on 2018/02/12 08:23:00 UTC

[jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results

    [ https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360415#comment-16360415 ] 

Mateusz Jukiewicz commented on SPARK-23298:
-------------------------------------------

I'm not sure, but this seems like it could be related to SPARK-23207.

> distinct.count on Dataset/DataFrame yields non-deterministic results
> --------------------------------------------------------------------
>
>                 Key: SPARK-23298
>                 URL: https://issues.apache.org/jira/browse/SPARK-23298
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, SQL, YARN
>    Affects Versions: 2.1.0, 2.2.0
>         Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>            Reporter: Mateusz Jukiewicz
>            Priority: Major
>
> This is what happens:
> {code:java}
> /* Example spark-shell start command
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> */
> val dataset = spark.read.textFile("/text_dataset.out")
> dataset.distinct.count
> // res0: Long = 24025868
> dataset.distinct.count
> // res1: Long = 24014227{code}
> The _text_dataset.out_ file is a dataset with one string per line. Each string contains alphanumeric characters as well as colons and spaces. The line length does not exceed 1200. I don't think that's important though, as the issue appeared on various other datasets. I just tried to narrow it down to the simplest possible case.
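> To see how often the result varies within one session, the repeated call above can be wrapped in a small loop. This is only a minimal sketch: _repeatCount_ and _isDeterministic_ are hypothetical helper names, not Spark APIs.
> {code:java}
> // Hypothetical helper: runs the same counting action `runs` times
> // and returns every observed result in order.
> def repeatCount(runs: Int)(count: () => Long): Seq[Long] =
>   (1 to runs).map(_ => count())
>
> // True when all runs returned the same value.
> def isDeterministic(results: Seq[Long]): Boolean =
>   results.distinct.size <= 1
> {code}
> In spark-shell this could be used as _repeatCount(5)(() => dataset.distinct.count)_, passing the result to _isDeterministic_.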
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both Spark 2.2 and Spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in Spark local mode.
>  * It seems that once the correct count is computed, it is not possible to reproduce the issue in the same Spark session. In other words, I was able to get 2-3 incorrect distinct.count results consecutively, but once the result was correct, it always returned the correct value. I had to re-run spark-shell to observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always able to correctly read all input records (which was shown in the UI), and the record count only became wrong after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related to the way Encoders hash data internally. The reasons might be:
>  ** in a simple `distinct.count` invocation, there are three hash-related stages in total (called `HashAggregate`),
>  ** an excerpt from the scaladoc for the `distinct` method says:
> {code:java}
>    * @note Equality checking is performed directly on the encoded representation of the data
>    * and thus is not affected by a custom `equals` function defined on `T`.{code}
>  * One of my suspicions was the number of partitions we're using (2154). This is greater than 2000, which means that a different data structure (i.e. _HighlyCompressedMapStatus_ instead of _CompressedMapStatus_) will be used for bookkeeping during the shuffle. Unfortunately, after decreasing the number below this threshold, the problem still occurs.
>  * It's easier to reproduce the issue with a large number of partitions.
>  * Another suspicion of mine was that it's somehow related to the number of blocks on the HDFS (974). I was able to reproduce the problem with both fewer and more partitions than this value, so I think this is not the case.
>  * Final note: It looks like for some reason the data gets duplicated in the process of data exchange during the shuffle (because shuffle read sees more elements than shuffle write has written).
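> The size of the discrepancy can be worked out from the record counts quoted above. This is a minimal sketch using only numbers from this report; the value names are illustrative, and the report does not say whether _res0_ and the incorrect shuffle read listed above come from the same run.
> {code:java}
> // Record counts copied from the observations above.
> val shuffleWritten = 24014227L // Shuffle Write, first stage
> val shuffleReadBad = 24020150L // Shuffle Read, second stage, incorrect execution
> val correctCount   = 24014227L // correct distinct.count (res1)
> val badCount       = 24025868L // incorrect distinct.count (res0)
>
> // Records that appeared between shuffle write and shuffle read.
> val extraShuffleRecords = shuffleReadBad - shuffleWritten // 5923
>
> // Surplus rows in the incorrect distinct.count result.
> val extraDistinctRows = badCount - correctCount // 11641
> {code}
> Either way, the surplus is small relative to the roughly 24 million input records.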
> Please let me know if you have any other questions.
> I couldn't find much about similar problems on the Web. The only thing I found was a thread on the Spark mailing list where someone using PySpark found that one of their executors was hashing things differently from the other one, which caused a similar issue.
> I didn't include a reproducible example because the input is just a long file of strings, and since the issue occurred on many different datasets, I doubt it's data-related. If that's necessary though, please let me know and I will try to prepare an example.
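> If an example is needed, data with the same shape as described above (unique lines of alphanumeric characters, colons and spaces, at most 1200 characters long) could be generated synthetically. This is only a sketch under those assumptions: _alphabet_ and _syntheticLine_ are hypothetical names, and this is not the data the issue was observed on.
> {code:java}
> import scala.util.Random
>
> // Characters mentioned in the description: alphanumerics, colons and spaces.
> val alphabet: IndexedSeq[Char] =
>   (('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9')) :+ ':' :+ ' '
>
> // Hypothetical generator. Each line starts with its index followed by a colon,
> // so lines with different indices are always distinct.
> def syntheticLine(i: Long, rng: Random, maxLen: Int = 1200): String = {
>   val prefix = s"$i:"
>   // Random body length chosen so that the whole line never exceeds maxLen.
>   val bodyLen = rng.nextInt(maxLen - prefix.length + 1)
>   val body = Array.fill(bodyLen)(alphabet(rng.nextInt(alphabet.size))).mkString
>   prefix + body
> }
> {code}
> For a small local check, _(0L until 1000L).map(i => syntheticLine(i, new Random(42)))_ yields 1000 distinct lines, so a correct _distinct.count_ over them should return 1000.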



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org