You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/07/12 16:23:59 UTC

spark git commit: [SPARK-17701][SQL] Refactor RowDataSourceScanExec so its sameResult call does not compare strings

Repository: spark
Updated Branches:
  refs/heads/master d2d2a5de1 -> 780586a9f


[SPARK-17701][SQL] Refactor RowDataSourceScanExec so its sameResult call does not compare strings

## What changes were proposed in this pull request?

Currently, `RowDataSourceScanExec` and `FileSourceScanExec` rely on a "metadata" string map to implement equality comparison, since the RDDs they depend on cannot be directly compared. This has resulted in a number of correctness bugs around exchange reuse, e.g. SPARK-17673 and SPARK-16818.

To make these comparisons less brittle, we should refactor these classes to compare constructor parameters directly instead of relying on the metadata map.

This PR refactors `RowDataSourceScanExec`, `FileSourceScanExec` will be fixed in the follow-up PR.

## How was this patch tested?

existing tests

Author: Wenchen Fan <we...@databricks.com>

Closes #18600 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/780586a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/780586a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/780586a9

Branch: refs/heads/master
Commit: 780586a9f2400c3fdfdb9a6b954001a3c9663941
Parents: d2d2a5d
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Jul 12 09:23:54 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Jul 12 09:23:54 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 65 ++++++++++----------
 .../apache/spark/sql/execution/SparkPlan.scala  |  5 --
 .../spark/sql/execution/SparkPlanInfo.scala     |  4 +-
 .../datasources/DataSourceStrategy.scala        | 57 +++++++----------
 .../spark/sql/execution/ui/SparkPlanGraph.scala |  5 +-
 5 files changed, 56 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/780586a9/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index a0def68..588c937 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -33,21 +33,23 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   val relation: BaseRelation
-  val metastoreTableIdentifier: Option[TableIdentifier]
+  val tableIdentifier: Option[TableIdentifier]
 
   protected val nodeNamePrefix: String = ""
 
   override val nodeName: String = {
-    s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
+    s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
   }
 
+  // Metadata that describes more details of this scan.
+  protected def metadata: Map[String, String]
+
   override def simpleString: String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
