You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Struchkov Lookuut Fedorovich (Jira)" <ji...@apache.org> on 2020/10/03 12:31:00 UTC

[jira] [Created] (SPARK-33059) Bug with self joined tables after posexploded column

Struchkov Lookuut Fedorovich created SPARK-33059:
----------------------------------------------------

             Summary: Bug with self joined tables after posexploded column
                 Key: SPARK-33059
                 URL: https://issues.apache.org/jira/browse/SPARK-33059
             Project: Spark
          Issue Type: Bug
          Components: ML, MLlib
    Affects Versions: 3.0.1
         Environment: OS ubuntu 16.07 LTS

8GB Ram

Intel Core i7
            Reporter: Struchkov Lookuut Fedorovich


 

The test below should pass, but it fails.

The two DataFrames differ in only one respect: the data column of validDF has type Array[Int], while in testDF it has type Array[Vector].

We encountered this bug while using MinHashLSH from Spark ML.
{code:java}
// Bug with self joined tables with posexplode columns
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.flatspec.AnyFlatSpec
import org.apache.spark.sql.functions.{udf, col, struct, posexplode}
import org.apache.spark.ml.linalg.Vectors

import scala.collection.mutable


/** One input row of the reproduction data set.
  *
  * @param s    string field of the row
  * @param l    array of long values carried along with the row
  * @param data array of int values; this is the column that gets posexploded
  */
case class TestRow(
  s: String,
  l: Array[Long],
  data: Array[Int]
)

/** Reproduction for SPARK-33059.
  *
  * Builds one base DataFrame and derives two variants from it that differ only in the
  * element type of the posexploded `data` column (Vector vs. Int). Each variant is
  * self-joined on the posexplode output columns (`pos`, `col`), de-duplicated and counted.
  * The test asserts that both counts are equal.
  */
class SparkBug extends AnyFlatSpec{

  "Test case" should "equal count" in {
    // Local master with 8 threads and whole-stage codegen explicitly enabled.
    val conf = new SparkConf()
      .setMaster("local[8]")
      .set("spark.sql.codegen.wholeStage", "true")// Important: per the reporter, the test passes when this is set to "false".

    val spark = SparkSession.builder().config(conf).getOrCreate()

    // UDF that maps every element of a long array to a dense ML Vector built from that element.
    val vecUdf = udf(
      (s: mutable.WrappedArray[Long]) =>
        s.map(v => Vectors.dense(v))
    )

    // Fixed seed so the generated `data` values are the same on every run.
    val r = new scala.util.Random(1000)

    // Needed for `.toDF()` on the local collection below.
    import spark.implicits._

    val rowCount = 200000// Important: per the reporter, the test passes with fewer rows (e.g. 150000).

    // Note: `0 to rowCount` is inclusive, so rowCount + 1 rows are generated.
    // Every row has an empty string, the longs 0..40, and five random ints in [0, 100000).
    val df = (0 to rowCount).map{
      case _ =>
        TestRow(
          "",
          (0 to 40).map(t => t.toLong).toArray,
          (0 to 4)
            .map(_ => r.nextInt(100000)).toArray
        )
    }.toDF()
      .coalesce(1)// Important: per the reporter, the test passes without coalesce(1).

    // Variant under test: `data` is replaced by the UDF output (array of Vector) before
    // the whole row is packed into the struct `dd` and `data` is posexploded.
    val testDF = df
      .withColumn("data", vecUdf(col("data")))
      .select(
        struct(col("*")).as("dd"),
        posexplode(col("data"))
      )

    // Reference variant: same projection, but `data` keeps its original element type.
    val validDF = df.select(struct(col("*")).as("dd"), posexplode(col("data")))
    // The only difference between validDF and testDF: testDF's data column is Array[Vector], validDF's data column is Array[Int].

    // Self-join on the posexplode output columns, then count the distinct joined rows.
    val testCount = testDF
      .join(testDF, Seq("pos", "col"))
      .distinct()// Important: per the reporter, the test passes without distinct().
      .count

    // Same pipeline on the reference variant.
    val validCount = validDF
      .join(validDF, Seq("pos", "col"))
      .distinct()
      .count

    // The two counts are expected to be equal; the reporter observes that they are not.
    assert(testCount == validCount)
  }
}

{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org