You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:34:21 UTC

[jira] [Resolved] (SPARK-15389) DataFrame filter by isNotNull fails in complex, large case

     [ https://issues.apache.org/jira/browse/SPARK-15389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-15389.
----------------------------------
    Resolution: Incomplete

> DataFrame filter by isNotNull fails in complex, large case
> ----------------------------------------------------------
>
>                 Key: SPARK-15389
>                 URL: https://issues.apache.org/jira/browse/SPARK-15389
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Joseph K. Bradley
>            Priority: Major
>              Labels: bulk-closed
>         Attachments: final.spark1.6.1.hadoop2.bug
>
>
> h2. Problem description
> I have the following code (part of GraphFrames prototyping) which does the following:
> * Given: DataFrames for vertices (column "id") and edges (columns "src" and "dst")
> * Add index to vertices, from 0
> * Do several joins with edges to change the original src,dst in edges to use the new vertex indices
> * The complex thing is that this handles high-degree vertices (HDV) separately from low-degree vertices.  It uses a broadcast join for the small number of HDVs, and a default one for the others.
> The bug which appears is:
> * At one point, I create a DataFrame "edgesWithHDVmark" which has a column with many null values.
> * I filter the DataFrame to select rows where that column is not null.
> * The resulting DataFrame has the same number of rows; i.e., the filter does nothing.
> The strange thing is that this works on a graph with a small number of vertices & edges (100 - 10000), but it fails on a large graph (100 million).
> I'm listing 3 sections below:
> * data generation
> * code which does the joins
> * output which shows incorrect values
> h2. Data generation
> Below, look at the "Create a star" section in the code.  The setting below should work.  This setting makes the code in the next section fail:
> * numPartitions = 512
> * numStars = 10
> * starSize = 10000000
> {code}
> import org.apache.spark.sql.{Column, DataFrame}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> def shiftVertexIds(edges: DataFrame, i: Long): DataFrame = {
>   require(Seq(DataTypes.LongType, DataTypes.IntegerType).contains(edges.schema("src").dataType))
>   val newEdges = edges.select((col("src") + i).as("src"),
>     (col("dst") + i).as("dst"))
>   newEdges
> }
> def star(n: Int, numPartitions: Int): DataFrame = {
>   val edges = sqlContext.range(start=1, end=n+1, step=1, numPartitions=numPartitions).toDF("src").select(col("src"), lit(0L).as("dst"))
>   edges.where(col("src").isNotNull && col("dst").isNotNull)
> }
> case class Edge(src: Int, dst: Int)
> /** n: side of grid */
> def grid(n: Int, numPartitions: Int): DataFrame = {
>   val edges = sc.parallelize(n until n * n, numPartitions).flatMap { i =>
>     Seq(
>       Edge(i, i - n),
>       Edge(i, i - 1),
>       Edge(i, i + 1),
>       Edge(i, i + n))
>   }.toDF
>   edges.where(col("src").isNotNull && col("dst").isNotNull)
> }
> def chainOfStars(numStars: Int, starSize: Int, numPartitions: Int): DataFrame = {
>   val edges0 = Range(0, numStars).map { i =>
>     val edges = star(starSize, numPartitions)
>     shiftVertexIds(edges, starSize * i)
>   }.reduce((e0,e1) => e0.unionAll(e1))
>   edges0.repartition(numPartitions)
> }
> def verticesFromEdges(e: DataFrame, numPartitions: Int): DataFrame = {
>   val srcs = e.select(e("src").as("id"))
>   val dsts = e.select(e("dst").as("id"))
>   val v = srcs.unionAll(dsts).distinct
>   v.repartition(numPartitions)
> }
> val dataPath = "/tmp/joseph/graphs"
> // Create a star
> val numPartitions = 16
> val numStars = 3
> val starSize = 10
> val edges = chainOfStars(numStars, starSize, numPartitions)
> val vertices = verticesFromEdges(edges, numPartitions)
> val path = dataPath + s"/star-$numStars-$starSize"
> dbutils.fs.rm(path, recurse=true)
> dbutils.fs.mkdirs(path)
> edges.write.format("parquet").save(path + "/edges")
> vertices.write.format("parquet").save(path + "/vertices")
> {code}
> h2. Code which does the joins
> {code}
> import org.apache.spark.sql.{Column, DataFrame}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.storage.StorageLevel
> val DST = "dst"
> val ID = "id"
> val SRC = "src"
> val LONG_ID = "long_id"
> val LONG_SRC = "long_src"
> val LONG_DST = "long_dst"
> val ORIG_ID = "ORIG_ID"
> val COMPONENT_ID = "component"
> import scala.collection.mutable
> def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
>   val sqlContext = df.sqlContext
>   val schema = df.schema
>   val outputSchema = StructType(Seq(
>     StructField("row", schema, false), StructField("uniq_id", DataTypes.IntegerType, false)))
>   val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row, id.toInt) }
>   sqlContext.createDataFrame(rdd, outputSchema)
> }
> def getIndexedEdges(idxV: DataFrame, edges0: DataFrame): DataFrame = {
>   def summary(name: String, df: DataFrame): Unit = {
>     //return
>     println(name)
>     df.printSchema()
>     println(s"${name}.count: ${df.count()}")
>     df.show(20)
>   }
>   val edges = edges0.select(SRC, DST)
>   summary("idxV", idxV)
>   summary("edges", edges)
>   // Separately index high-degree vertices and other vertices.
>   val degrees = edges.select(explode(array(SRC, DST)).as(ID)).groupBy(ID).agg(count("*").cast("int").as("degree"))
>   val LARGE_DEGREE = 2000000
>   val highDegreeVertices = degrees.where(col("degree") > LARGE_DEGREE).select(ID)
>   summary("highDegreeVertices", highDegreeVertices)
>   val idxHDV = idxV.join(highDegreeVertices, ID) // columns LONG_ID, ID
>   summary("idxHDV", idxHDV)
>   // Indexed low-degree vertices 
>   val idxLDV = idxV.join(highDegreeVertices, idxV(ID) === highDegreeVertices(ID), "left_outer")
>     .where(highDegreeVertices(ID).isNull).select(idxV(ID), idxV(LONG_ID))
>   summary("idxLDV", idxLDV)
>   def indexEdgeSide(LONG_SIDE: String, SIDE: String, OTHER: String): DataFrame = {
>     // Use broadcast join to index high-degree vertices and filter HDVs from edges
>     val tmpIdxHDV = broadcast(idxHDV.select(col(LONG_ID).as(LONG_SIDE), col(ID).as(SIDE)))
>     summary("tmpIdxHDV", tmpIdxHDV)
>     val edgesWithHDVmark = edges.join(tmpIdxHDV, edges(SIDE) === tmpIdxHDV(SIDE), "left_outer") // SIDEx2, LONG_SIDE, OTHER
>     summary("edgesWithHDVmark", edgesWithHDVmark)
>     val idxHDVEdges = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNotNull)
>       .select(edges(SIDE), edges(OTHER), col(LONG_SIDE))
>     summary("idxHDVEdges", idxHDVEdges)
>     val ldvEdges = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNull)
>       .select(edges(SIDE), edges(OTHER))
>     summary("ldvEdges", ldvEdges)
>     // Use hash join to index low-degree vertices
>     val tmpIdxLDV = idxLDV.select(col(LONG_ID).as(LONG_SIDE), col(ID).as(SIDE))
>     summary("tmpIdxLDV", tmpIdxLDV)
>     val idxLDVEdges = ldvEdges.join(tmpIdxLDV, ldvEdges(SIDE) === tmpIdxLDV(SIDE), "inner").drop(tmpIdxLDV(SIDE))
>     summary("idxLDVEdges", idxLDVEdges)
>     // Join ldv, hdv edges
>     val indexedSideEdges = idxHDVEdges.unionAll(idxLDVEdges)
>     summary("indexedSideEdges", indexedSideEdges)
>     // Sanity check; remove later
>     // assert(indexedSideEdges.where(col(LONG_SIDE).isNull).count() == 0)
>     // println("yay not null")
>     indexedSideEdges
>   }
>   println("COMPUTING indexedSrcEdges")
>   val indexedSrcEdges = indexEdgeSide(LONG_SRC, SRC, DST).select(LONG_SRC, SRC, DST)
>   println("COMPUTING indexedDstEdges")
>   val indexedDstEdges = indexEdgeSide(LONG_DST, DST, SRC).select(LONG_DST, SRC, DST)
>   val indexedEdges = indexedSrcEdges.join(indexedDstEdges,
>     (indexedSrcEdges(SRC) === indexedDstEdges(SRC)) && (indexedSrcEdges(DST) === indexedDstEdges(DST)),
>     "inner")
>     .select(indexedSrcEdges(SRC), indexedSrcEdges(LONG_SRC), indexedSrcEdges(DST), indexedDstEdges(LONG_DST))
>   summary("indexedEdges", indexedEdges)
>   indexedEdges
> }
> def getIndexedGraph(vertices: DataFrame, edges: DataFrame, numVertices: Long): (DataFrame, DataFrame) = {
>   assert(numVertices < Int.MaxValue, s"This impl only works for numVertices < ${Int.MaxValue}, but this was given $numVertices vertices.")
>   val indexedV0: DataFrame = {
>     val indexedVertices = zipWithUniqueIdFrom0(vertices)
>     indexedVertices.select(col("uniq_id").as(LONG_ID), col("row")(ID).as(ID))
>   }
>   // remove self-edges, and index edges
>   val indexedE0: DataFrame = getIndexedEdges(indexedV0, edges.where(col(SRC) !== col(DST)))
>   val indexedVertices: DataFrame = indexedV0
>     .select(col(ID).as(ORIG_ID), col(LONG_ID).as(ID), col(LONG_ID).as(COMPONENT_ID))
>   val indexedEdges: DataFrame = indexedE0
>     .select(col(LONG_SRC).as(SRC), col(LONG_DST).as(DST))
>   (indexedVertices, indexedEdges)
> }
> val internalNumPartitions = 32
> // Adjust the data path for whatever you are testing on
> val dataPath = "/tmp/joseph/graphs" + "/star-10-10000000"
> val edges = sqlContext.read.parquet(dataPath + "/edges")
> val vertices = sqlContext.read.parquet(dataPath + "/vertices")
> (vertices.cache().count(), edges.cache().count())
> // This runs the code and prints out a bunch of stuff which shows the bug.
> val numOrigVertices = vertices.count()
> val (origVertices0: DataFrame, edges0: DataFrame) =
>   getIndexedGraph(vertices, edges, numOrigVertices)
> origVertices0.cache()
> edges0.cache()
> println(s"origVertices0.count: ${origVertices0.count()}")
> println(s"edges0.count: ${edges0.count()}")
> origVertices0.groupBy().agg(min(col("id")), max(col("id")), min(col(ORIG_ID)), max(col(ORIG_ID)), min(col("component")), max(col("component"))).show()
> edges0.groupBy().agg(min(col("src")), max(col("src")), min(col("dst")), max(col("dst"))).show()
> {code}
> h2. Output which shows incorrect values
> I'm attaching a text file with the output.  Search for "HERE IS THE BUG" and "ALSO SHOWS BUG"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org