@@ -73,34 +75,25 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
 
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
-    output: Seq[Attribute],
+    fullOutput: Seq[Attribute],
+    requiredColumnsIndex: Seq[Int],
+    filters: Set[Filter],
+    handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
-    override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String],
-    override val metastoreTableIdentifier: Option[TableIdentifier])
+    override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
+  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
+
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  val outputUnsafeRows = relation match {
-    case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
-      !SparkSession.getActiveSession.get.sessionState.conf.getConf(
-        SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
-    case _: HadoopFsRelation => true
-    case _ => false
-  }
-
   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = if (outputUnsafeRows) {
-      rdd
-    } else {
-      rdd.mapPartitionsWithIndexInternal { (index, iter) =>
-        val proj = UnsafeProjection.create(schema)
-        proj.initialize(index)
-        iter.map(proj)
-      }
+    val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
+      val proj = UnsafeProjection.create(schema)
+      proj.initialize(index)
+      iter.map(proj)
     }
 
     val numOutputRows = longMetric("numOutputRows")
@@ -126,24 +119,31 @@ case class RowDataSourceScanExec(
     ctx.INPUT_ROW = row
     ctx.currentVars = null
     val columnsRowInput = exprRows.map(_.genCode(ctx))
-    val inputRow = if (outputUnsafeRows) row else null
     s"""
        |while ($input.hasNext()) {
        |  InternalRow $row = (InternalRow) $input.next();
        |  $numOutputRows.add(1);
-       |  ${consume(ctx, columnsRowInput, inputRow).trim}
+       |  ${consume(ctx, columnsRowInput, null).trim}
        |  if (shouldStop()) return;
        |}
      """.stripMargin
   }
 
-  // Only care about `relation` and `metadata` when canonicalizing.
+  override val metadata: Map[String, String] = {
+    val markedFilters = for (filter <- filters) yield {
+      if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
+    }
+    Map(
+      "ReadSchema" -> output.toStructType.catalogString,
+      "PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
+  }
+
+  // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override lazy val canonicalized: SparkPlan =
     copy(
-      output.map(QueryPlan.normalizeExprId(_, output)),
+      fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
       rdd = null,
-      outputPartitioning = null,
-      metastoreTableIdentifier = None)
+      tableIdentifier = None)
 }
 
 /**
@@ -154,7 +154,7 @@ case class RowDataSourceScanExec(
  * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
  * @param partitionFilters Predicates to use for partition pruning.
  * @param dataFilters Filters on non-partition columns.
- * @param metastoreTableIdentifier identifier for the table in the metastore.
+ * @param tableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
     @transient relation: HadoopFsRelation,
@@ -162,7 +162,7 @@ case class FileSourceScanExec(
     requiredSchema: StructType,
     partitionFilters: Seq[Expression],
     dataFilters: Seq[Expression],
-    override val metastoreTableIdentifier: Option[TableIdentifier])
+    override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan  {
 
   val supportsBatch: Boolean = relation.fileFormat.supportBatch(
@@ -261,7 +261,6 @@ case class FileSourceScanExec(
   private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  // These metadata values make scan plans uniquely identifiable for equality checking.
   override val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location

http://git-wip-us.apache.org/repos/asf/spark/blob/780586a9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index db97561..c7277c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -72,11 +72,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * @return Metadata that describes more details of this SparkPlan.
-   */
-  def metadata: Map[String, String] = Map.empty
-
-  /**
    * @return All metrics containing metrics of this SparkPlan.
    */
   def metrics: Map[String, SQLMetric] = Map.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/780586a9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 7aa9312..06b6962 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -31,7 +31,6 @@ class SparkPlanInfo(
     val nodeName: String,
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
-    val metadata: Map[String, String],
     val metrics: Seq[SQLMetricInfo]) {
 
   override def hashCode(): Int = {
@@ -58,7 +57,6 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
-      plan.metadata, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metrics)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/780586a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e05a8d5..587b9b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources
 
 import java.util.concurrent.Callable
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -288,10 +286,11 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
     case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
       RowDataSourceScanExec(
         l.output,
+        l.output.indices,
+        Set.empty,
+        Set.empty,
         toCatalystRDD(l, baseRelation.buildScan()),
         baseRelation,
-        UnknownPartitioning(0),
-        Map.empty,
         None) :: Nil
 
     case _ => Nil
@@ -354,36 +353,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
     val (unhandledPredicates, pushedFilters, handledFilters) =
       selectFilters(relation.relation, candidatePredicates)
 
-    // A set of column attributes that are only referenced by pushed down filters.  We can eliminate
-    // them from requested columns.
-    val handledSet = {
-      val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
-      val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
-      AttributeSet(handledPredicates.flatMap(_.references)) --
-        (projectSet ++ unhandledSet).map(relation.attributeMap)
-    }
-
     // Combines all Catalyst filter `Expression`s that are either not convertible to data source
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
-    // These metadata values make scan plans uniquely identifiable for equality checking.
-    // TODO(SPARK-17701) using strings for equality checking is brittle
-    val metadata: Map[String, String] = {
-      val pairs = ArrayBuffer.empty[(String, String)]
-
-      // Mark filters which are handled by the underlying DataSource with an Astrisk
-      if (pushedFilters.nonEmpty) {
-        val markedFilters = for (filter <- pushedFilters) yield {
-            if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
-        }
-        pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
-      }
-      pairs += ("ReadSchema" ->
-        StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
-      pairs.toMap
-    }
-
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -395,24 +368,36 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         .asInstanceOf[Seq[Attribute]]
         // Match original case of attributes.
         .map(relation.attributeMap)
-        // Don't request columns that are only referenced by pushed filters.
-        .filterNot(handledSet.contains)
 
       val scan = RowDataSourceScanExec(
-        projects.map(_.toAttribute),
+        relation.output,
+        requestedColumns.map(relation.output.indexOf),
+        pushedFilters.toSet,
+        handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, UnknownPartitioning(0), metadata,
+        relation.relation,
         relation.catalogTable.map(_.identifier))
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
+      // A set of column attributes that are only referenced by pushed down filters.  We can
+      // eliminate them from requested columns.
+      val handledSet = {
+        val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
+        val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
+        AttributeSet(handledPredicates.flatMap(_.references)) --
+          (projectSet ++ unhandledSet).map(relation.attributeMap)
+      }
       // Don't request columns that are only referenced by pushed filters.
       val requestedColumns =
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
 
       val scan = RowDataSourceScanExec(
-        requestedColumns,
+        relation.output,
+        requestedColumns.map(relation.output.indexOf),
+        pushedFilters.toSet,
+        handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, UnknownPartitioning(0), metadata,
+        relation.relation,
         relation.catalogTable.map(_.identifier))
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))

http://git-wip-us.apache.org/repos/asf/spark/blob/780586a9/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 9d4ebcc..884f945 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -113,7 +113,7 @@ object SparkPlanGraph {
         }
         val node = new SparkPlanGraphNode(
           nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
-          planInfo.simpleString, planInfo.metadata, metrics)
+          planInfo.simpleString, metrics)
         if (subgraph == null) {
           nodes += node
         } else {
@@ -143,7 +143,6 @@ private[ui] class SparkPlanGraphNode(
     val id: Long,
     val name: String,
     val desc: String,
-    val metadata: Map[String, String],
     val metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
@@ -177,7 +176,7 @@ private[ui] class SparkPlanGraphCluster(
     desc: String,
     val nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
     metrics: Seq[SQLPlanMetric])
-  extends SparkPlanGraphNode(id, name, desc, Map.empty, metrics) {
+  extends SparkPlanGraphNode(id, name, desc, metrics) {
 
   override def makeDotNode(metricsValue: Map[Long, String]): String = {
     val duration = metrics.filter(_.name.startsWith(WholeStageCodegenExec.PIPELINE_DURATION_METRIC))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org