You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Xiao Li (JIRA)" <ji...@apache.org> on 2017/11/30 22:49:00 UTC
[jira] [Assigned] (SPARK-21417) Detect transitive join conditions
via expressions
[ https://issues.apache.org/jira/browse/SPARK-21417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li reassigned SPARK-21417:
-------------------------------
Assignee: Anton Okolnychyi
> Detect transitive join conditions via expressions
> -------------------------------------------------
>
> Key: SPARK-21417
> URL: https://issues.apache.org/jira/browse/SPARK-21417
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Claus Stadler
> Assignee: Anton Okolnychyi
> Fix For: 2.3.0
>
>
> _Disclaimer: The nature of this report is similar to that of https://issues.apache.org/jira/browse/CALCITE-1887 - yet, as SPARK (to my understanding) uses its own SQL implementation, the requested improvement has to be treated as a separate issue._
> Given table aliases ta, tb column names ca, cb, and an arbitrary (deterministic) expression expr then calcite should be capable to infer join conditions by transitivity:
> {noformat}
> ta.ca = expr AND tb.cb = expr -> ta.ca = tb.cb
> {noformat}
> The use case for us stems from SPARQL to SQL rewriting, where SPARQL queries such as
> {code:java}
> SELECT {
> dbr:Leipzig a ?type .
> dbr:Leipzig dbo:mayor ?mayor
> }
> {code}
> result in an SQL query similar to
> {noformat}
> SELECT s.rdf a, s.rdf b WHERE a.s = 'dbr:Leipzig' AND b.s = 'dbr:Leipzig'
> {noformat}
> A consequence of the join condition not being recognized is, that Apache SPARK does not find an executable plan to process the query.
> Self contained example:
> {code:java}
> package my.package;
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
> import org.scalatest._
> class TestSparkSqlJoin extends FlatSpec {
> "SPARK SQL processor" should "be capable of handling transitive join conditions" in {
> val spark = SparkSession
> .builder()
> .master("local[2]")
> .appName("Spark SQL parser bug")
> .getOrCreate()
> import spark.implicits._
> // The schema is encoded in a string
> val schemaString = "s p o"
> // Generate the schema based on the string of schema
> val fields = schemaString.split(" ")
> .map(fieldName => StructField(fieldName, StringType, nullable = true))
> val schema = StructType(fields)
> val data = List(("s1", "p1", "o1"))
> val dataRDD = spark.sparkContext.parallelize(data).map(attributes => Row(attributes._1, attributes._2, attributes._3))
> val df = spark.createDataFrame(dataRDD, schema).as("TRIPLES")
> df.createOrReplaceTempView("TRIPLES")
> println("First Query")
> spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = B.s AND A.s = 'dbr:Leipzig'").show(10)
> println("Second Query")
> spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = 'dbr:Leipzig' AND B.s = 'dbr:Leipzig'").show(10)
> }
> }
> {code}
> Output (excerpt):
> {noformat}
> First Query
> ...
> +---+
> | s|
> +---+
> +---+
> Second Query
> - should be capable of handling transitive join conditions *** FAILED ***
> org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
> Project [s#3]
> +- Filter (isnotnull(s#3) && (s#3 = dbr:Leipzig))
> +- LogicalRDD [s#3, p#4, o#5]
> and
> Project
> +- Filter (isnotnull(s#20) && (s#20 = dbr:Leipzig))
> +- LogicalRDD [s#20, p#21, o#22]
> Join condition is missing or trivial.
> Use the CROSS JOIN syntax to allow cartesian products between these relations.;
> at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
> at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> ...
> Run completed in 6 seconds, 833 milliseconds.
> Total number of tests run: 1
> Suites: completed 1, aborted 0
> Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
> *** 1 TEST FAILED ***
> {noformat}
> Expected:
> A correctly working, executable, query plan for the second query (ideally equivalent to that of the first query)
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org