Posted to issues@spark.apache.org by "Madhusudanan Kandasamy (JIRA)" <ji...@apache.org> on 2015/06/09 16:53:00 UTC

[jira] [Commented] (SPARK-8048) Explicit partitionning of an RDD with 0 partition will yield empty outer join

    [ https://issues.apache.org/jira/browse/SPARK-8048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579018#comment-14579018 ] 

Madhusudanan Kandasamy commented on SPARK-8048:
-----------------------------------------------

Whenever an RDD action submits a job, the DAG scheduler splits the job into tasks based on the number of partitions. Using a new partitioner with zero partitions is therefore equivalent to dropping all the data in the RDD. So I think it is fair to expect the result of rdd1.leftOuterJoin(rdd2) to match the result obtained when rdd2 is an empty set, as detailed in the code snippet below.

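import org.apache.spark.HashPartitioner  // assumes sc is the SparkContext from spark-shell

val baseRDD   = sc.parallelize(1 to 3).map(x => (x, x))
// Empty RDD explicitly partitioned with a zero-partition HashPartitioner
val emptyrdd1 = sc.parallelize(List[Tuple2[Int,Int]]()).partitionBy(new HashPartitioner(0))
// Empty RDD without an explicit partitioner
val emptyrdd2 = sc.parallelize(List[Tuple2[Int,Int]]())

println("result 1 = " + baseRDD.leftOuterJoin(emptyrdd1).collect().toList)
println("result 2 = " + baseRDD.leftOuterJoin(emptyrdd2).collect().toList)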

result 1 = List() 
result 2 = List((1,(1,None)), (3,(3,None)), (2,(2,None)))
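To illustrate why result 1 comes back empty, here is a minimal Spark-free sketch. It is only a model, not Spark's implementation: ZeroPartitionJoin, nonNegativeMod and the leftOuterJoin signature below are hypothetical names made up for this example. It assumes that a join produces one task per output partition and that keys are assigned to partitions by hash code, so with zero output partitions no task ever runs and no row is emitted.

```scala
// Spark-free model of a partitioned left outer join (illustrative names only).
object ZeroPartitionJoin {
  // Map a possibly negative hash code onto 0 until mod.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val r = x % mod
    if (r < 0) r + mod else r
  }

  // One "task" per output partition; each task joins only the keys it owns.
  def leftOuterJoin[K, V, W](
      left: Seq[(K, V)],
      right: Seq[(K, W)],
      numPartitions: Int): Seq[(K, (V, Option[W]))] =
    (0 until numPartitions).flatMap { p =>
      val l = left.filter { case (k, _) => nonNegativeMod(k.hashCode, numPartitions) == p }
      val r = right.filter { case (k, _) => nonNegativeMod(k.hashCode, numPartitions) == p }
      l.flatMap { case (k, v) =>
        val matches = r.collect { case (`k`, w) => w }
        val none: Option[W] = None
        if (matches.isEmpty) Seq((k, (v, none)))
        else matches.map(w => (k, (v, Some(w): Option[W])))
      }
    }
}
```

In this model, calling leftOuterJoin with numPartitions = 0 iterates over an empty range and returns an empty sequence regardless of what is in left, while any positive partition count returns every left-hand key paired with None when right is empty.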

I have the following questions for the committers:

1. Is this expectation correct, or should we consider this to be working as designed?
2. If we decide to fix this:
	1. Can I work on a PR for this issue?
	2. Please let me know your comments on the fix below.

If this expectation is correct, then one way to fix the code would be to modify the partitioner selection code for the cogrouped RDDs. As of now, it selects an explicitly defined partitioner as the default partitioner if one exists; otherwise it falls back to a partitioner sized from the RDD with the larger number of partitions. We can fix the code by selecting the explicitly defined partitioner only when the number of partitions is > 0.

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index e53a78e..7052f8e 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -57,7 +57,7 @@ object Partitioner {
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
     for (r <- bySize if r.partitioner.isDefined) {
-      return r.partitioner.get
+      if (r.partitions.size > 0) return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
       new HashPartitioner(rdd.context.defaultParallelism)
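The effect of the one-line change can be sketched without Spark. This is a simplified model under stated assumptions, not the actual Partitioner.scala code: FakeRDD and PartitionerChoice are hypothetical names, a partitioner is represented only by its partition count, and the spark.default.parallelism branch is left out, so the fallback is simply the largest partition count.

```scala
// Simplified, Spark-free model of the partitioner selection (hypothetical types).
// partitioner = Some(n) stands for an explicitly set partitioner with n partitions.
final case class FakeRDD(name: String, numPartitions: Int, partitioner: Option[Int])

object PartitionerChoice {
  // Behaviour before the patch: the first explicitly set partitioner wins,
  // even when it has zero partitions.
  def current(rdds: Seq[FakeRDD]): Int = {
    val bySize = rdds.sortBy(_.numPartitions).reverse
    bySize.find(_.partitioner.isDefined)
      .flatMap(_.partitioner)
      .getOrElse(bySize.head.numPartitions)
  }

  // Behaviour with the patch: an explicit partitioner is used only if its RDD
  // has at least one partition; otherwise fall through to the size-based default.
  def proposed(rdds: Seq[FakeRDD]): Int = {
    val bySize = rdds.sortBy(_.numPartitions).reverse
    bySize.find(r => r.partitioner.isDefined && r.numPartitions > 0)
      .flatMap(_.partitioner)
      .getOrElse(bySize.head.numPartitions)
  }
}
```

For a 4-partition RDD with no partitioner joined against an empty RDD that was explicitly partitioned with zero partitions, current picks 0 output partitions while proposed picks 4, which is the difference between an empty join result and the unjoined rows.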



> Explicit partitionning of an RDD with 0 partition will yield empty outer join
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-8048
>                 URL: https://issues.apache.org/jira/browse/SPARK-8048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1
>            Reporter: Olivier Toupin
>            Priority: Minor
>
> Check this code =>
> https://gist.github.com/anonymous/0f935915f2bc182841f0
> Because of this => {{.partitionBy(new HashPartitioner(0))}}
> the join will return an empty result.
> The normal expected behaviour here would be for the join to crash, raise an error, or return unjoined results, but instead it yields an empty RDD.
> This is a trivial example, but imagine:
> {{.partitionBy(new HashPartitioner(previous.partitions.length))}}.
> If you join on an empty "previous" RDD, the lookup table is empty and Spark will lose all your results instead of returning unjoined results, without any warnings or errors.


