You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/10/20 01:57:26 UTC

spark git commit: [SPARK-11088][SQL] Merges partition values using UnsafeProjection

Repository: spark
Updated Branches:
  refs/heads/master 16906ef23 -> 8b877cc4e


[SPARK-11088][SQL] Merges partition values using UnsafeProjection

`DataSourceStrategy.mergeWithPartitionValues` is essentially a projection, but it is implemented quite inefficiently. This PR reimplements the method using `UnsafeProjection` to avoid unnecessary boxing costs.

Author: Cheng Lian <li...@databricks.com>

Closes #9104 from liancheng/spark-11088.faster-partition-values-merging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b877cc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b877cc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b877cc4

Branch: refs/heads/master
Commit: 8b877cc4ee46cad9d1f7cac451801c1410f6c1fe
Parents: 16906ef
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Oct 19 16:57:20 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Oct 19 16:57:20 2015 -0700

----------------------------------------------------------------------
 .../datasources/DataSourceStrategy.scala        | 73 +++++++-------------
 1 file changed, 24 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b877cc4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 33181fa..ffb4645 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -140,29 +140,30 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     val sharedHadoopConf = SparkHadoopUtil.get.conf
     val confBroadcast =
       relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
+    val partitionColumnNames = partitionColumns.fieldNames.toSet
 
     // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
     // will union all partitions and attach partition values if needed.
     val scanBuilder = {
-      (columns: Seq[Attribute], filters: Array[Filter]) => {
+      (requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
+        val requiredDataColumns =
+          requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))
+
         // Builds RDD[Row]s for each selected partition.
         val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
-          val partitionColNames = partitionColumns.fieldNames
-
           // Don't scan any partition columns to save I/O.  Here we are being optimistic and
           // assuming partition columns data stored in data files are always consistent with those
           // partition values encoded in partition directory paths.
-          val needed = columns.filterNot(a => partitionColNames.contains(a.name))
-          val dataRows =
-            relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
+          val dataRows = relation.buildScan(
+            requiredDataColumns.map(_.name).toArray, filters, Array(dir), confBroadcast)
 
           // Merges data values with partition values.
           mergeWithPartitionValues(
-            relation.schema,
-            columns.map(_.name).toArray,
-            partitionColNames,
+            requiredColumns,
+            requiredDataColumns,
+            partitionColumns,
             partitionValues,
-            toCatalystRDD(logicalRelation, needed, dataRows))
+            toCatalystRDD(logicalRelation, requiredDataColumns, dataRows))
         }
 
         val unionedRows =
@@ -188,52 +189,27 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     sparkPlan
   }
 
-  // TODO: refactor this thing. It is very complicated because it does projection internally.
-  // We should just put a project on top of this.
   private def mergeWithPartitionValues(
-      schema: StructType,
-      requiredColumns: Array[String],
-      partitionColumns: Array[String],
+      requiredColumns: Seq[Attribute],
+      dataColumns: Seq[Attribute],
+      partitionColumnSchema: StructType,
       partitionValues: InternalRow,
       dataRows: RDD[InternalRow]): RDD[InternalRow] = {
-    val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
-
     // If output columns contain any partition column(s), we need to merge scanned data
     // columns and requested partition columns to form the final result.
-    if (!requiredColumns.sameElements(nonPartitionColumns)) {
-      val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
-        // To see whether the `index`-th column is a partition column...
-        val i = partitionColumns.indexOf(name)
-        if (i != -1) {
-          val dt = schema(partitionColumns(i)).dataType
-          // If yes, gets column value from partition values.
-          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
-            mutableRow(ordinal) = partitionValues.get(i, dt)
-          }
-        } else {
-          // Otherwise, inherits the value from scanned data.
-          val i = nonPartitionColumns.indexOf(name)
-          val dt = schema(nonPartitionColumns(i)).dataType
-          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
-            mutableRow(ordinal) = dataRow.get(i, dt)
-          }
-        }
+    if (requiredColumns != dataColumns) {
+      // Builds `AttributeReference`s for all partition columns so that we can use them to project
+      // required partition columns.  Note that if a partition column appears in `requiredColumns`,
+      // we should use the `AttributeReference` in `requiredColumns`.
+      val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
+      val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
+        requiredColumnMap.getOrElse(a.name, a)
       }
 
-      // Since we know for sure that this closure is serializable, we can avoid the overhead
-      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
-      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
       val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
-        val dataTypes = requiredColumns.map(schema(_).dataType)
-        val mutableRow = new SpecificMutableRow(dataTypes)
-        iterator.map { dataRow =>
-          var i = 0
-          while (i < mutableRow.numFields) {
-            mergers(i)(mutableRow, dataRow, i)
-            i += 1
-          }
-          mutableRow.asInstanceOf[InternalRow]
-        }
+        val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
+        val mutableJoinedRow = new JoinedRow()
+        iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues)))
       }
 
       // This is an internal RDD whose call site the user should not be concerned with
@@ -242,7 +218,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       Utils.withDummyCallSite(dataRows.sparkContext) {
         new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
       }
-
     } else {
       dataRows
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org