You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dan Brown (JIRA)" <ji...@apache.org> on 2015/10/06 01:59:26 UTC

[jira] [Created] (SPARK-10939) Misaligned data with RDD.zip after repartition

Dan Brown created SPARK-10939:
---------------------------------

             Summary: Misaligned data with RDD.zip after repartition
                 Key: SPARK-10939
                 URL: https://issues.apache.org/jira/browse/SPARK-10939
             Project: Spark
          Issue Type: Bug
    Affects Versions: 1.5.0, 1.4.1, 1.3.0
         Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
- Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
            Reporter: Dan Brown


Split out from https://issues.apache.org/jira/browse/SPARK-10685:

Here's a weird behavior where {{RDD.zip}} after a {{repartition}} produces "misaligned" data, meaning different column values in the same row aren't matched, as if a zip shuffled the collections before zipping them. It's difficult to reproduce because it's nondeterministic, doesn't occur in local mode, and requires ≥2 workers (≥3 in one case). I was able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0 (bin-without-hadoop).

Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying to build it ourselves when we ran into this problem. Let me put in my vote for reopening the issue and supporting {{DataFrame.zip}} in the standard lib.

- https://issues.apache.org/jira/browse/SPARK-7460

h3. Repro

Fail: RDD.zip after repartition
{code}
df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df  = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
[r for r in rdd.collect() if r.a != r.b][:3] # Should be []
{code}

Sample outputs (nondeterministic):
{code}
[]
[Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
[]
[]
[Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
[]
{code}

Test setup:

- local\[8]: {{MASTER=local\[8]}}
- dist\[N]: 1 driver + 1 master + N workers

{code}
"Fail" tests pass?  cluster mode  spark version
----------------------------------------------------
yes                 local[8]      1.3.0-cdh5.4.5
no                  dist[4]       1.3.0-cdh5.4.5
yes                 local[8]      1.4.1
yes                 dist[1]       1.4.1
no                  dist[2]       1.4.1
no                  dist[4]       1.4.1
yes                 local[8]      1.5.0
yes                 dist[1]       1.5.0
no                  dist[2]       1.5.0
no                  dist[4]       1.5.0
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org