Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/23 14:18:58 UTC

[GitHub] [spark] peter-toth commented on a change in pull request #23531: [SPARK-24497][SQL] Support recursive SQL query

peter-toth commented on a change in pull request #23531: [SPARK-24497][SQL] Support recursive SQL query
URL: https://github.com/apache/spark/pull/23531#discussion_r286966578
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ##########
 @@ -49,6 +50,66 @@ case class Subquery(child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
+/**
+ * This node defines a table that contains one or more [[RecursiveReference]]s as child nodes
+ * referring to this table. It can be used to define a recursive CTE query and contains an anchor
+ * and a recursive term as children. The result of the anchor and the repeatedly executed recursive
+ * term are combined to form the final result.
+ *
+ * @param name name of the table
+ * @param anchorTerms these children are used for initializing the query
+ * @param recursiveTerms these children are used for extending the set of results with new rows
+ *                       based on the results of the previous iteration (or of the anchor terms in
+ *                       the first iteration)
+ * @param limit an optional row limit that bounds the recursion
+ */
+case class RecursiveTable(
+    name: String,
+    anchorTerms: Seq[LogicalPlan],
+    recursiveTerms: Seq[LogicalPlan],
+    limit: Option[Long]) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = anchorTerms ++ recursiveTerms
+
+  override def output: Seq[Attribute] = anchorTerms.head.output.map(_.withNullability(true))
+
+  override lazy val resolved: Boolean = {
+    // All terms must be resolved and produce the same, non-empty number of columns.
+    val outputArityMatches =
+      children.length > 1 &&
+        childrenResolved &&
+        children.head.output.nonEmpty &&
+        children.map(_.output.length).toSet.size == 1
+    if (outputArityMatches) {
+      // The column types of every term must be compatible with those of the first anchor term.
+      children.tail.foreach { child =>
+        val outputTypeMatches = child.output.zip(children.head.output).forall {
+          case (l, r) => l.dataType.sameType(r.dataType)
+        }
+        if (!outputTypeMatches) {
+          throw new AnalysisException(s"Recursive table $name term types " +
+            s"${children.head.output.map(_.dataType)} and ${child.output.map(_.dataType)} do " +
+            "not match")
+        }
+      }
+    }
+    outputArityMatches
+  }
+
+  lazy val anchorsResolved: Boolean = anchorTerms.forall(_.resolved)
+}
+
+/**
+ * This node is a reference to a recursive table in CTE definitions.
+ *
+ * @param name the name of the recursive table it refers to
+ * @param output the attributes of the recursive table
+ */
+case class RecursiveReference(name: String, output: Seq[Attribute]) extends LeafNode {
+  override lazy val resolved = output.forall(_.resolved)
+
+  var statistics = Statistics(BigInt(0))
+
+  // Since a recursive reference can grow beyond the size limit allowed on the broadcast side of
+  // a join, spark.sql.defaultSizeInBytes is used for estimating its stats.
 
 Review comment:
   indeed, thanks! :)
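
For context, the RecursiveTable node above models a recursive common table
expression (CTE). Below is a minimal sketch of the kind of query it is meant
to support, assuming the standard WITH RECURSIVE syntax and a SparkSession
`spark` with this patch applied (an illustration, not code from the PR):

    // The first SELECT is the anchor term; the second, which references the
    // table `t` being defined, is the recursive term.
    spark.sql("""
      WITH RECURSIVE t(n) AS (
        SELECT 1
        UNION ALL
        SELECT n + 1 FROM t WHERE n < 5
      )
      SELECT * FROM t
    """).show()
    // During analysis, `t` would become a RecursiveTable whose anchorTerms
    // hold the plan of `SELECT 1` and whose recursiveTerms hold the plan of
    // `SELECT n + 1 FROM t ...`, with the inner reference to `t` replaced by
    // a RecursiveReference node.

Note that the `resolved` check above would reject such a query if the
recursive term produced a different number of columns, or incompatible column
types, compared to the anchor term.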
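
On the statistics comment the reviewer is acknowledging: a RecursiveReference
starts out with zero-sized statistics, but its actual row set grows with every
iteration, so a zero estimate could wrongly make it eligible for the broadcast
side of a join. Below is a minimal sketch of the fallback the comment
describes, assuming `conf` resolves to the active SQLConf via the parent plan
classes (an illustration, not the PR's exact code):

    // Report the configured default size instead of the (initially zero)
    // accumulated statistics, so the planner does not pick a broadcast join
    // based on an underestimate.
    override def computeStats(): Statistics =
      Statistics(sizeInBytes = conf.defaultSizeInBytes)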
