Posted to commits@spark.apache.org by da...@apache.org on 2016/02/25 02:16:50 UTC

spark git commit: [SPARK-13250] [SQL] Update PhysicalRDD to convert to UnsafeRow if using the vectorized scanner.

Repository: spark
Updated Branches:
  refs/heads/master cbb0b65ad -> 5a7af9e7a


[SPARK-13250] [SQL] Update PhysicalRDD to convert to UnsafeRow if using the vectorized scanner.

Some parts of the engine rely on UnsafeRow, which the vectorized Parquet scanner does not want
to produce. This patch adds a conversion in PhysicalRDD for that case. When codegen is used (and
the scan is the start of the pipeline), there is no requirement to use UnsafeRow, so this patch
also updates PhysicalRDD to support codegen, which eliminates the need for the UnsafeRow
conversion in all cases.
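
For readers following the change, here is a minimal sketch of the kind of conversion involved
(illustrative only, not the exact patch code; the method name toUnsafeRows is made up for this
example): when a downstream operator does require UnsafeRow, the scan's iterator can be wrapped
with an UnsafeProjection built from the scan's output data types.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.DataType

    // Sketch only: convert each InternalRow produced by a scan into an UnsafeRow.
    def toUnsafeRows(rdd: RDD[InternalRow], outputTypes: Array[DataType]): RDD[InternalRow] = {
      rdd.mapPartitions { iter =>
        // Build one projection per partition and reuse it for every row.
        val proj = UnsafeProjection.create(outputTypes)
        // Note: the projection reuses its output buffer, so each row must be
        // consumed (or copied) before the next one is produced.
        iter.map(proj)
      }
    }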

With these changes, TPC-DS Q19 at the 10 GB scale factor drops from 9.5 seconds to 6.5 seconds.

Author: Nong Li <no...@databricks.com>

Closes #11141 from nongli/spark-13250.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a7af9e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a7af9e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a7af9e7

Branch: refs/heads/master
Commit: 5a7af9e7ac85e04aa4a420bc2887207bfa18f792
Parents: cbb0b65
Author: Nong Li <no...@databricks.com>
Authored: Wed Feb 24 17:16:45 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Wed Feb 24 17:16:45 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 |  3 +-
 .../spark/sql/execution/ExistingRDD.scala       | 45 ++++++++++++++--
 .../spark/sql/execution/WholeStageCodegen.scala |  1 +
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  6 ++-
 .../spark/sql/sources/FilteredScanSuite.scala   | 54 +++++++++++---------
 .../spark/sql/sources/PrunedScanSuite.scala     | 45 +++++++++-------
 .../spark/sql/sources/BucketedReadSuite.scala   | 46 +++++++++--------
 7 files changed, 129 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index bf43452..7275e69 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -173,7 +173,8 @@ class DataFrame(object):
 
         >>> df.explain()
         == Physical Plan ==
-        Scan ExistingRDD[age#0,name#1]
+        WholeStageCodegen
+        :  +- Scan ExistingRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==

http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index cad7e25..8649d2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType
@@ -102,7 +104,7 @@ private[sql] case class PhysicalRDD(
     override val metadata: Map[String, String] = Map.empty,
     isUnsafeRow: Boolean = false,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0))
-  extends LeafNode {
+  extends LeafNode with CodegenSupport {
 
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -128,6 +130,36 @@ private[sql] case class PhysicalRDD(
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
     s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
   }
+
+  override def upstreams(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
+  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+  // never requires UnsafeRow as input.
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val input = ctx.freshName("input")
+    // PhysicalRDD always just has one input
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+    val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
+    val row = ctx.freshName("row")
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    ctx.INPUT_ROW = row
+    ctx.currentVars = null
+    val columns = exprs.map(_.gen(ctx))
+    s"""
+       | while ($input.hasNext()) {
+       |   InternalRow $row = (InternalRow) $input.next();
+       |   $numOutputRows.add(1);
+       |   ${columns.map(_.code).mkString("\n").trim}
+       |   ${consume(ctx, columns).trim}
+       |   if (shouldStop()) {
+       |     return;
+       |   }
+       | }
+     """.stripMargin
+  }
 }
 
 private[sql] object PhysicalRDD {
@@ -140,8 +172,13 @@ private[sql] object PhysicalRDD {
       rdd: RDD[InternalRow],
       relation: BaseRelation,
       metadata: Map[String, String] = Map.empty): PhysicalRDD = {
-    // All HadoopFsRelations output UnsafeRows
-    val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+    val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) {
+      // The vectorized parquet reader does not produce unsafe rows.
+      !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+    } else {
+      // All HadoopFsRelations output UnsafeRows
+      relation.isInstanceOf[HadoopFsRelation]
+    }
 
     val bucketSpec = relation match {
       case r: HadoopFsRelation => r.getBucketSpec
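
For readability, here is a hand-written Scala equivalent of the loop that doProduce() above emits
as Java source (a sketch only; addOutputRow and consume are placeholders for the generated metric
update and downstream consume() code):

    import org.apache.spark.sql.catalyst.InternalRow

    // Sketch: the shape of the produce loop generated for PhysicalRDD.
    def produceLoop(
        input: Iterator[InternalRow],
        addOutputRow: () => Unit,        // stands in for numOutputRows.add(1)
        consume: InternalRow => Unit,    // stands in for the generated consume() code
        shouldStop: () => Boolean): Unit = {
      while (input.hasNext) {
        val row = input.next()
        addOutputRow()
        consume(row)
        if (shouldStop()) {
          return
        }
      }
    }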

http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 08c52e5..afaddcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -42,6 +42,7 @@ trait CodegenSupport extends SparkPlan {
     case _: TungstenAggregate => "agg"
     case _: BroadcastHashJoin => "bhj"
     case _: SortMergeJoin => "smj"
+    case _: PhysicalRDD => "rdd"
     case _ => nodeName.toLowerCase
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 7a0f7ab..f8a9a95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -187,8 +187,10 @@ class JDBCSuite extends SparkFunSuite
       val parentPlan = df.queryExecution.executedPlan
       // Check if SparkPlan Filter is removed in a physical plan and
       // the plan only has PhysicalRDD to scan JDBCRelation.
-      assert(parentPlan.isInstanceOf[PhysicalRDD])
-      assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
+      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
+      assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
+      assert(node.plan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
       df
     }
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
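
Tests like the one above now peel a WholeStageCodegen wrapper off the plan before inspecting the
scan. A sketch of that unwrapping step, using the same classes the diff references (the helper
name unwrapCodegen is hypothetical, not part of this patch):

    import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegen}

    // Hypothetical helper: peel off a WholeStageCodegen wrapper, if present,
    // so assertions can look at the underlying scan node directly, e.g.
    // assert(unwrapCodegen(parentPlan).isInstanceOf[PhysicalRDD]).
    def unwrapCodegen(plan: SparkPlan): SparkPlan = plan match {
      case w: WholeStageCodegen => w.plan
      case other => other
    }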

http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 7196b6d..4b578a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -304,30 +304,38 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
     expectedCount: Int,
     requiredColumnNames: Set[String],
     expectedUnhandledFilters: Set[Filter]): Unit = {
+
     test(s"PushDown Returns $expectedCount: $sqlString") {
-      val queryExecution = sql(sqlString).queryExecution
-      val rawPlan = queryExecution.executedPlan.collect {
-        case p: execution.PhysicalRDD => p
-      } match {
-        case Seq(p) => p
-        case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
-      }
-      val rawCount = rawPlan.execute().count()
-      assert(ColumnsRequired.set === requiredColumnNames)
-
-      val table = caseInsensitiveContext.table("oneToTenFiltered")
-      val relation = table.queryExecution.logical.collectFirst {
-        case LogicalRelation(r, _, _) => r
-      }.get
-
-      assert(
-        relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
-
-      if (rawCount != expectedCount) {
-        fail(
-          s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
-            s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
-            queryExecution)
+      // These tests check a particular plan, disable whole stage codegen.
+      caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+      try {
+        val queryExecution = sql(sqlString).queryExecution
+        val rawPlan = queryExecution.executedPlan.collect {
+          case p: execution.PhysicalRDD => p
+        } match {
+          case Seq(p) => p
+          case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+        }
+        val rawCount = rawPlan.execute().count()
+        assert(ColumnsRequired.set === requiredColumnNames)
+
+        val table = caseInsensitiveContext.table("oneToTenFiltered")
+        val relation = table.queryExecution.logical.collectFirst {
+          case LogicalRelation(r, _, _) => r
+        }.get
+
+        assert(
+          relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
+
+        if (rawCount != expectedCount) {
+          fail(
+            s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
+              s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
+              queryExecution)
+        }
+      } finally {
+        caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
       }
     }
   }
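
The disable-and-restore pattern used here (and again in PrunedScanSuite below) could also be
expressed as a small helper around SQLConf; a sketch under the assumption that the test has a
SQLConf instance in hand (the name withWholeStageCodegenDisabled is hypothetical, not part of
this patch):

    import org.apache.spark.sql.SQLConf

    // Hypothetical helper: run `body` with whole-stage codegen disabled and
    // restore the previous setting afterwards.
    def withWholeStageCodegenDisabled[T](conf: SQLConf)(body: => T): T = {
      val previous = conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED)
      conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
      try {
        body
      } finally {
        conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, previous)
      }
    }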

http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index a89c5f8..02e8ab1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -117,28 +117,35 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
 
   def testPruning(sqlString: String, expectedColumns: String*): Unit = {
     test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") {
-      val queryExecution = sql(sqlString).queryExecution
-      val rawPlan = queryExecution.executedPlan.collect {
-        case p: execution.PhysicalRDD => p
-      } match {
-        case Seq(p) => p
-        case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
-      }
-      val rawColumns = rawPlan.output.map(_.name)
-      val rawOutput = rawPlan.execute().first()
-
-      if (rawColumns != expectedColumns) {
-        fail(
-          s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
-          s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
-            queryExecution)
-      }
 
-      if (rawOutput.numFields != expectedColumns.size) {
-        fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
+      // These tests check a particular plan, disable whole stage codegen.
+      caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+      try {
+        val queryExecution = sql(sqlString).queryExecution
+        val rawPlan = queryExecution.executedPlan.collect {
+          case p: execution.PhysicalRDD => p
+        } match {
+          case Seq(p) => p
+          case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+        }
+        val rawColumns = rawPlan.output.map(_.name)
+        val rawOutput = rawPlan.execute().first()
+
+        if (rawColumns != expectedColumns) {
+          fail(
+            s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
+              s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
+              queryExecution)
+        }
+
+        if (rawOutput.numFields != expectedColumns.size) {
+          fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
+        }
+      } finally {
+        caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
       }
     }
   }
-
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 0ef42f4..62f6b99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -74,32 +74,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
       bucketValues: Seq[Integer],
       filterCondition: Column,
       originalDataFrame: DataFrame): Unit = {
+    // This test verifies parts of the plan. Disable whole stage codegen.
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
+      val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
+      // Limit: bucket pruning only works when the bucket column has one and only one column
+      assert(bucketColumnNames.length == 1)
+      val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
+      val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
+      val matchedBuckets = new BitSet(numBuckets)
+      bucketValues.foreach { value =>
+        matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
+      }
 
-    val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
-    val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
-    // Limit: bucket pruning only works when the bucket column has one and only one column
-    assert(bucketColumnNames.length == 1)
-    val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
-    val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
-    val matchedBuckets = new BitSet(numBuckets)
-    bucketValues.foreach { value =>
-      matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
-    }
+      // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
+      val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
+      val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
+      assert(rdd.isDefined, plan)
 
-    // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
-    val rdd = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
-      .find(_.isInstanceOf[PhysicalRDD])
-    assert(rdd.isDefined)
+      val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+        if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
+      }
+      // checking if all the pruned buckets are empty
+      assert(checkedResult.collect().forall(_ == true))
 
-    val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
-      if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
+      checkAnswer(
+        bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
+        originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
     }
-    // checking if all the pruned buckets are empty
-    assert(checkedResult.collect().forall(_ == true))
-
-    checkAnswer(
-      bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
-      originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
   }
 
   test("read partitioning bucketed tables with bucket pruning filters") {

