You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:02:25 UTC

[jira] [Updated] (SPARK-21417) Detect transitive join conditions via expressions

     [ https://issues.apache.org/jira/browse/SPARK-21417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-21417:
---------------------------------
    Labels: bulk-closed  (was: )

> Detect transitive join conditions via expressions
> -------------------------------------------------
>
>                 Key: SPARK-21417
>                 URL: https://issues.apache.org/jira/browse/SPARK-21417
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Claus Stadler
>            Assignee: Anton Okolnychyi
>            Priority: Major
>              Labels: bulk-closed
>
> _Disclaimer: The nature of this report is similar to that of https://issues.apache.org/jira/browse/CALCITE-1887 - yet, as SPARK (to my understanding) uses its own SQL implementation, the requested improvement has to be treated as a separate issue._
> Given table aliases ta, tb column names ca, cb, and an arbitrary (deterministic) expression expr then calcite should be capable to infer join conditions by transitivity:
> {noformat}
> ta.ca = expr AND tb.cb = expr -> ta.ca = tb.cb
> {noformat}
> The use case for us stems from SPARQL to SQL rewriting, where SPARQL queries such as
> {code:java}
> SELECT {
>   dbr:Leipzig a ?type .
>   dbr:Leipzig dbo:mayor ?mayor
> }
> {code}
> result in an SQL query similar to
> {noformat}
> SELECT s.rdf a, s.rdf b WHERE a.s = 'dbr:Leipzig' AND b.s = 'dbr:Leipzig'
> {noformat}
> A consequence of the join condition not being recognized is, that Apache SPARK does not find an executable plan to process the query.
> Self contained example:
> {code:java}
> package my.package;
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
> import org.scalatest._
> class TestSparkSqlJoin extends FlatSpec {
>   "SPARK SQL processor" should "be capable of handling transitive join conditions" in {
>     val spark = SparkSession
>       .builder()
>       .master("local[2]")
>       .appName("Spark SQL parser bug")
>       .getOrCreate()
>     import spark.implicits._
>     // The schema is encoded in a string
>     val schemaString = "s p o"
>     // Generate the schema based on the string of schema
>     val fields = schemaString.split(" ")
>       .map(fieldName => StructField(fieldName, StringType, nullable = true))
>     val schema = StructType(fields)
>     val data = List(("s1", "p1", "o1"))
>     val dataRDD = spark.sparkContext.parallelize(data).map(attributes => Row(attributes._1, attributes._2, attributes._3))
>     val df = spark.createDataFrame(dataRDD, schema).as("TRIPLES")
>     df.createOrReplaceTempView("TRIPLES")
>     println("First Query")
>     spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = B.s AND A.s = 'dbr:Leipzig'").show(10)
>     println("Second Query")
>     spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = 'dbr:Leipzig' AND B.s = 'dbr:Leipzig'").show(10)
>   }
> }
> {code}
> Output (excerpt):
> {noformat}
> First Query
> ...
> +---+
> |  s|
> +---+
> +---+
> Second Query
> - should be capable of handling transitive join conditions *** FAILED ***
>   org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
> Project [s#3]
> +- Filter (isnotnull(s#3) && (s#3 = dbr:Leipzig))
>    +- LogicalRDD [s#3, p#4, o#5]
> and
> Project
> +- Filter (isnotnull(s#20) && (s#20 = dbr:Leipzig))
>    +- LogicalRDD [s#20, p#21, o#22]
> Join condition is missing or trivial.
> Use the CROSS JOIN syntax to allow cartesian products between these relations.;
>   at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
>   at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   ...
> Run completed in 6 seconds, 833 milliseconds.
> Total number of tests run: 1
> Suites: completed 1, aborted 0
> Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
> *** 1 TEST FAILED ***
> {noformat}
> Expected:
> A correctly working, executable, query plan for the second query (ideally equivalent to that of the first query)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org