You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Struchkov Lookuut Fedorovich (Jira)" <ji...@apache.org> on 2020/10/03 12:31:00 UTC
[jira] [Created] (SPARK-33059) Bug with self joined tables after
posexploded column
Struchkov Lookuut Fedorovich created SPARK-33059:
----------------------------------------------------
Summary: Bug with self joined tables after posexploded column
Key: SPARK-33059
URL: https://issues.apache.org/jira/browse/SPARK-33059
Project: Spark
Issue Type: Bug
Components: ML, MLlib
Affects Versions: 3.0.1
Environment: OS Ubuntu 16.04 LTS
8GB Ram
Intel Core i7
Reporter: Struchkov Lookuut Fedorovich
The test below should pass, but it fails.
The two DataFrames differ in only one respect: in validDF the data column has type Array[Int], while in testDF it has type Array[Vector].
We ran into this bug while using MinHashLSH from Spark ML.
{code:java}
// Bug with self joined tables with posexplode columns
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.flatspec.AnyFlatSpec
import org.apache.spark.sql.functions.{udf, col, struct, posexplode}
import org.apache.spark.ml.linalg.Vectors
import scala.collection.mutable
/**
 * One input row for the reproduction DataFrame.
 *
 * @param s    a string column (filled with "" by the test)
 * @param l    an array of Long values
 * @param data an array of Int values; this is the column that is later posexploded
 */
case class TestRow(
  s: String,
  l: Array[Long],
  data: Array[Int]
)
/**
 * Reproduction for SPARK-33059.
 *
 * The test builds two DataFrames from the same generated rows. It posexplodes
 * the `data` column of each, self-joins each on ("pos", "col") and counts the
 * distinct result. The only difference between the two is the element type of
 * the `data` column:
 *  - in validDF it is the Array[Int] from [[TestRow]];
 *  - in testDF it is an array of ML Vectors produced by a UDF.
 *
 * The two counts are expected to be equal.
 */
class SparkBug extends AnyFlatSpec {
  "Test case" should "equal count" in {
    val conf = new SparkConf()
      .setMaster("local[8]")
      // Important: with whole-stage codegen set to "false" the test passes.
      .set("spark.sql.codegen.wholeStage", "true")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      // Maps every element of the input array to a one-element dense ML Vector.
      val vecUdf = udf(
        (s: mutable.WrappedArray[Long]) =>
          s.map(v => Vectors.dense(v))
      )
      // Fixed seed so the generated data is the same on every run.
      val r = new scala.util.Random(1000)
      import spark.implicits._
      // Important: with fewer rows (e.g. 150000) the test passes.
      // `0 to rowCount` is inclusive, so rowCount + 1 rows are generated.
      val rowCount = 200000
      val df = (0 to rowCount).map { _ =>
        TestRow(
          "",
          (0 to 40).map(t => t.toLong).toArray,
          (0 to 4)
            .map(_ => r.nextInt(100000)).toArray
        )
      }.toDF()
        .coalesce(1) // Important: without coalesce(1) the test passes.

      val testDF = df
        .withColumn("data", vecUdf(col("data")))
        .select(
          struct(col("*")).as("dd"),
          posexplode(col("data"))
        )
      val validDF = df.select(struct(col("*")).as("dd"), posexplode(col("data")))
      // The only difference between validDF and testDF is the element type of
      // the `data` column: Array[Vector] in testDF, Array[Int] in validDF.

      val testCount = testDF
        .join(testDF, Seq("pos", "col"))
        .distinct() // Important: without distinct() the test passes.
        .count()
      val validCount = validDF
        .join(validDF, Seq("pos", "col"))
        .distinct()
        .count()

      // The counts should be equal, but with the bug they are not.
      assert(testCount == validCount)
    } finally {
      // Release the local Spark context even when the assertion fails, so it
      // does not leak into other tests running in the same JVM.
      spark.stop()
    }
  }
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org