Posted to issues@spark.apache.org by "Dan Brown (JIRA)" <ji...@apache.org> on 2015/10/06 01:59:26 UTC
[jira] [Created] (SPARK-10939) Misaligned data with RDD.zip after repartition
Dan Brown created SPARK-10939:
---------------------------------
Summary: Misaligned data with RDD.zip after repartition
Key: SPARK-10939
URL: https://issues.apache.org/jira/browse/SPARK-10939
Project: Spark
Issue Type: Bug
Affects Versions: 1.5.0, 1.4.1, 1.3.0
Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
- Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
Reporter: Dan Brown
Split out from https://issues.apache.org/jira/browse/SPARK-10685:
Here's a weird behavior where {{RDD.zip}} after a {{repartition}} produces "misaligned" data: column values that belong to the same row end up paired with values from other rows, as if one of the collections had been shuffled before zipping. It's difficult to reproduce because it's nondeterministic, doesn't occur in local mode, and requires ≥2 workers (≥3 in one case). I was able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0 (bin-without-hadoop).
Also, the {{DataFrame.zip}} issue linked below is related in spirit: we ran into this problem while trying to build {{DataFrame.zip}} ourselves. Let me put in my vote for reopening that issue and supporting {{DataFrame.zip}} in the standard lib.
- https://issues.apache.org/jira/browse/SPARK-7460
h3. Repro
Fail: RDD.zip after repartition
{code}
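from pyspark.sql import Row

# Python 2 / PySpark 1.x; sqlCtx is assumed to be an existing SQLContext
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
# Zip the rows with a copy of themselves (a renamed to b); a and b should match in every row
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
[r for r in rdd.collect() if r.a != r.b][:3] # Should be []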
{code}
Sample outputs (nondeterministic):
{code}
[]
[Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
[]
[]
[Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
[]
{code}
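One pattern in the failing samples above may be worth noting: within each failing run, the difference {{b - a}} is the same for every row shown, so the two sides look offset from each other rather than randomly scrambled. The sketch below only checks that arithmetic on the rows copied from the output. It uses a {{namedtuple}} as a stand-in for {{pyspark.sql.Row}} so that it runs without Spark, and the helper names {{misaligned}} and {{offsets}} are hypothetical, not part of any Spark API.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row so the sketch runs without Spark (an assumption).
Row = namedtuple("Row", ["a", "b"])


def misaligned(rows):
    """Return the rows whose two columns disagree, as in the repro's final check."""
    return [r for r in rows if r.a != r.b]


def offsets(rows):
    """Return the set of distinct b - a differences among the rows."""
    return {r.b - r.a for r in rows}


# The two failing samples, copied from the output above.
run_1 = [Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
run_2 = [Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]

print(offsets(misaligned(run_1)))  # {6897}
print(offsets(misaligned(run_2)))  # {600}
```

Only three rows per run are shown in the samples, so this says nothing about the remaining rows of each result.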
Test setup:
- local\[8]: {{MASTER=local\[8]}}
- dist\[N]: 1 driver + 1 master + N workers
{code}
"Fail" tests pass?  cluster mode  spark version
------------------------------------------------
yes                 local[8]      1.3.0-cdh5.4.5
no                  dist[4]       1.3.0-cdh5.4.5
yes                 local[8]      1.4.1
yes                 dist[1]       1.4.1
no                  dist[2]       1.4.1
no                  dist[4]       1.4.1
yes                 local[8]      1.5.0
yes                 dist[1]       1.5.0
no                  dist[2]       1.5.0
no                  dist[4]       1.5.0
{code}
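For readers who want the table in a form they can filter, here is a minimal sketch that transcribes it into plain Python and summarizes which configurations fail. The tuple layout and the helper {{worker_count}} are illustrative assumptions. The data is copied from the table and nothing beyond it is inferred.

```python
# (passes, cluster mode, spark version), transcribed from the table above.
results = [
    (True, "local[8]", "1.3.0-cdh5.4.5"),
    (False, "dist[4]", "1.3.0-cdh5.4.5"),
    (True, "local[8]", "1.4.1"),
    (True, "dist[1]", "1.4.1"),
    (False, "dist[2]", "1.4.1"),
    (False, "dist[4]", "1.4.1"),
    (True, "local[8]", "1.5.0"),
    (True, "dist[1]", "1.5.0"),
    (False, "dist[2]", "1.5.0"),
    (False, "dist[4]", "1.5.0"),
]


def worker_count(mode):
    """Return N for a "dist[N]" mode, or None for local mode."""
    if mode.startswith("dist["):
        return int(mode[len("dist["):-1])
    return None


# Worker counts seen in failing rows, and the modes in which the test passed.
failing_workers = sorted({worker_count(mode) for ok, mode, _ in results if not ok})
passing_modes = sorted({mode for ok, mode, _ in results if ok})

print(failing_workers)  # [2, 4]
print(passing_modes)    # ['dist[1]', 'local[8]']
```

In this table, every failure is in distributed mode with at least two workers, and every local or single-worker run passes.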