Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/26 07:44:58 UTC

[GitHub] [spark] mgaido91 commented on a change in pull request #23531: [SPARK-24497][SQL] Support recursive SQL query

URL: https://github.com/apache/spark/pull/23531#discussion_r307624090
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ##########
 @@ -228,6 +234,156 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
+/**
+ * Physical plan node for a recursive table that encapsulates the physical plans of the anchor
+ * terms, the logical plans of the recursive terms, and the maximum number of rows to return.
+ *
+ * Anchor terms are physical plans that initialize the query in the first run. Recursive terms
+ * extend the result with new rows. They are logical plans and contain references to the result
+ * of the previous iteration or to the result accumulated so far. These references are updated
+ * with new statistics, compiled to physical plans, and then updated to reflect the appropriate
+ * RDDs before execution.
+ *
+ * Execution terminates once the anchor terms or the current iteration of the recursive terms
+ * return no rows, or once the number of accumulated rows reaches the limit.
+ *
+ * During the execution of a recursive query, previously computed results are reused multiple
+ * times. To avoid recomputing these pieces of the final result, they are cached.
+ *
+ * @param name the name of the recursive table
+ * @param anchorTerms these children are used to initialize the query
+ * @param recursiveTerms these children are used to extend the set of results with new rows based
+ *                       on the results of the previous iteration (or the anchors in the first
+ *                       iteration)
+ * @param limit the maximum number of rows to return
+ */
+case class RecursiveTableExec(
+    name: String,
+    anchorTerms: Seq[SparkPlan],
+    @transient
+    val recursiveTerms: Seq[LogicalPlan],
+    limit: Option[Long]) extends SparkPlan {
+  override def children: Seq[SparkPlan] = anchorTerms
+
+  override def output: Seq[Attribute] = anchorTerms.head.output.map(_.withNullability(true))
+
+  override def simpleString(maxFields: Int): String =
+    s"RecursiveTable $name${limit.map(", " + _).getOrElse("")}"
+
+  override def innerChildren: Seq[QueryPlan[_]] = recursiveTerms ++ super.innerChildren
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val storageLevel = StorageLevel.fromString(conf.getConf(SQLConf.RECURSION_CACHE_STORAGE_LEVEL))
 
 Review comment:
  I remember that in the past, many people suggested reading the conf directly when it is used only once, to avoid the proliferation of these accessor methods... that was the reason for my comment.
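
For readers outside the thread, here is a minimal, self-contained Scala sketch of the two styles the comment contrasts. It is not Spark's actual SQLConf; all names and signatures are illustrative. Style A adds a dedicated accessor method per config entry; style B reads the entry directly at its single call site, which is what the quoted diff line does with SQLConf.RECURSION_CACHE_STORAGE_LEVEL.

    object ConfStyleSketch {
      // Stand-in for Spark's ConfigEntry: a key plus a default value.
      final case class ConfigEntry(key: String, default: String)

      // Hypothetical entry standing in for SQLConf.RECURSION_CACHE_STORAGE_LEVEL.
      val RECURSION_CACHE_STORAGE_LEVEL: ConfigEntry =
        ConfigEntry("spark.sql.recursion.cacheStorageLevel", "MEMORY_AND_DISK")

      class Conf {
        private val settings = scala.collection.mutable.Map.empty[String, String]

        def set(entry: ConfigEntry, value: String): Unit = settings(entry.key) = value

        // Style B: a generic lookup; call sites read the entry directly,
        // so no per-config method is needed.
        def getConf(entry: ConfigEntry): String =
          settings.getOrElse(entry.key, entry.default)

        // Style A: a dedicated accessor per config. Convenient at call sites,
        // but every new config adds another method to the conf class, the
        // "proliferation" the comment refers to.
        def recursionCacheStorageLevel: String = getConf(RECURSION_CACHE_STORAGE_LEVEL)
      }

      def main(args: Array[String]): Unit = {
        val conf = new Conf
        println(conf.recursionCacheStorageLevel)              // style A
        println(conf.getConf(RECURSION_CACHE_STORAGE_LEVEL))  // style B, as in the PR
      }
    }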

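As a concrete illustration of the anchor and recursive terms described in the class comment in the diff above, here is a sketch of the kind of query such a node would execute. It assumes the WITH RECURSIVE syntax targeted by SPARK-24497; the session setup and exact syntax are illustrative, not taken from the PR.

    import org.apache.spark.sql.SparkSession

    object RecursiveCteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()

        // Anchor term: SELECT 1 AS n, planned once to seed the result.
        // Recursive term: the self-referencing SELECT, kept as a logical plan
        // and re-planned each iteration against the previous iteration's rows;
        // execution stops when an iteration returns no rows.
        spark.sql(
          """WITH RECURSIVE t AS (
            |  SELECT 1 AS n
            |  UNION ALL
            |  SELECT n + 1 FROM t WHERE n < 10
            |)
            |SELECT * FROM t""".stripMargin).show()  // rows n = 1 .. 10

        spark.stop()
      }
    }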