You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/10/08 13:45:26 UTC

[jira] [Updated] (SPARK-10939) Misaligned data with RDD.zip after repartition

     [ https://issues.apache.org/jira/browse/SPARK-10939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-10939:
------------------------------
    Component/s: Spark Core

> Misaligned data with RDD.zip after repartition
> ----------------------------------------------
>
>                 Key: SPARK-10939
>                 URL: https://issues.apache.org/jira/browse/SPARK-10939
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.0, 1.4.1, 1.5.0
>         Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
> - Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
>            Reporter: Dan Brown
>
> Split out from https://issues.apache.org/jira/browse/SPARK-10685:
> Here's a weird behavior where {{RDD.zip}} after a {{repartition}} produces "misaligned" data, meaning different column values in the same row aren't matched, as if a zip shuffled the collections before zipping them. It's difficult to reproduce because it's nondeterministic, doesn't occur in local mode, and requires ≥2 workers (≥3 in one case). I was able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0 (bin-without-hadoop).
> Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying to build it ourselves when we ran into this problem. Let me put in my vote for reopening the issue and supporting {{DataFrame.zip}} in the standard lib.
> - https://issues.apache.org/jira/browse/SPARK-7460
> h3. Repro
> Fail: RDD.zip after repartition
> {code}
> df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df  = df.repartition(100)
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
> [r for r in rdd.collect() if r.a != r.b][:3] # Should be []
> {code}
> Sample outputs (nondeterministic):
> {code}
> []
> [Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
> []
> []
> [Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
> []
> {code}
> Test setup:
> - local\[8]: {{MASTER=local\[8]}}
> - dist\[N]: 1 driver + 1 master + N workers
> {code}
> "Fail" tests pass?  cluster mode  spark version
> ----------------------------------------------------
> yes                 local[8]      1.3.0-cdh5.4.5
> no                  dist[4]       1.3.0-cdh5.4.5
> yes                 local[8]      1.4.1
> yes                 dist[1]       1.4.1
> no                  dist[2]       1.4.1
> no                  dist[4]       1.4.1
> yes                 local[8]      1.5.0
> yes                 dist[1]       1.5.0
> no                  dist[2]       1.5.0
> no                  dist[4]       1.5.0
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org