Posted to commits@spark.apache.org by ma...@apache.org on 2015/10/20 01:57:26 UTC
spark git commit: [SPARK-11088][SQL] Merges partition values using UnsafeProjection
Repository: spark
Updated Branches:
refs/heads/master 16906ef23 -> 8b877cc4e
[SPARK-11088][SQL] Merges partition values using UnsafeProjection
`DataSourceStrategy.mergeWithPartitionValues` is essentially a projection implemented in a quite inefficient way. This PR optimizes this method with `UnsafeProjection` to avoid unnecessary boxing costs.
Author: Cheng Lian <li...@databricks.com>
Closes #9104 from liancheng/spark-11088.faster-partition-values-merging.
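
For illustration, here is a minimal standalone sketch of the technique. The column
names and literal values are hypothetical, and it relies on Spark's internal Catalyst
API, which is not a stable public interface: a JoinedRow concatenates each scanned
data row with the partition values, and a single UnsafeProjection reorders the joined
fields into the requested output layout without boxing them.

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow, UnsafeProjection}
  import org.apache.spark.sql.types.{IntegerType, StringType}
  import org.apache.spark.unsafe.types.UTF8String

  // Hypothetical schemas: one data column scanned from files, one partition
  // column decoded from the partition directory path.
  val dataColumns = Seq(AttributeReference("value", IntegerType)())
  val partitionColumns = Seq(AttributeReference("part", StringType)())
  // The requested output puts the partition column first.
  val requiredColumns = partitionColumns ++ dataColumns

  // Built once (once per partition in the real code), then reused for every row.
  val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
  val joinedRow = new JoinedRow()

  val partitionValues = InternalRow(UTF8String.fromString("2015-10-19"))
  val dataRow = InternalRow(42)
  // The generated projection reads fields positionally from the joined row, so
  // no per-field boxing into java.lang.Object occurs.
  val merged = projection(joinedRow(dataRow, partitionValues))
  // merged.getUTF8String(0) == "2015-10-19", merged.getInt(1) == 42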
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b877cc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b877cc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b877cc4
Branch: refs/heads/master
Commit: 8b877cc4ee46cad9d1f7cac451801c1410f6c1fe
Parents: 16906ef
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Oct 19 16:57:20 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Oct 19 16:57:20 2015 -0700
----------------------------------------------------------------------
.../datasources/DataSourceStrategy.scala | 73 +++++++-------------
1 file changed, 24 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8b877cc4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 33181fa..ffb4645 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -140,29 +140,30 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
+ val partitionColumnNames = partitionColumns.fieldNames.toSet
// Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
// will union all partitions and attach partition values if needed.
val scanBuilder = {
- (columns: Seq[Attribute], filters: Array[Filter]) => {
+ (requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
+ val requiredDataColumns =
+ requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))
+
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
- val partitionColNames = partitionColumns.fieldNames
-
// Don't scan any partition columns to save I/O. Here we are being optimistic and
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
- val needed = columns.filterNot(a => partitionColNames.contains(a.name))
- val dataRows =
- relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
+ val dataRows = relation.buildScan(
+ requiredDataColumns.map(_.name).toArray, filters, Array(dir), confBroadcast)
// Merges data values with partition values.
mergeWithPartitionValues(
- relation.schema,
- columns.map(_.name).toArray,
- partitionColNames,
+ requiredColumns,
+ requiredDataColumns,
+ partitionColumns,
partitionValues,
- toCatalystRDD(logicalRelation, needed, dataRows))
+ toCatalystRDD(logicalRelation, requiredDataColumns, dataRows))
}
val unionedRows =
@@ -188,52 +189,27 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
sparkPlan
}
- // TODO: refactor this thing. It is very complicated because it does projection internally.
- // We should just put a project on top of this.
private def mergeWithPartitionValues(
- schema: StructType,
- requiredColumns: Array[String],
- partitionColumns: Array[String],
+ requiredColumns: Seq[Attribute],
+ dataColumns: Seq[Attribute],
+ partitionColumnSchema: StructType,
partitionValues: InternalRow,
dataRows: RDD[InternalRow]): RDD[InternalRow] = {
- val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
-
// If output columns contain any partition column(s), we need to merge scanned data
// columns and requested partition columns to form the final result.
- if (!requiredColumns.sameElements(nonPartitionColumns)) {
- val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
- // To see whether the `index`-th column is a partition column...
- val i = partitionColumns.indexOf(name)
- if (i != -1) {
- val dt = schema(partitionColumns(i)).dataType
- // If yes, gets column value from partition values.
- (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
- mutableRow(ordinal) = partitionValues.get(i, dt)
- }
- } else {
- // Otherwise, inherits the value from scanned data.
- val i = nonPartitionColumns.indexOf(name)
- val dt = schema(nonPartitionColumns(i)).dataType
- (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
- mutableRow(ordinal) = dataRow.get(i, dt)
- }
- }
+ if (requiredColumns != dataColumns) {
+ // Builds `AttributeReference`s for all partition columns so that we can use them to project
+ // required partition columns. Note that if a partition column appears in `requiredColumns`,
+ // we should use the `AttributeReference` in `requiredColumns`.
+ val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
+ val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
+ requiredColumnMap.getOrElse(a.name, a)
}
- // Since we know for sure that this closure is serializable, we can avoid the overhead
- // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
- // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
- val dataTypes = requiredColumns.map(schema(_).dataType)
- val mutableRow = new SpecificMutableRow(dataTypes)
- iterator.map { dataRow =>
- var i = 0
- while (i < mutableRow.numFields) {
- mergers(i)(mutableRow, dataRow, i)
- i += 1
- }
- mutableRow.asInstanceOf[InternalRow]
- }
+ val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
+ val mutableJoinedRow = new JoinedRow()
+ iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues)))
}
// This is an internal RDD whose call site the user should not be concerned with
@@ -242,7 +218,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
Utils.withDummyCallSite(dataRows.sparkContext) {
new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
}
-
} else {
dataRows
}
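
A note on the shape of the new code: UnsafeProjection.create compiles a code-generated
projection, so it is instantiated inside mapPartitionsFunc, once per partition on the
executor, where its construction cost is amortized over every row of that partition.
Per the comment removed above (SPARK-7718), building a MapPartitionsRDD directly only
skips per-RDD closure cleaning; functionally the merge is equivalent to the
mapPartitions-based helper sketched below (a standalone restatement with hypothetical
parameter names, not the committed code):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow, UnsafeProjection}

  def mergeSketch(
      requiredColumns: Seq[Attribute],
      dataColumns: Seq[Attribute],
      partitionColumns: Seq[Attribute],
      partitionValues: InternalRow,
      dataRows: RDD[InternalRow]): RDD[InternalRow] = {
    dataRows.mapPartitions { iterator =>
      // Build the code-generated projection once per partition, not once per row.
      val projection =
        UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
      val joinedRow = new JoinedRow()
      iterator.map(dataRow => projection(joinedRow(dataRow, partitionValues)))
    }
  }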