Posted to issues@spark.apache.org by "Claus Stadler (JIRA)" <ji...@apache.org> on 2017/07/14 11:40:00 UTC
[jira] [Updated] (SPARK-21417) Detect transitive join conditions via expressions
[ https://issues.apache.org/jira/browse/SPARK-21417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Stadler updated SPARK-21417:
----------------------------------
Description:
_Disclaimer: The nature of this report is similar to that of https://issues.apache.org/jira/browse/CALCITE-1887 - yet, as SPARK (to my understanding) uses its own SQL implementation, the requested improvement has to be treated as a separate issue._
Given table aliases ta, tb, column names ca, cb, and an arbitrary (deterministic) expression expr, Spark should be able to infer join conditions by transitivity:
{noformat}
ta.ca = expr AND tb.cb = expr -> ta.ca = tb.cb
{noformat}
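To make the rule concrete, here is a minimal sketch in plain Scala of how such an inference could work over the conjuncts of a WHERE clause. It is only an illustration under stated assumptions: Col, Lit, Eq and TransitiveJoinInference are hypothetical names made up for this example, not Catalyst classes, and every expression is assumed to be deterministic.
{code:scala}
// Hypothetical mini-model of predicates; these are NOT Spark Catalyst classes.
sealed trait Expr
final case class Col(table: String, name: String) extends Expr
final case class Lit(value: String) extends Expr

// One equality conjunct of a WHERE clause, e.g. ta.ca = expr
final case class Eq(left: Expr, right: Expr)

object TransitiveJoinInference {
  // Derives ta.ca = tb.cb for all columns of different tables that are
  // equated with the same (assumed deterministic) expression.
  def inferJoinConditions(conjuncts: Seq[Eq]): Seq[Eq] = {
    // Normalize to (column, expression) pairs, accepting either operand order.
    val colEqExpr: Seq[(Col, Expr)] = conjuncts.collect {
      case Eq(c: Col, e) if !e.isInstanceOf[Col] => (c, e)
      case Eq(e, c: Col) if !e.isInstanceOf[Col] => (c, e)
    }
    // Group the columns by the expression they are compared with, then
    // pair up the columns within each group.
    colEqExpr.groupBy(_._2).values.toSeq.flatMap { group =>
      val cols = group.map(_._1).distinct
      for {
        (a, i) <- cols.zipWithIndex
        b <- cols.drop(i + 1)
        if a.table != b.table
      } yield Eq(a, b)
    }
  }
}
{code}
Applied to the conjuncts A.s = 'dbr:Leipzig' and B.s = 'dbr:Leipzig', the sketch yields the single condition A.s = B.s:
{code:scala}
val where = Seq(
  Eq(Col("A", "s"), Lit("dbr:Leipzig")),
  Eq(Col("B", "s"), Lit("dbr:Leipzig")))
TransitiveJoinInference.inferJoinConditions(where)
// Seq(Eq(Col("A", "s"), Col("B", "s")))
{code}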
The use case for us stems from SPARQL to SQL rewriting, where SPARQL queries such as
{code:java}
SELECT * WHERE {
dbr:Leipzig a ?type .
dbr:Leipzig dbo:mayor ?mayor
}
{code}
result in an SQL query similar to
{noformat}
SELECT * FROM s.rdf a, s.rdf b WHERE a.s = 'dbr:Leipzig' AND b.s = 'dbr:Leipzig'
{noformat}
Because the join condition is not recognized, Apache Spark does not find an executable plan for the query.
Self-contained example:
{code:java}
package my.pkg // "package" is a reserved word in Scala and cannot be used as a package name

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.scalatest._

class TestSparkSqlJoin extends FlatSpec {
  "SPARK SQL processor" should "be capable of handling transitive join conditions" in {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark SQL parser bug")
      .getOrCreate()
    import spark.implicits._

    // The schema is encoded in a string
    val schemaString = "s p o"
    // Generate the schema from the schema string
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // A single triple, registered as the temporary view TRIPLES
    val data = List(("s1", "p1", "o1"))
    val dataRDD = spark.sparkContext.parallelize(data)
      .map(attributes => Row(attributes._1, attributes._2, attributes._3))
    val df = spark.createDataFrame(dataRDD, schema).as("TRIPLES")
    df.createOrReplaceTempView("TRIPLES")

    // Explicit join condition A.s = B.s: this query can be planned
    println("First Query")
    spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = B.s AND A.s = 'dbr:Leipzig'").show(10)

    // Join condition only implied by transitivity: this query fails
    println("Second Query")
    spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = 'dbr:Leipzig' AND B.s = 'dbr:Leipzig'").show(10)
  }
}
{code}
Output (excerpt):
{noformat}
First Query
...
+---+
|  s|
+---+
+---+
Second Query
- should be capable of handling transitive join conditions *** FAILED ***
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [s#3]
+- Filter (isnotnull(s#3) && (s#3 = dbr:Leipzig))
   +- LogicalRDD [s#3, p#4, o#5]
and
Project
+- Filter (isnotnull(s#20) && (s#20 = dbr:Leipzig))
   +- LogicalRDD [s#20, p#21, o#22]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
...
Run completed in 6 seconds, 833 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***
{noformat}
Expected:
A correct, executable query plan for the second query (ideally equivalent to that of the first query).
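Until such an inference exists, the second query can presumably be made to run by following the hint in the error message and using CROSS JOIN syntax. The following is a sketch of that workaround, not a fix for the requested improvement. CrossJoinWorkaround and its run method are names made up for this example, and the test data is the single triple from the example above.
{code:scala}
import org.apache.spark.sql.SparkSession

object CrossJoinWorkaround {
  // Runs the second query with explicit CROSS JOIN syntax and returns the row count.
  def run(spark: SparkSession): Long = {
    import spark.implicits._
    // Same single triple as in the test case of this report
    Seq(("s1", "p1", "o1")).toDF("s", "p", "o").createOrReplaceTempView("TRIPLES")
    val result = spark.sql(
      """SELECT A.s
        |FROM TRIPLES A CROSS JOIN TRIPLES B
        |WHERE A.s = 'dbr:Leipzig' AND B.s = 'dbr:Leipzig'""".stripMargin)
    result.show(10)
    result.count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("Cross join workaround")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
{code}
The result is the same as for the comma-separated form, because both sides are already restricted to the same constant. In Spark 2.x, setting spark.sql.crossJoin.enabled to true should also let the original query through. Neither option helps when the SQL is generated by a rewriter that cannot easily be changed, which is why the inference is requested here.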
was:
Disclaimer: The nature of this report is similar to that of https://issues.apache.org/jira/browse/CALCITE-1887 - yet, as SPARK (to my understanding) uses its own SQL implementation, the requested improvement has to be treated as a separate issue.
Given table aliases ta, tb column names ca, cb, and an arbitrary (deterministic) expression expr then calcite should be capable to infer join conditions by transitivity:
{noformat}
ta.ca = expr AND tb.cb = expr -> ta.ca = tb.cb
{noformat}
The use case for us stems from SPARQL to SQL rewriting, where SPARQL queries such as
{code:java}
SELECT {
dbr:Leipzig a ?type .
dbr:Leipzig dbo:mayor ?mayor
}
{code}
result in an SQL query similar to
{noformat}
SELECT s.rdf a, s.rdf b WHERE a.s = 'dbr:Leipzig' AND b.s = 'dbr:Leipzig'
{noformat}
A consequence of the join condition not being recognized is, that Apache Flink does not find an executable plan to process the query.
Self contained example:
{code:java}
package my.package;
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.scalatest._
class TestSparkSqlJoin extends FlatSpec {
"SPARK SQL processor" should "be capable of handling transitive join conditions" in {
val spark = SparkSession
.builder()
.master("local[2]")
.appName("Spark SQL parser bug")
.getOrCreate()
import spark.implicits._
// The schema is encoded in a string
val schemaString = "s p o"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val data = List(("s1", "p1", "o1"))
val dataRDD = spark.sparkContext.parallelize(data).map(attributes => Row(attributes._1, attributes._2, attributes._3))
val df = spark.createDataFrame(dataRDD, schema).as("TRIPLES")
df.createOrReplaceTempView("TRIPLES")
println("First Query")
spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = B.s AND A.s = 'dbr:Leipzig'").show(10)
println("Second Query")
spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = 'dbr:Leipzig' AND B.s = 'dbr:Leipzig'").show(10)
}
}
{code}
Output (excerpt):
{noformat}
First Query
...
+---+
| s|
+---+
+---+
Second Query
- should be capable of handling transitive join conditions *** FAILED ***
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [s#3]
+- Filter (isnotnull(s#3) && (s#3 = dbr:Leipzig))
+- LogicalRDD [s#3, p#4, o#5]
and
Project
+- Filter (isnotnull(s#20) && (s#20 = dbr:Leipzig))
+- LogicalRDD [s#20, p#21, o#22]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
...
Run completed in 6 seconds, 833 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***
{noformat}
Expected:
A correctly working, executable, query plan for the second query (ideally equivalent to that of the first query)
> Detect transitive join conditions via expressions
> -------------------------------------------------
>
> Key: SPARK-21417
> URL: https://issues.apache.org/jira/browse/SPARK-21417
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Claus Stadler