Posted to reviews@spark.apache.org by liancheng <gi...@git.apache.org> on 2015/10/13 23:50:01 UTC

[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

GitHub user liancheng opened a pull request:

    https://github.com/apache/spark/pull/9104

    [SPARK-11088] [SQL] Merges partition values using UnsafeProjection

    `DataSourceStrategy.mergeWithPartitionValues` is essentially a projection implemented in a rather inefficient way. This PR reimplements it with `UnsafeProjection` to avoid unnecessary boxing costs.
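
    The gist of the change, as a minimal sketch (the `mergeRows` helper and its parameters are illustrative only, not part of the patch; the real change lives in `DataSourceStrategy.mergeWithPartitionValues`, shown in the review diff below):

    ```scala
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow, UnsafeProjection}

    // Concatenate each scanned data row with the (constant) partition values and let a
    // code-generated UnsafeProjection reorder the fields into the required output,
    // instead of copying field by field through a mutable row with boxed values.
    def mergeRows(
        requiredColumns: Seq[Attribute],
        dataColumns: Seq[Attribute],
        partitionColumns: Seq[Attribute],
        partitionValues: InternalRow,
        dataRows: Iterator[InternalRow]): Iterator[InternalRow] = {
      val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
      dataRows.map(dataRow => projection(new JoinedRow(dataRow, partitionValues)))
    }
    ```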

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liancheng/spark spark-11088.faster-partition-values-merging

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9104
    
----
commit 23a0fc2ef86daa8faa785ef2ea3f1d7b5d1b692c
Author: Cheng Lian <li...@databricks.com>
Date:   2015-10-13T18:04:36Z

    Merges partition values using UnsafeProjection

----



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147872722
  
    Can we also remove an extra `ConvertToUnsafe` here?  Specifically, is the Parquet table scan still claiming to produce safe rows when it's really producing unsafe ones now?
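
    (For context: a scan that now emits `UnsafeRow`s would need to advertise it via the planner hook sketched below, otherwise `EnsureRowFormats` keeps inserting a `ConvertToUnsafe` on top. This assumes the Spark 1.5-era `SparkPlan` flags; the class name is hypothetical.)

    ```scala
    import org.apache.spark.sql.execution.SparkPlan

    // A scan operator that now emits UnsafeRows should advertise the fact, so the
    // planner no longer wraps it in an extra ConvertToUnsafe node.
    abstract class UnsafeEmittingScan extends SparkPlan {
      override def outputsUnsafeRows: Boolean = true
    }
    ```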



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147865822
  
      [Test build #43678 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43678/consoleFull) for PR 9104 at commit [`23a0fc2`](https://github.com/apache/spark/commit/23a0fc2ef86daa8faa785ef2ea3f1d7b5d1b692c).



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/9104



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-148226197
  
    @marmbrus Addressed the `ConvertToUnsafe` issue in PR #9125, since it's largely a separate concern.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147872104
  
    Micro-benchmark results on the TPC-DS (scale factor 15) `store_sales` table show a ~12% performance gain.
    
    Before:
    
    - Round 0: 8133 ms
    - Round 1: 7799 ms
    - Round 2: 8010 ms
    - Round 3: 8009 ms
    - Round 4: 8223 ms
    - Average: 8034.8 ms
    
    After:
    
    - Round 0: 7401 ms
    - Round 1: 6897 ms
    - Round 2: 6873 ms
    - Round 3: 6935 ms
    - Round 4: 7056 ms
    - Average: 7032.4 ms
    
    Benchmark code (where `ss_sold_date_sk` is an `INT` partitioning column and `ss_sold_time_sk` is an `INT` data column):
    
    ```scala
    import com.google.common.base.Stopwatch
    
    def benchmark(runs: Int, warmupRuns: Int = 0)(f: => Unit) {
      val stopwatch = new Stopwatch()
    
      (0 until warmupRuns).foreach { i =>
        f
      }
    
      def run(i: Int) = {
        stopwatch.reset()
        stopwatch.start()
        f
        stopwatch.stop()
        val elapsed = stopwatch.elapsedMillis()
        println(s"Round $i: $elapsed ms")
        elapsed
      }
    
      val total = (0 until runs).map(i => run(i)).sum.toDouble
      println(s"Average: ${total / runs} ms")
    }
    
    val path = "file:///Users/lian/tpcds/sf15/store_sales"
    
    benchmark(5, 5) {
      val df = sqlContext.read.parquet(path).selectExpr("ss_sold_time_sk", "ss_sold_date_sk")
      df.queryExecution.toRdd.foreach(row => ())
    }
    ```



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9104#discussion_r41932293
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
    @@ -178,52 +179,26 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         sparkPlan
       }
     
    -  // TODO: refactor this thing. It is very complicated because it does projection internally.
    -  // We should just put a project on top of this.
       private def mergeWithPartitionValues(
    -      schema: StructType,
    -      requiredColumns: Array[String],
    -      partitionColumns: Array[String],
    +      requiredColumns: Seq[Attribute],
    +      dataColumns: Seq[Attribute],
    +      partitionColumnSchema: StructType,
           partitionValues: InternalRow,
           dataRows: RDD[InternalRow]): RDD[InternalRow] = {
    -    val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
    -
         // If output columns contain any partition column(s), we need to merge scanned data
         // columns and requested partition columns to form the final result.
    -    if (!requiredColumns.sameElements(nonPartitionColumns)) {
    -      val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
    -        // To see whether the `index`-th column is a partition column...
    -        val i = partitionColumns.indexOf(name)
    -        if (i != -1) {
    -          val dt = schema(partitionColumns(i)).dataType
    -          // If yes, gets column value from partition values.
    -          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
    -            mutableRow(ordinal) = partitionValues.get(i, dt)
    -          }
    -        } else {
    -          // Otherwise, inherits the value from scanned data.
    -          val i = nonPartitionColumns.indexOf(name)
    -          val dt = schema(nonPartitionColumns(i)).dataType
    -          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
    -            mutableRow(ordinal) = dataRow.get(i, dt)
    -          }
    -        }
    +    if (requiredColumns != dataColumns) {
    +      // Builds `AttributeReference`s for all partition columns so that we can use them to project
    +      // required partition columns.  Note that if a partition column appears in `requiredColumns`,
    +      // we should use the `AttributeReference` in `requiredColumns`.
    +      val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
    +      val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
    +        requiredColumnMap.getOrElse(a.name, a)
           }
     
    -      // Since we know for sure that this closure is serializable, we can avoid the overhead
    -      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
    -      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
           val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
    -        val dataTypes = requiredColumns.map(schema(_).dataType)
    -        val mutableRow = new SpecificMutableRow(dataTypes)
    -        iterator.map { dataRow =>
    -          var i = 0
    -          while (i < mutableRow.numFields) {
    -            mergers(i)(mutableRow, dataRow, i)
    -            i += 1
    -          }
    -          mutableRow.asInstanceOf[InternalRow]
    -        }
    +        val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
    +        iterator.map(dataRow => projection(new JoinedRow(dataRow, partitionValues)))
    --- End diff --
    
    Do we have to allocate a new JoinedRow each time?



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147897476
  
      [Test build #43687 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43687/console) for PR 9104 at commit [`4cadd1c`](https://github.com/apache/spark/commit/4cadd1cca4ceb095c1a8854acf3b46788dd88368).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-149378930
  
    LGTM, merging to master.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147864406
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147889506
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147880523
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147897580
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9104#discussion_r41934985
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
    @@ -178,52 +179,26 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         sparkPlan
       }
     
    -  // TODO: refactor this thing. It is very complicated because it does projection internally.
    -  // We should just put a project on top of this.
       private def mergeWithPartitionValues(
    -      schema: StructType,
    -      requiredColumns: Array[String],
    -      partitionColumns: Array[String],
    +      requiredColumns: Seq[Attribute],
    +      dataColumns: Seq[Attribute],
    +      partitionColumnSchema: StructType,
           partitionValues: InternalRow,
           dataRows: RDD[InternalRow]): RDD[InternalRow] = {
    -    val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
    -
         // If output columns contain any partition column(s), we need to merge scanned data
         // columns and requested partition columns to form the final result.
    -    if (!requiredColumns.sameElements(nonPartitionColumns)) {
    -      val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
    -        // To see whether the `index`-th column is a partition column...
    -        val i = partitionColumns.indexOf(name)
    -        if (i != -1) {
    -          val dt = schema(partitionColumns(i)).dataType
    -          // If yes, gets column value from partition values.
    -          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
    -            mutableRow(ordinal) = partitionValues.get(i, dt)
    -          }
    -        } else {
    -          // Otherwise, inherits the value from scanned data.
    -          val i = nonPartitionColumns.indexOf(name)
    -          val dt = schema(nonPartitionColumns(i)).dataType
    -          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
    -            mutableRow(ordinal) = dataRow.get(i, dt)
    -          }
    -        }
    +    if (requiredColumns != dataColumns) {
    +      // Builds `AttributeReference`s for all partition columns so that we can use them to project
    +      // required partition columns.  Note that if a partition column appears in `requiredColumns`,
    +      // we should use the `AttributeReference` in `requiredColumns`.
    +      val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
    +      val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
    +        requiredColumnMap.getOrElse(a.name, a)
           }
     
    -      // Since we know for sure that this closure is serializable, we can avoid the overhead
    -      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
    -      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
           val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
    -        val dataTypes = requiredColumns.map(schema(_).dataType)
    -        val mutableRow = new SpecificMutableRow(dataTypes)
    -        iterator.map { dataRow =>
    -          var i = 0
    -          while (i < mutableRow.numFields) {
    -            mergers(i)(mutableRow, dataRow, i)
    -            i += 1
    -          }
    -          mutableRow.asInstanceOf[InternalRow]
    -        }
    +        val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
    +        iterator.map(dataRow => projection(new JoinedRow(dataRow, partitionValues)))
    --- End diff --
    
    That's a good point; I didn't realize `JoinedRow` is mutable. Thanks.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147889399
  
      [Test build #43678 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43678/console) for PR 9104 at commit [`23a0fc2`](https://github.com/apache/spark/commit/23a0fc2ef86daa8faa785ef2ea3f1d7b5d1b692c).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147889508
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43678/
    Test PASSed.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147897582
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43687/
    Test PASSed.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147864429
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147880534
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9104#discussion_r41938703
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
    @@ -178,52 +179,26 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         sparkPlan
       }
     
    -  // TODO: refactor this thing. It is very complicated because it does projection internally.
    -  // We should just put a project on top of this.
       private def mergeWithPartitionValues(
    -      schema: StructType,
    -      requiredColumns: Array[String],
    -      partitionColumns: Array[String],
    +      requiredColumns: Seq[Attribute],
    +      dataColumns: Seq[Attribute],
    +      partitionColumnSchema: StructType,
           partitionValues: InternalRow,
           dataRows: RDD[InternalRow]): RDD[InternalRow] = {
    -    val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
    -
         // If output columns contain any partition column(s), we need to merge scanned data
         // columns and requested partition columns to form the final result.
    -    if (!requiredColumns.sameElements(nonPartitionColumns)) {
    -      val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
    -        // To see whether the `index`-th column is a partition column...
    -        val i = partitionColumns.indexOf(name)
    -        if (i != -1) {
    -          val dt = schema(partitionColumns(i)).dataType
    -          // If yes, gets column value from partition values.
    -          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
    -            mutableRow(ordinal) = partitionValues.get(i, dt)
    -          }
    -        } else {
    -          // Otherwise, inherits the value from scanned data.
    -          val i = nonPartitionColumns.indexOf(name)
    -          val dt = schema(nonPartitionColumns(i)).dataType
    -          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
    -            mutableRow(ordinal) = dataRow.get(i, dt)
    -          }
    -        }
    +    if (requiredColumns != dataColumns) {
    +      // Builds `AttributeReference`s for all partition columns so that we can use them to project
    +      // required partition columns.  Note that if a partition column appears in `requiredColumns`,
    +      // we should use the `AttributeReference` in `requiredColumns`.
    +      val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
    +      val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
    +        requiredColumnMap.getOrElse(a.name, a)
           }
     
    -      // Since we know for sure that this closure is serializable, we can avoid the overhead
    -      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
    -      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
           val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
    -        val dataTypes = requiredColumns.map(schema(_).dataType)
    -        val mutableRow = new SpecificMutableRow(dataTypes)
    -        iterator.map { dataRow =>
    -          var i = 0
    -          while (i < mutableRow.numFields) {
    -            mergers(i)(mutableRow, dataRow, i)
    -            i += 1
    -          }
    -          mutableRow.asInstanceOf[InternalRow]
    -        }
    +        val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
    +        iterator.map(dataRow => projection(new JoinedRow(dataRow, partitionValues)))
    --- End diff --
    
    Updated, although reusing `JoinedRow` doesn't bring a noticeable speedup in my micro-benchmark.
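
    Roughly, the updated version reuses a single `JoinedRow` per partition (sketch only, with the names as in the diff above; `JoinedRow.apply` rebinds the two underlying rows in place and returns the same instance):

    ```scala
    import org.apache.spark.TaskContext
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}

    val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
      val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
      // Allocate one JoinedRow per partition and rebind it for every row, instead of
      // constructing a new JoinedRow per input row.
      val joinedRow = new JoinedRow()
      iterator.map(dataRow => projection(joinedRow(dataRow, partitionValues)))
    }
    ```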



[GitHub] spark pull request: [SPARK-11088] [SQL] Merges partition values us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9104#issuecomment-147880680
  
      [Test build #43687 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43687/consoleFull) for PR 9104 at commit [`4cadd1c`](https://github.com/apache/spark/commit/4cadd1cca4ceb095c1a8854acf3b46788dd88368).

