Posted to issues@spark.apache.org by "Glenn Strycker (JIRA)" <ji...@apache.org> on 2015/09/16 18:08:46 UTC

[jira] [Created] (SPARK-10636) RDD filter does not work after if..then..else RDD blocks

Glenn Strycker created SPARK-10636:
--------------------------------------

             Summary: RDD filter does not work after if..then..else RDD blocks
                 Key: SPARK-10636
                 URL: https://issues.apache.org/jira/browse/SPARK-10636
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
            Reporter: Glenn Strycker


I have an RDD declaration of the following form:

{code}
val myRDD = if (some condition) { tempRDD1.some operations } else { tempRDD2.some operations }.filter(a => a._2._5 <= 50)
{code}

When I output the contents of myRDD, I found entries that clearly had a._2._5 > 50... the filter didn't work!
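I suspect the trailing .filter is binding to the else block only. If standard Scala precedence applies, a method call after the closing brace of the else block chains onto that block, not onto the whole if..else expression, so the one-liner above would effectively desugar to this (a hedged reading, not compiler-verified, using the same pseudocode names):

{code}
// The trailing .filter chains onto the else block only:
val myRDD = if (some condition) {
  tempRDD1.some operations                                  // no filter on this branch
} else {
  { tempRDD2.some operations }.filter(a => a._2._5 <= 50)   // filter applies here only
}
{code}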

If I move the filter inside each of the if..else branches, it suddenly does work:

{code}
val myRDD = if (some condition) { tempRDD1.some operations.filter(a => a._2._5 <= 50) } else { tempRDD2.some operations.filter(a => a._2._5 <= 50) }
{code}
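If precedence really is the cause, a third variant should also work: parenthesize the whole if..else so a single filter call applies to whichever branch is taken (an untested sketch, same pseudocode as above):

{code}
val myRDD = (if (some condition) { tempRDD1.some operations } else { tempRDD2.some operations }).
  filter(a => a._2._5 <= 50)
{code}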

I ran toDebugString after both of these code examples, and "filter" does appear in the DAG for the second example, but DOES NOT appear in the first DAG.  Why not?

Am I misusing the if..then..else syntax for Spark/Scala?
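The same shape seems reproducible in the plain Scala REPL with no Spark at all (hypothetical cond/xs values, chosen only to isolate the language semantics):

{code}
val cond = true
val xs = List(1, 10, 100)

// Trailing filter after the else block -- if it binds to the else
// branch only, the true branch comes through unfiltered:
val a = if (cond) { xs.map(_ * 2) } else { xs.map(_ * 3) }.filter(_ <= 50)
// a == List(2, 20, 200)   <- 200 survives; the filter never ran

// Filter inside each branch (mirrors the working version above):
val b = if (cond) { xs.map(_ * 2).filter(_ <= 50) } else { xs.map(_ * 3).filter(_ <= 50) }
// b == List(2, 20)
{code}

If that check holds, this is Scala expression binding rather than anything in RDD.filter itself.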


Here is my actual code... ignore the crazy naming conventions and what it's doing...

{code}
// this does NOT work

val myRDD = if (tempRDD2.count() > 0) {
   tempRDD1.
     map(a => (a._1._1, (a._1._2, a._2._1, a._2._2))).
     leftOuterJoin(tempRDD2).
     map(a => (a._2._1._1, (a._1, a._2._1._2, a._2._1._3, a._2._2.getOrElse(1L)))).
     leftOuterJoin(tempRDD2).
     map(a => ((a._2._1._1, a._1), (a._2._1._2, a._2._1._3, a._2._1._4, a._2._2.getOrElse(1L)))).
     map(a =>((List(a._1._1, a._1._2).min, List(a._1._1, a._1._2).max), (a._2._1, a._2._2, if (a._1._1 <= a._1._2) { a._2._3 } else { a._2._4 }, if (a._1._1 <= a._1._2) { a._2._4 } else { a._2._3 }, a._2._3 + a._2._4)))
   } else {
     tempRDD1.map(a => (a._1, (a._2._1, a._2._2, 1L, 1L, 2L)))
   }.
   filter(a => a._2._5 <= 50).
   partitionBy(partitioner).
   setName("myRDD").
   persist(StorageLevel.MEMORY_AND_DISK_SER)

myRDD.checkpoint()

println(myRDD.toDebugString)

// (4) MapPartitionsRDD[58] at map at myProgram.scala:2120 []
//  |  MapPartitionsRDD[57] at map at myProgram.scala:2119 []
//  |  MapPartitionsRDD[56] at leftOuterJoin at myProgram.scala:2118 []
//  |  MapPartitionsRDD[55] at leftOuterJoin at myProgram.scala:2118 []
//  |  CoGroupedRDD[54] at leftOuterJoin at myProgram.scala:2118 []
//  +-(4) MapPartitionsRDD[53] at map at myProgram.scala:2117 []
//  |  |  MapPartitionsRDD[52] at leftOuterJoin at myProgram.scala:2116 []
//  |  |  MapPartitionsRDD[51] at leftOuterJoin at myProgram.scala:2116 []
//  |  |  CoGroupedRDD[50] at leftOuterJoin at myProgram.scala:2116 []
//  |  +-(4) MapPartitionsRDD[49] at map at myProgram.scala:2115 []
//  |  |  |  clusterGraphWithComponentsRDD MapPartitionsRDD[28] at reduceByKey at myProgram.scala:1689 []
//  |  |  |      CachedPartitions: 4; MemorySize: 1176.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//  |  |  |  CheckpointRDD[29] at count at myProgram.scala:1701 []
//  |  +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
//  |     |      CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//  |     |  CheckpointRDD[17] at count at myProgram.scala:394 []
//  +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
//     |      CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//     |  CheckpointRDD[17] at count at myProgram.scala:394 []




// this DOES work!

val myRDD = if (tempRDD2.count() > 0) {
   tempRDD1.
     map(a => (a._1._1, (a._1._2, a._2._1, a._2._2))).
     leftOuterJoin(tempRDD2).
     map(a => (a._2._1._1, (a._1, a._2._1._2, a._2._1._3, a._2._2.getOrElse(1L)))).
     leftOuterJoin(tempRDD2).
     map(a => ((a._2._1._1, a._1), (a._2._1._2, a._2._1._3, a._2._1._4, a._2._2.getOrElse(1L)))).
     map(a =>((List(a._1._1, a._1._2).min, List(a._1._1, a._1._2).max), (a._2._1, a._2._2, if (a._1._1 <= a._1._2) { a._2._3 } else { a._2._4 }, if (a._1._1 <= a._1._2) { a._2._4 } else { a._2._3 }, a._2._3 + a._2._4))).
     filter(a => a._2._5 <= 50)
   } else {
     tempRDD1.map(a => (a._1, (a._2._1, a._2._2, 1L, 1L, 2L))).
     filter(a => a._2._5 <= 50)
   }.
   partitionBy(partitioner).
   setName("myRDD").
   persist(StorageLevel.MEMORY_AND_DISK_SER)

myRDD.checkpoint()

println(myRDD.toDebugString)

// (4) MapPartitionsRDD[59] at filter at myProgram.scala:2121 []
//  |  MapPartitionsRDD[58] at map at myProgram.scala:2120 []
//  |  MapPartitionsRDD[57] at map at myProgram.scala:2119 []
//  |  MapPartitionsRDD[56] at leftOuterJoin at myProgram.scala:2118 []
//  |  MapPartitionsRDD[55] at leftOuterJoin at myProgram.scala:2118 []
//  |  CoGroupedRDD[54] at leftOuterJoin at myProgram.scala:2118 []
//  +-(4) MapPartitionsRDD[53] at map at myProgram.scala:2117 []
//  |  |  MapPartitionsRDD[52] at leftOuterJoin at myProgram.scala:2116 []
//  |  |  MapPartitionsRDD[51] at leftOuterJoin at myProgram.scala:2116 []
//  |  |  CoGroupedRDD[50] at leftOuterJoin at myProgram.scala:2116 []
//  |  +-(4) MapPartitionsRDD[49] at map at myProgram.scala:2115 []
//  |  |  |  clusterGraphWithComponentsRDD MapPartitionsRDD[28] at reduceByKey at myProgram.scala:1689 []
//  |  |  |      CachedPartitions: 4; MemorySize: 1176.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//  |  |  |  CheckpointRDD[29] at count at myProgram.scala:1701 []
//  |  +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
//  |     |      CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//  |     |  CheckpointRDD[17] at count at myProgram.scala:394 []
//  +-(4) clusterStatsRDD MapPartitionsRDD[16] at distinct at myProgram.scala:383 []
//     |      CachedPartitions: 4; MemorySize: 824.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
//     |  CheckpointRDD[17] at count at myProgram.scala:394 []
{code}


