Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:24:13 UTC
[jira] [Updated] (SPARK-15389) DataFrame filter by isNotNull fails in complex, large case
[ https://issues.apache.org/jira/browse/SPARK-15389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-15389:
---------------------------------
Labels: bulk-closed (was: )
> DataFrame filter by isNotNull fails in complex, large case
> ----------------------------------------------------------
>
> Key: SPARK-15389
> URL: https://issues.apache.org/jira/browse/SPARK-15389
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.1
> Reporter: Joseph K. Bradley
> Priority: Major
> Labels: bulk-closed
> Attachments: final.spark1.6.1.hadoop2.bug
>
>
> h2. Problem description
> I have the following code (part of GraphFrames prototyping), which does the following:
> * Given: DataFrames for vertices (column "id") and edges (columns "src" and "dst")
> * Add an index to the vertices, starting from 0
> * Do several joins with the edges to replace the original src/dst values in the edges with the new vertex indices
> * The complex part is that it handles high-degree vertices (HDVs) separately from low-degree vertices: it uses a broadcast join for the small number of HDVs, and a regular (non-broadcast) join for the others.
> The bug that appears is:
> * At one point, I create a DataFrame "edgesWithHDVmark" that has a column with many null values.
> * I filter the DataFrame to select rows where that column is not null.
> * The resulting DataFrame has the same number of rows; i.e., the filter does nothing.
> The strange thing is that this works on a graph with a small number of vertices and edges (100 to 10,000), but it fails on a large graph (~100 million edges).
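> For reference, the expected filter semantics look like this (a minimal sketch of my own, not the failing case itself):
> {code}
> import org.apache.spark.sql.functions._
> // A column with nulls: filtering on isNotNull should reduce the count.
> val df = sqlContext.range(0, 10)
>   .withColumn("v", when(col("id") < 3, col("id"))) // null for id >= 3
> assert(df.where(col("v").isNotNull).count() == 3)
> {code}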
> I'm listing 3 sections below:
> * data generation
> * code which does the joins
> * output which shows incorrect values
> h2. Data generation
> Below, look at the "Create a star" section in the code. The small settings used there work; the following settings (repeated as a code snippet after this list) make the code in the next section fail:
> * numPartitions = 512
> * numStars = 10
> * starSize = 10000000
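> That is, in the "Create a star" section of the code below, replace the small settings with:
> {code}
> val numPartitions = 512
> val numStars = 10
> val starSize = 10000000
> {code}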
> {code}
> import org.apache.spark.sql.{Column, DataFrame}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import sqlContext.implicits._ // for .toDF on RDDs (auto-imported in spark-shell/notebooks)
> def shiftVertexIds(edges: DataFrame, i: Long): DataFrame = {
>   require(Seq(DataTypes.LongType, DataTypes.IntegerType).contains(edges.schema("src").dataType))
>   val newEdges = edges.select((col("src") + i).as("src"), (col("dst") + i).as("dst"))
>   newEdges
> }
> def star(n: Int, numPartitions: Int): DataFrame = {
>   val edges = sqlContext.range(start = 1, end = n + 1, step = 1, numPartitions = numPartitions)
>     .toDF("src").select(col("src"), lit(0L).as("dst"))
>   edges.where(col("src").isNotNull && col("dst").isNotNull)
> }
> case class Edge(src: Int, dst: Int)
> /** n: side of grid */
> def grid(n: Int, numPartitions: Int): DataFrame = {
>   val edges = sc.parallelize(n until n * n, numPartitions).flatMap { i =>
>     Seq(
>       Edge(i, i - n),
>       Edge(i, i - 1),
>       Edge(i, i + 1),
>       Edge(i, i + n))
>   }.toDF
>   edges.where(col("src").isNotNull && col("dst").isNotNull)
> }
> def chainOfStars(numStars: Int, starSize: Int, numPartitions: Int): DataFrame = {
>   val edges0 = Range(0, numStars).map { i =>
>     val edges = star(starSize, numPartitions)
>     shiftVertexIds(edges, starSize * i)
>   }.reduce((e0, e1) => e0.unionAll(e1))
>   edges0.repartition(numPartitions)
> }
> def verticesFromEdges(e: DataFrame, numPartitions: Int): DataFrame = {
>   val srcs = e.select(e("src").as("id"))
>   val dsts = e.select(e("dst").as("id"))
>   val v = srcs.unionAll(dsts).distinct
>   v.repartition(numPartitions)
> }
> val dataPath = "/tmp/joseph/graphs"
> // Create a star
> val numPartitions = 16
> val numStars = 3
> val starSize = 10
> val edges = chainOfStars(numStars, starSize, numPartitions)
> val vertices = verticesFromEdges(edges, numPartitions)
> val path = dataPath + s"/star-$numStars-$starSize"
> dbutils.fs.rm(path, recurse=true)
> dbutils.fs.mkdirs(path)
> edges.write.format("parquet").save(path + "/edges")
> vertices.write.format("parquet").save(path + "/vertices")
> {code}
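> As a quick sanity check on the generated data (a hedged sketch of my own, assuming the small settings above):
> {code}
> // Each star contributes starSize spoke edges; the hub of star i coincides
> // with the top spoke of star i-1, so the vertex set is {0, ..., numStars * starSize}.
> assert(edges.count() == numStars.toLong * starSize)        // 3 * 10 = 30
> assert(vertices.count() == numStars.toLong * starSize + 1) // 31
> {code}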
> h2. Code which does the joins
> {code}
> import org.apache.spark.sql.{Column, DataFrame, Row}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.storage.StorageLevel
> val DST = "dst"
> val ID = "id"
> val SRC = "src"
> val LONG_ID = "long_id"
> val LONG_SRC = "long_src"
> val LONG_DST = "long_dst"
> val ORIG_ID = "ORIG_ID"
> val COMPONENT_ID = "component"
> import scala.collection.mutable
> def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
>   val sqlContext = df.sqlContext
>   val schema = df.schema
>   // Nest each original row under "row" and attach a 0-based Int index "uniq_id".
>   val outputSchema = StructType(Seq(
>     StructField("row", schema, false),
>     StructField("uniq_id", DataTypes.IntegerType, false)))
>   val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row, id.toInt) }
>   sqlContext.createDataFrame(rdd, outputSchema)
> }
> def getIndexedEdges(idxV: DataFrame, edges0: DataFrame): DataFrame = {
>   def summary(name: String, df: DataFrame): Unit = {
>     // return // uncomment to silence the debug summaries
>     println(name)
>     df.printSchema()
>     println(s"${name}.count: ${df.count()}")
>     df.show(20)
>   }
>   val edges = edges0.select(SRC, DST)
>   summary("idxV", idxV)
>   summary("edges", edges)
>   // Separately index high-degree vertices (HDVs) and other vertices.
>   val degrees = edges.select(explode(array(SRC, DST)).as(ID))
>     .groupBy(ID).agg(count("*").cast("int").as("degree"))
>   val LARGE_DEGREE = 2000000
>   val highDegreeVertices = degrees.where(col("degree") > LARGE_DEGREE).select(ID)
>   summary("highDegreeVertices", highDegreeVertices)
>   val idxHDV = idxV.join(highDegreeVertices, ID) // columns LONG_ID, ID
>   summary("idxHDV", idxHDV)
>   // Indexed low-degree vertices
>   val idxLDV = idxV.join(highDegreeVertices, idxV(ID) === highDegreeVertices(ID), "left_outer")
>     .where(highDegreeVertices(ID).isNull)
>     .select(idxV(ID), idxV(LONG_ID))
>   summary("idxLDV", idxLDV)
>   def indexEdgeSide(LONG_SIDE: String, SIDE: String, OTHER: String): DataFrame = {
>     // Use a broadcast join to index high-degree vertices and filter HDVs from edges.
>     val tmpIdxHDV = broadcast(idxHDV.select(col(LONG_ID).as(LONG_SIDE), col(ID).as(SIDE)))
>     summary("tmpIdxHDV", tmpIdxHDV)
>     val edgesWithHDVmark = edges.join(tmpIdxHDV, edges(SIDE) === tmpIdxHDV(SIDE), "left_outer") // columns: SIDE (x2), LONG_SIDE, OTHER
>     summary("edgesWithHDVmark", edgesWithHDVmark)
>     val idxHDVEdges = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNotNull)
>       .select(edges(SIDE), edges(OTHER), col(LONG_SIDE))
>     summary("idxHDVEdges", idxHDVEdges)
>     val ldvEdges = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNull)
>       .select(edges(SIDE), edges(OTHER))
>     summary("ldvEdges", ldvEdges)
>     // Use a regular hash join to index low-degree vertices.
>     val tmpIdxLDV = idxLDV.select(col(LONG_ID).as(LONG_SIDE), col(ID).as(SIDE))
>     summary("tmpIdxLDV", tmpIdxLDV)
>     val idxLDVEdges = ldvEdges.join(tmpIdxLDV, ldvEdges(SIDE) === tmpIdxLDV(SIDE), "inner").drop(tmpIdxLDV(SIDE))
>     summary("idxLDVEdges", idxLDVEdges)
>     // Union the HDV- and LDV-indexed edges.
>     val indexedSideEdges = idxHDVEdges.unionAll(idxLDVEdges)
>     summary("indexedSideEdges", indexedSideEdges)
>     // Sanity check; remove later
>     // assert(indexedSideEdges.where(col(LONG_SIDE).isNull).count() == 0)
>     // println("yay not null")
>     indexedSideEdges
>   }
>   println("COMPUTING indexedSrcEdges")
>   val indexedSrcEdges = indexEdgeSide(LONG_SRC, SRC, DST).select(LONG_SRC, SRC, DST)
>   println("COMPUTING indexedDstEdges")
>   val indexedDstEdges = indexEdgeSide(LONG_DST, DST, SRC).select(LONG_DST, SRC, DST)
>   val indexedEdges = indexedSrcEdges.join(indexedDstEdges,
>       (indexedSrcEdges(SRC) === indexedDstEdges(SRC)) && (indexedSrcEdges(DST) === indexedDstEdges(DST)),
>       "inner")
>     .select(indexedSrcEdges(SRC), indexedSrcEdges(LONG_SRC), indexedSrcEdges(DST), indexedDstEdges(LONG_DST))
>   summary("indexedEdges", indexedEdges)
>   indexedEdges
> }
> def getIndexedGraph(vertices: DataFrame, edges: DataFrame, numVertices: Long): (DataFrame, DataFrame) = {
>   assert(numVertices < Int.MaxValue,
>     s"This impl only works for numVertices < ${Int.MaxValue}, but it was given $numVertices vertices.")
>   val indexedV0: DataFrame = {
>     val indexedVertices = zipWithUniqueIdFrom0(vertices)
>     indexedVertices.select(col("uniq_id").as(LONG_ID), col("row")(ID).as(ID))
>   }
>   // Remove self-edges, and index the edges.
>   val indexedE0: DataFrame = getIndexedEdges(indexedV0, edges.where(col(SRC) !== col(DST)))
>   val indexedVertices: DataFrame = indexedV0
>     .select(col(ID).as(ORIG_ID), col(LONG_ID).as(ID), col(LONG_ID).as(COMPONENT_ID))
>   val indexedEdges: DataFrame = indexedE0
>     .select(col(LONG_SRC).as(SRC), col(LONG_DST).as(DST))
>   (indexedVertices, indexedEdges)
> }
> val internalNumPartitions = 32
> // Adjust the data path for whatever you are testing on
> val dataPath = "/tmp/joseph/graphs" + "/star-10-10000000"
> val edges = sqlContext.read.parquet(dataPath + "/edges")
> val vertices = sqlContext.read.parquet(dataPath + "/vertices")
> (vertices.cache().count(), edges.cache().count())
> // This runs the code and prints out a bunch of stuff which shows the bug.
> val numOrigVertices = vertices.count()
> val (origVertices0: DataFrame, edges0: DataFrame) =
>   getIndexedGraph(vertices, edges, numOrigVertices)
> origVertices0.cache()
> edges0.cache()
> println(s"origVertices0.count: ${origVertices0.count()}")
> println(s"edges0.count: ${edges0.count()}")
> origVertices0.groupBy().agg(min(col("id")), max(col("id")), min(col(ORIG_ID)), max(col(ORIG_ID)), min(col("component")), max(col("component"))).show()
> edges0.groupBy().agg(min(col("src")), max(col("src")), min(col("dst")), max(col("dst"))).show()
> {code}
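> For reference, here is the failing pattern distilled to its minimal shape (a hypothetical standalone sketch; the real code only misbehaves at ~100M rows):
> {code}
> import org.apache.spark.sql.functions.broadcast
> val left = sqlContext.range(0, 100).toDF("id")
> val right = broadcast(sqlContext.range(0, 10).toDF("rid"))
> // Left outer join: 90 of the 100 left rows have no match, so "rid" is null there.
> val joined = left.join(right, left("id") === right("rid"), "left_outer")
> // Expected: 10. In the large run above, the analogous isNotNull filter is a no-op.
> println(joined.where(right("rid").isNotNull).count())
> {code}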
> h2. Output which shows incorrect values
> I'm attaching a text file with the output. Search for "HERE IS THE BUG" and "ALSO SHOWS BUG" to locate the incorrect values.