You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Stephen Hankinson <st...@affinio.com> on 2016/10/12 16:55:57 UTC

DataFrame/Dataset join not producing correct results in Spark 2.0/Yarn

Hi,

We have a cluster running Apache Spark 2.0 on Hadoop 2.7.2, Centos 7.2. We
had written some new code using the Spark DataFrame/DataSet APIs but are
noticing incorrect results on a join after writing and then reading data to
Windows Azure Storage Blobs (The default HDFS location). I've been able to
duplicate the issue with the following snippet of code running on the
cluster.

case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(CentroidClusterScore(0, 1,
1.0),CentroidClusterScore(1, 0, 1.0),CentroidClusterScore(2, 2,
1.0))).toDS

dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension") ).show

outputs

+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|        0|      1|  1.0|
+-----+---------+-----+---------+-------+-----+

which is correct. However after writing and reading the data, we see this

dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")

val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]

dims2.show
cent2.show

dims2.join(cent2, dims2("dimension") === cent2("dimension") ).show

outputs

+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|     null|   null| null|
+-----+---------+-----+---------+-------+-----+

However, using the RDD API produces the correct result

dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row
=> (row.dimension, row) ) ).take(5)

res5: Array[(Long, (UserDimensions, CentroidClusterScore))] =
Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))

We've tried changing the output format to ORC instead of parquet, but we
see the same results. Running Spark 2.0 locally, not on a cluster, does not
have this issue. Also running spark in local mode on the master node of the
Hadoop cluster also works. Only when running on top of YARN do we see this
issue.

This also seems very similar to this issue: https://issues.apache.
org/jira/browse/SPARK-10896
Thoughts?


*Stephen Hankinson*