Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/19 20:13:19 UTC

[GitHub] [spark] peter-toth commented on a change in pull request #23531: [SPARK-24497][SQL] Support recursive SQL query

peter-toth commented on a change in pull request #23531:
URL: https://github.com/apache/spark/pull/23531#discussion_r443025615



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
##########
@@ -253,6 +257,143 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   }
 }
 
+/**
+ * Physical plan node for a recursive relation that encapsulates the physical plan of the anchor
+ * term and the logical plan of the recursive term.
+ *
+ * The anchor term is used to initialize the query on the first run.
+ * The recursive term is used to extend the result with new rows. It is a logical plan that
+ * contains references to the result of the previous iteration or to the result accumulated so
+ * far. These references are updated with fresh statistics and data, and the term is then
+ * compiled to a physical plan before each execution.
+ *
+ * The execution terminates once the anchor term or an iteration of the recursive term returns
+ * no rows.
+ *
+ * @param cteName the name of the recursive relation
+ * @param anchorTerm the physical plan of the anchor term, used to initialize the query
+ * @param output the attributes of the recursive relation
+ */
+case class RecursiveRelationExec(
+    cteName: String,
+    anchorTerm: SparkPlan,
+    output: Seq[Attribute]) extends SparkPlan {
+  @transient
+  lazy val logicalRecursiveTerm = logicalLink.get.asInstanceOf[RecursiveRelation].recursiveTerm
+
+  override def children: Seq[SparkPlan] = anchorTerm :: Nil
+
+  override def innerChildren: Seq[QueryPlan[_]] = logicalRecursiveTerm +: super.innerChildren
+
+  override def stringArgs: Iterator[Any] = Iterator(cteName, output)
+
+  private val physicalRecursiveTerms = new LinkedBlockingQueue[SparkPlan]
+
+  def recursiveTermIterations: Seq[SparkPlan] =
+    physicalRecursiveTerms.toArray(Array.empty[SparkPlan])
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  /**
+   * Notify the listeners of the physical plan change.
+   */
+  private def onUpdatePlan(executionId: Long): Unit = {
+    val queryExecution = SQLExecution.getQueryExecution(executionId)

Review comment:
       Unfortunately, I haven't had time for this yet, but it is still on my list.
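
The quoted hunk is cut off at the review anchor, so the body of `onUpdatePlan` is not shown
above. For readers following along: a physical plan change in Spark is typically published
to the listener bus the way `AdaptiveSparkPlanExec` does it. The sketch below is only an
assumption about what such a body could look like, not this PR's actual code:

```scala
// Sketch only, modelled on the AdaptiveSparkPlanExec pattern; the real body
// is truncated out of the quoted hunk above.
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

private def onUpdatePlan(executionId: Long): Unit = {
  val queryExecution = SQLExecution.getQueryExecution(executionId)
  // Re-publish the current plan so the SQL UI redraws the newly compiled iteration.
  sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
    executionId,
    queryExecution.toString,
    SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)))
}
```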




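The Scaladoc at the top of the hunk describes a fixed-point iteration: run the anchor once,
then keep re-running the recursive term on the previous iteration's rows until an iteration
returns no rows. Here is a minimal, self-contained sketch of that loop over plain Scala
collections; the names `RecursiveFixedPoint`, `run`, and `recursiveStep` are illustrative
only and are not part of this PR:

```scala
// Minimal model of the fixed-point loop described in the Scaladoc, using
// in-memory Seqs instead of SparkPlans. All names here are hypothetical.
object RecursiveFixedPoint {
  /** Runs `anchor` once, then applies `recursiveStep` to the previous
   *  iteration's rows until an iteration produces no rows. */
  def run[T](anchor: Seq[T], recursiveStep: Seq[T] => Seq[T]): Seq[T] = {
    var accumulated = anchor    // result of the anchor's "first run"
    var lastIteration = anchor  // input to the next recursive-term iteration
    while (lastIteration.nonEmpty) {
      lastIteration = recursiveStep(lastIteration)
      accumulated = accumulated ++ lastIteration
    }
    accumulated
  }

  def main(args: Array[String]): Unit = {
    // Roughly mirrors: WITH RECURSIVE t(n) AS (SELECT 1 UNION ALL
    //   SELECT n + 1 FROM t WHERE n < 5) SELECT * FROM t
    println(run[Int](Seq(1), prev => prev.filter(_ < 5).map(_ + 1)))
    // prints List(1, 2, 3, 4, 5)
  }
}
```

The real operator additionally recompiles the recursive term to a fresh physical plan on every
iteration (the updated statistics and data mentioned in the Scaladoc), which is presumably what
the `physicalRecursiveTerms` queue above collects.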