Posted to commits@spark.apache.org by an...@apache.org on 2015/05/09 02:15:14 UTC

spark git commit: [SPARK-7469] [SQL] DAG visualization: show SQL query operators

Repository: spark
Updated Branches:
  refs/heads/master ffdc40ce7 -> bd61f0703


[SPARK-7469] [SQL] DAG visualization: show SQL query operators

The DAG visualization currently displays only low-level Spark primitives (e.g. `map`, `reduceByKey`, `filter`). For SQL queries, these aren't particularly useful. Instead, we should display the higher-level physical operators (e.g. `Filter`, `Exchange`, `ShuffleHashJoin`). cc marmbrus

-----------------
**Before**
<img src="https://issues.apache.org/jira/secure/attachment/12731586/before.png" width="600px"/>
-----------------
**After** (note the operator names in the node labels)
<img src="https://issues.apache.org/jira/secure/attachment/12731587/after.png" width="600px"/>
-----------------
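
For reference, a minimal, self-contained sketch (not part of this patch) of the kind of query whose physical operators now label the visualization. It assumes a local Spark 1.x build that includes this commit; the app name, table shape, and column names are made up. The operators printed by `explain()` (e.g. `Filter`, `Exchange`, `Aggregate`) are the same node names that now appear in the DAG view instead of `map`/`mapPartitions`:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DagVizSqlDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dag-viz-sql-demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // A tiny DataFrame; the column names are illustrative.
    val df = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "a"))).toDF("id", "key")

    // The physical plan printed here contains operators such as Filter,
    // Exchange and Aggregate -- the labels the DAG visualization now shows
    // for the RDDs each operator creates.
    df.filter($"id" > 1).groupBy("key").count().explain()

    sc.stop()
  }
}
```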

Author: Andrew Or <an...@databricks.com>

Closes #5999 from andrewor14/dag-viz-sql and squashes the following commits:

0db23a4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql
1e211db [Andrew Or] Update comment
0d49fd6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql
ffd237a [Andrew Or] Fix style
202dac1 [Andrew Or] Make ignoreParent false by default
e61b1ab [Andrew Or] Visualize SQL operators, not low-level Spark primitives
569034a [Andrew Or] Add a flag to ignore parent settings and scopes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd61f070
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd61f070
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd61f070

Branch: refs/heads/master
Commit: bd61f07039064833108070e19b752d4c46045766
Parents: ffdc40c
Author: Andrew Or <an...@databricks.com>
Authored: Fri May 8 17:15:10 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri May 8 17:15:10 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/rdd/RDDOperationScope.scala    | 20 ++++++++++-----
 .../columnar/InMemoryColumnarTableScan.scala    |  2 +-
 .../apache/spark/sql/execution/Aggregate.scala  |  2 +-
 .../apache/spark/sql/execution/Exchange.scala   |  2 +-
 .../spark/sql/execution/ExistingRDD.scala       |  2 +-
 .../org/apache/spark/sql/execution/Expand.scala |  2 +-
 .../apache/spark/sql/execution/Generate.scala   |  2 +-
 .../sql/execution/GeneratedAggregate.scala      |  2 +-
 .../spark/sql/execution/LocalTableScan.scala    |  2 +-
 .../apache/spark/sql/execution/SparkPlan.scala  | 19 +++++++++++---
 .../org/apache/spark/sql/execution/Window.scala |  2 +-
 .../spark/sql/execution/basicOperators.scala    | 26 ++++++++++----------
 .../apache/spark/sql/execution/commands.scala   |  2 +-
 .../spark/sql/execution/debug/package.scala     |  4 +--
 .../sql/execution/joins/BroadcastHashJoin.scala |  2 +-
 .../joins/BroadcastLeftSemiJoinHash.scala       |  2 +-
 .../joins/BroadcastNestedLoopJoin.scala         |  2 +-
 .../sql/execution/joins/CartesianProduct.scala  |  2 +-
 .../sql/execution/joins/HashOuterJoin.scala     |  2 +-
 .../sql/execution/joins/LeftSemiJoinBNL.scala   |  2 +-
 .../sql/execution/joins/LeftSemiJoinHash.scala  |  2 +-
 .../sql/execution/joins/ShuffledHashJoin.scala  |  2 +-
 .../sql/execution/joins/SortMergeJoin.scala     |  2 +-
 .../apache/spark/sql/execution/pythonUdfs.scala |  2 +-
 .../sql/parquet/ParquetTableOperations.scala    |  4 +--
 .../sql/hive/execution/HiveTableScan.scala      |  2 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  4 ++-
 .../hive/execution/ScriptTransformation.scala   |  2 +-
 28 files changed, 71 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
index 9440d45..93ec606 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -102,16 +102,21 @@ private[spark] object RDDOperationScope {
   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
    *
-   * If nesting is allowed, this concatenates the previous scope with the new one in a way that
-   * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
-   * this method executed in the body will have no effect.
+   * If nesting is allowed, any subsequent calls to this method in the given body will instantiate
+   * child scopes that are nested within our scope. Otherwise, these calls will have no effect.
+   *
+   * Additionally, the caller of this method may optionally ignore the configurations and scopes
+   * set by the higher level caller. In this case, this method will ignore the parent caller's
+   * intention to disallow nesting, and the new scope instantiated will not have a parent. This
+   * is useful for scoping physical operations in Spark SQL, for instance.
    *
    * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
       name: String,
-      allowNesting: Boolean)(body: => T): T = {
+      allowNesting: Boolean,
+      ignoreParent: Boolean = false)(body: => T): T = {
     // Save the old scope to restore it later
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
@@ -119,8 +124,11 @@ private[spark] object RDDOperationScope {
     val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
     val oldNoOverride = sc.getLocalProperty(noOverrideKey)
     try {
-      // Set the scope only if the higher level caller allows us to do so
-      if (sc.getLocalProperty(noOverrideKey) == null) {
+      if (ignoreParent) {
+        // Ignore all parent settings and scopes and start afresh with our own root scope
+        sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
+      } else if (sc.getLocalProperty(noOverrideKey) == null) {
+        // Otherwise, set the scope only if the higher level caller allows us to do so
         sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
       }
       // Optionally disallow the child body to override our scope
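
To make the new `ignoreParent` semantics concrete, here is a toy, self-contained model of the scoping rules described in the comment above. It is not Spark's implementation (the real one stores scopes as JSON in SparkContext local properties and is `private[spark]`); it only mimics the decision logic: nest under the parent scope when allowed, do nothing when the parent disallows overriding, and start a fresh root scope when `ignoreParent` is true.

```scala
// Toy model of RDDOperationScope.withScope's decision logic -- illustration only.
object ScopeModel {
  case class Scope(name: String, parent: Option[Scope]) {
    override def toString: String = parent.map(p => s"$p / $name").getOrElse(name)
  }

  private var current: Option[Scope] = None      // stands in for the RDD_SCOPE_KEY local property
  private var noOverride: Boolean = false        // stands in for RDD_SCOPE_NO_OVERRIDE_KEY

  def withScope[T](name: String,
                   allowNesting: Boolean = false,
                   ignoreParent: Boolean = false)(body: => T): T = {
    val (oldScope, oldNoOverride) = (current, noOverride)
    try {
      if (ignoreParent) {
        current = Some(Scope(name, None))        // start afresh with our own root scope
      } else if (!noOverride) {
        current = Some(Scope(name, oldScope))    // nest under the higher-level caller's scope
      }
      if (!allowNesting) noOverride = true       // children may not override our scope
      body
    } finally {
      current = oldScope                         // restore the previous scope on the way out
      noOverride = oldNoOverride
    }
  }

  def main(args: Array[String]): Unit = {
    withScope("rdd operation") {
      // Without ignoreParent this inner call would be a no-op (nesting disallowed above);
      // with it, the SQL operator gets its own root scope.
      withScope("Filter", ignoreParent = true) {
        println(current.get)                     // prints "Filter", not "rdd operation / Filter"
      }
    }
  }
}
```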

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index d9b6fb4..0ded1cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -267,7 +267,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (enableAccumulators) {
       readPartitions.setValue(0)
       readBatches.setValue(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 18b1ba4..8d16749 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -121,7 +121,7 @@ case class Aggregate(
     }
   }
 
-  override def execute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     if (groupingExpressions.isEmpty) {
       child.execute().mapPartitions { iter =>
         val buffer = newAggregateBuffer()

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index f0d54cd..f02fa81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -109,7 +109,7 @@ case class Exchange(
     serializer
   }
 
-  override def execute(): RDD[Row] = attachTree(this , "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 57effbf..a500269 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,7 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon
 
 /** Physical plan node for scanning data from an RDD. */
 private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
-  override def execute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[Row] = rdd
 }
 
 /** Logical plan node for scanning data from a local collection. */

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 5758494..f16ca36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -43,7 +43,7 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
-  override def execute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>
       // TODO Move out projection objects creation and transfer to
       // workers via closure. However we can't assume the Projection

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 5201e20..08d9079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -46,7 +46,7 @@ case class Generate(
 
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (join) {
       child.execute().mapPartitions { iter =>
         val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 5d9f202..2ec7d4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -66,7 +66,7 @@ case class GeneratedAggregate(
 
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val aggregatesToCompute = aggregateExpressions.flatMap { a =>
       a.collect { case agg: AggregateExpression => agg}
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index ace9af5..03bee80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -30,7 +30,7 @@ private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) e
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
-  override def execute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[Row] = rdd
 
 
   override def executeCollect(): Array[Row] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 59c8980..435ac01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
 import org.apache.spark.sql.catalyst.expressions._
@@ -79,14 +79,25 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
   /**
-   * Runs this query returning the result as an RDD.
+   * Returns the result of this query as an RDD[Row] by delegating to doExecute
+   * after adding query plan information to created RDDs for visualization.
+   * Concrete implementations of SparkPlan should override doExecute instead.
    */
-  def execute(): RDD[Row]
+  final def execute(): RDD[Row] = {
+    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
+      doExecute()
+    }
+  }
 
   /**
-   * Runs this query returning the result as an array.
+   * Overridden by concrete implementations of SparkPlan.
+   * Produces the result of the query as an RDD[Row].
    */
+  protected def doExecute(): RDD[Row]
 
+  /**
+   * Runs this query returning the result as an array.
+   */
   def executeCollect(): Array[Row] = {
     execute().mapPartitions { iter =>
       val converter = CatalystTypeConverters.createToScalaConverter(schema)
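
The execute/doExecute split above is a textbook template method: `execute()` is final and owns the scoping side effect, while each operator only supplies `doExecute()`. Below is a stripped-down, runnable model of the shape every operator in the rest of this diff now follows; these are not Spark's classes, the operator names are made up, and the scope bookkeeping is replaced by println.

```scala
// Toy model of the SparkPlan execute()/doExecute() split -- illustration only.
abstract class Operator {
  def nodeName: String = getClass.getSimpleName

  // The base class pins down the behaviour: every operator's work runs inside
  // a scope named after the operator. In the real patch this is
  // RDDOperationScope.withScope(sparkContext, nodeName, false, true), where
  // ignoreParent = true gives each SQL operator its own root scope.
  final def execute(): Seq[Int] = withNamedScope(nodeName) { doExecute() }

  // Concrete operators override this instead of execute().
  protected def doExecute(): Seq[Int]

  private def withNamedScope[T](name: String)(body: => T): T = {
    println(s"entering scope: $name")
    try body finally println(s"leaving scope: $name")
  }
}

class FilterOp(p: Int => Boolean, child: Operator) extends Operator {
  protected override def doExecute(): Seq[Int] = child.execute().filter(p)
}

class RangeScan(n: Int) extends Operator {
  protected override def doExecute(): Seq[Int] = 0 until n
}

object TemplateMethodDemo {
  def main(args: Array[String]): Unit = {
    // Each operator announces its own scope before producing its rows.
    val evens = new FilterOp(_ % 2 == 0, new RangeScan(5)).execute()
    println(evens.mkString(", "))                // 0, 2, 4
  }
}
```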

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 217b559..c4327ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -112,7 +112,7 @@ case class Window(
     }
   }
 
-  def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     child.execute().mapPartitions { iter =>
       new Iterator[Row] {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 5ca11e6..6cb67b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -37,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
 
   @transient lazy val buildProjection = newMutableProjection(projectList, child.output)
 
-  override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
+  protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
     val resuableProjection = buildProjection()
     iter.map(resuableProjection)
   }
@@ -54,7 +54,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
 
   @transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output)
 
-  override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
+  protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
     iter.filter(conditionEvaluator)
   }
 
@@ -83,7 +83,7 @@ case class Sample(
   override def output: Seq[Attribute] = child.output
 
   // TODO: How to pick seed?
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     if (withReplacement) {
       child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed)
     } else {
@@ -99,7 +99,7 @@ case class Sample(
 case class Union(children: Seq[SparkPlan]) extends SparkPlan {
   // TODO: attributes output by union should be distinct for nullability purposes
   override def output: Seq[Attribute] = children.head.output
-  override def execute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
+  protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
 }
 
 /**
@@ -124,7 +124,7 @@ case class Limit(limit: Int, child: SparkPlan)
 
   override def executeCollect(): Array[Row] = child.executeTake(limit)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
       child.execute().mapPartitions { iter =>
         iter.take(limit).map(row => (false, row.copy()))
@@ -166,7 +166,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
+  protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 }
@@ -186,7 +186,7 @@ case class Sort(
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  override def execute(): RDD[Row] = attachTree(this, "sort") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       iterator.map(_.copy()).toArray.sorted(ordering).iterator
@@ -214,7 +214,7 @@ case class ExternalSort(
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  override def execute(): RDD[Row] = attachTree(this, "sort") {
+  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
@@ -244,7 +244,7 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
   override def requiredChildDistribution: Seq[Distribution] =
     if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     child.execute().mapPartitions { iter =>
       val hashSet = new scala.collection.mutable.HashSet[Row]()
 
@@ -270,7 +270,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
   extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
   }
 }
@@ -285,7 +285,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
 case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = left.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
   }
 }
@@ -299,7 +299,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
 case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = children.head.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
 }
@@ -314,5 +314,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
 case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
   def children: Seq[SparkPlan] = child :: Nil
 
-  def execute(): RDD[Row] = child.execute()
+  protected override def doExecute(): RDD[Row] = child.execute()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 388a818..49b361e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -64,7 +64,7 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
 
   override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val converted = sideEffectResult.map(r =>
       CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row])
     sqlContext.sparkContext.parallelize(converted, 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 7107870..dffb265 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -125,7 +125,7 @@ package object debug {
       }
     }
 
-    def execute(): RDD[Row] = {
+    protected override def doExecute(): RDD[Row] = {
       child.execute().mapPartitions { iter =>
         new Iterator[Row] {
           def hasNext: Boolean = iter.hasNext
@@ -193,7 +193,7 @@ package object debug {
 
     def children: List[SparkPlan] = child :: Nil
 
-    def execute(): RDD[Row] = {
+    protected override def doExecute(): RDD[Row] = {
       child.execute().map { row =>
         try typeCheck(row, child.schema) catch {
           case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 926f5e6..05dd568 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -66,7 +66,7 @@ case class BroadcastHashJoin(
     sparkContext.broadcast(hashed)
   }
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val broadcastRelation = Await.result(broadcastFuture, timeout)
 
     streamedPlan.execute().mapPartitions { streamedIter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 3ef1e0d..640fc26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -38,7 +38,7 @@ case class BroadcastLeftSemiJoinHash(
 
   override def output: Seq[Attribute] = left.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator
     val hashSet = new java.util.HashSet[Row]()
     var currentRow: Row = null

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 6aaf35f..caad3df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -61,7 +61,7 @@ case class BroadcastNestedLoopJoin(
   @transient private lazy val boundCondition =
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val broadcastedRelation =
       sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 1cbc983..191c00c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index a396c0f..4557439 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -183,7 +183,7 @@ case class HashOuterJoin(
     hashTable
   }
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val joinedRow = new JoinedRow()
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       // TODO this probably can be replaced by external sort (sort merged join?)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index b03af41..036423e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -47,7 +47,7 @@ case class LeftSemiJoinBNL(
   @transient private lazy val boundCondition =
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val broadcastedRelation =
       sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index a04f2a6..8ad27ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -42,7 +42,7 @@ case class LeftSemiJoinHash(
 
   override def output: Seq[Attribute] = left.output
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
       val hashSet = new java.util.HashSet[Row]()
       var currentRow: Row = null

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index a6cd833..219525d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -43,7 +43,7 @@ case class ShuffledHashJoin(
   override def requiredChildDistribution: Seq[ClusteredDistribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
       val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
       hashJoin(streamIter, hashed)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index b512366..1a39fb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -60,7 +60,7 @@ case class SortMergeJoin(
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
     keys.map(SortOrder(_, Ascending))
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 58cb198..3dbc383 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -228,7 +228,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
 
   def children: Seq[SparkPlan] = child :: Nil
 
-  def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     val childResults = child.execute().map(_.copy())
 
     val parent = childResults.mapPartitions { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index aded126..75ac52d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -77,7 +77,7 @@ private[sql] case class ParquetTableScan(
     }
   }.toArray
 
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     import parquet.filter2.compat.FilterCompat.FilterPredicateCompat
 
     val sc = sqlContext.sparkContext
@@ -255,7 +255,7 @@ private[sql] case class InsertIntoParquetTable(
   /**
    * Inserts all rows into the Parquet file.
    */
-  override def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     // TODO: currently we do not check whether the "schema"s are compatible
     // That means if one first creates a table and then INSERTs data with
     // and incompatible schema the execution will fail. It would be nice

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 0a5f19e..62dc416 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -129,7 +129,7 @@ case class HiveTableScan(
     }
   }
 
-  override def execute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) {
+  protected override def doExecute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
   } else {
     hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index de8954d..c0b0b10 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -258,5 +258,7 @@ case class InsertIntoHiveTable(
 
   override def executeCollect(): Array[Row] = sideEffectResult.toArray
 
-  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  protected override def doExecute(): RDD[Row] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd61f070/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 3eddda3..bfd26e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -54,7 +54,7 @@ case class ScriptTransformation(
 
   override def otherCopyArgs: Seq[HiveContext] = sc :: Nil
 
-  def execute(): RDD[Row] = {
+  protected override def doExecute(): RDD[Row] = {
     child.execute().mapPartitions { iter =>
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd)

