You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2017/02/12 09:19:26 UTC

[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/16898

    [SPARK-19563][SQL] advoid unnecessary sort in FileFormatWriter

    ## What changes were proposed in this pull request?
    
    In `FileFormatWriter`, we will sort the input rows by partition columns and bucket id and sort columns, if we want to write data out partitioned or bucketed.
    
    However, if the data is already sorted, we will sort it again, which is unnecssary.
    
    This PR removes the sorting logic in `FileFormatWriter` and use `SortExec` instead. We will not add `SortExec` if the data is already sorted.
    
    ## How was this patch tested?
    
    I did a micro benchmark manually
    ```
    val df = spark.range(10000000).select($"id", $"id" % 10 as "part").sort("part")
    spark.time(df.write.partitionBy("part").parquet("/tmp/test"))
    ```
    The result was about 6.4 seconds before this PR, and is 5.7 seconds afterwards.
    
    close https://github.com/apache/spark/pull/16724

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark writer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16898.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16898
    
----
commit 602a1e48f2ca0c8286601a35c40b0e0f23331f23
Author: Wenchen Fan <we...@databricks.com>
Date:   2017-02-12T09:01:53Z

    advoid unnecessary sort in FileFormatWriter

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100687410
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,23 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val rdd = if (requiredOrdering == queryExecution.executedPlan.outputOrdering) {
    +        queryExecution.toRdd
    +      } else {
    +        SortExec(requiredOrdering, global = false, queryExecution.executedPlan).execute()
    --- End diff --
    
    Using `SortExec` here is clever.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101888000
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -108,9 +107,21 @@ object FileFormatWriter extends Logging {
         job.setOutputValueClass(classOf[InternalRow])
         FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
     
    +    val allColumns = queryExecution.logical.output
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
    --- End diff --
    
    If we rewrite it to `val dataColumns = allColumns.filterNot(partitionColumns.contains)`, we do not need `partitionSet `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101938607
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -108,9 +107,21 @@ object FileFormatWriter extends Logging {
         job.setOutputValueClass(classOf[InternalRow])
         FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
     
    +    val allColumns = queryExecution.logical.output
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
    --- End diff --
    
    it's so minor, I'll fix it in my next PR


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by leachbj <gi...@git.apache.org>.
Github user leachbj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r208094538
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
           uuid = UUID.randomUUID().toString,
           serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
           outputWriterFactory = outputWriterFactory,
    -      allColumns = queryExecution.logical.output,
    -      partitionColumns = partitionColumns,
    +      allColumns = allColumns,
           dataColumns = dataColumns,
    -      bucketSpec = bucketSpec,
    +      partitionColumns = partitionColumns,
    +      bucketIdExpression = bucketIdExpression,
           path = outputSpec.outputPath,
           customPartitionLocations = outputSpec.customPartitionLocations,
           maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
             .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
         )
     
    +    // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    +    // the sort order doesn't matter
    +    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
    --- End diff --
    
    @cloud-fan would it be possible to use the logical plan rather than the executedPlan?  If the optimizer decides the data is already sorted according according to the logical plan the executedPlan won't include the fields.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100692456
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val actualOrdering = queryExecution.executedPlan.outputOrdering
    +      // We can still avoid the sort if the required ordering is [partCol] and the actual ordering
    +      // is [partCol, anotherCol].
    +      val rdd = if (requiredOrdering == actualOrdering.take(requiredOrdering.length)) {
    +        queryExecution.toRdd
    +      } else {
    +        SortExec(requiredOrdering, global = false, queryExecution.executedPlan).execute()
    --- End diff --
    
    Good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72945 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72945/testReport)** for PR 16898 at commit [`728e1c8`](https://github.com/apache/spark/commit/728e1c8eef5d081b499f3e105a0dc13302d8e799).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73059/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16898


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72764 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72764/testReport)** for PR 16898 at commit [`602a1e4`](https://github.com/apache/spark/commit/602a1e48f2ca0c8286601a35c40b0e0f23331f23).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100692348
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val actualOrdering = queryExecution.executedPlan.outputOrdering
    +      // We can still avoid the sort if the required ordering is [partCol] and the actual ordering
    +      // is [partCol, anotherCol].
    +      val rdd = if (requiredOrdering == actualOrdering.take(requiredOrdering.length)) {
    +        queryExecution.toRdd
    +      } else {
    +        SortExec(requiredOrdering, global = false, queryExecution.executedPlan).execute()
    --- End diff --
    
    Oh, I met this case before IIRC. This complains in Scala 2.10. I guess it should be 
    
    ```
    SortExec(requiredOrdering, global = false, child = queryExecution.executedPlan).execute()
    ```
    
    because it seems the complier gets confused the positional/named  arguments. (this is actually invalid syntax in Python).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72942 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72942/testReport)** for PR 16898 at commit [`fc591d1`](https://github.com/apache/spark/commit/fc591d18601e9c2c86d3df80b7ebbf43bc403d27).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72764 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72764/testReport)** for PR 16898 at commit [`602a1e4`](https://github.com/apache/spark/commit/602a1e48f2ca0c8286601a35c40b0e0f23331f23).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100877180
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    --- End diff --
    
    does it make sense to sort over partition columns in a bucket? I'm surprised if we support this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100687347
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,23 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val rdd = if (requiredOrdering == queryExecution.executedPlan.outputOrdering) {
    --- End diff --
    
    If data's outputOrdering is [partCol1, partCol2, dataCol1], here the requiredOrdering is [partCol1, partCol2], you will miss this optimization.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72942 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72942/testReport)** for PR 16898 at commit [`fc591d1`](https://github.com/apache/spark/commit/fc591d18601e9c2c86d3df80b7ebbf43bc403d27).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Few comments. Others LGTM.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #73061 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73061/testReport)** for PR 16898 at commit [`83053ef`](https://github.com/apache/spark/commit/83053ef59d97abe1b9d05e0cf6184a00d261da25).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100697621
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -189,7 +215,7 @@ object FileFormatWriter extends Logging {
         committer.setupTask(taskAttemptContext)
     
         val writeTask =
    -      if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
    +      if (description.partitionColumns.isEmpty && description.numBuckets == 0) {
    --- End diff --
    
    For someone reading the code, this might be non intuitive to understand that you are checking if there is no bucketing. `0` has been used in many places in this PR to check if table has bucketing. Maybe orthogonal to the PR, but in general we could have a util method to do this. I can send a tiny PR for this if you agree that its a good thing to do.
    
    PS: Having 0 buckets is a thing in Hive however logically it makes no sense and confusing. Under the hood, it treats that as a table with single bucket. Its good that Spark does not allow this.
    
    ```
    # hive-1.2.1
    
    hive> CREATE TABLE tejasp_temp_can_be_deleted (key string, value string) CLUSTERED BY (key) INTO 0 BUCKETS;
    Time taken: 1.144 seconds
    
    hive> desc formatted tejasp_temp_can_be_deleted;
    
    # Storage Information
    ...
    Num Buckets:        	0
    Bucket Columns:     	[key]
    Sort Columns:       	[]
    
    hive>INSERT OVERWRITE TABLE tejasp_temp_can_be_deleted SELECT * FROM ....;
    # doing `ls` on the output directory shows a single file
    ```
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72945/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r208098556
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
           uuid = UUID.randomUUID().toString,
           serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
           outputWriterFactory = outputWriterFactory,
    -      allColumns = queryExecution.logical.output,
    -      partitionColumns = partitionColumns,
    +      allColumns = allColumns,
           dataColumns = dataColumns,
    -      bucketSpec = bucketSpec,
    +      partitionColumns = partitionColumns,
    +      bucketIdExpression = bucketIdExpression,
           path = outputSpec.outputPath,
           customPartitionLocations = outputSpec.customPartitionLocations,
           maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
             .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
         )
     
    +    // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    +    // the sort order doesn't matter
    +    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
    --- End diff --
    
    That would be great, but may need some refactoring.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100692415
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val actualOrdering = queryExecution.executedPlan.outputOrdering
    +      // We can still avoid the sort if the required ordering is [partCol] and the actual ordering
    +      // is [partCol, anotherCol].
    +      val rdd = if (requiredOrdering == actualOrdering.take(requiredOrdering.length)) {
    +        queryExecution.toRdd
    +      } else {
    +        SortExec(requiredOrdering, global = false, queryExecution.executedPlan).execute()
    --- End diff --
    
    Yea, it seems it complains.
    
    ```
    [error] .../spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala:160: not enough arguments for method apply: (sortOrder: Seq[org.apache.spark.sql.catalyst.expressions.SortOrder], global: Boolean, child: org.apache.spark.sql.execution.SparkPlan, testSpillFrequency: Int)org.apache.spark.sql.execution.SortExec in object SortExec.
    [error] Unspecified value parameter child.
    [error]         SortExec(requiredOrdering, global = false, queryExecution.executedPlan).execute()
    [error]
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100949852
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    --- End diff --
    
    https://issues.apache.org/jira/browse/SPARK-19587


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101938559
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
           uuid = UUID.randomUUID().toString,
           serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
           outputWriterFactory = outputWriterFactory,
    -      allColumns = queryExecution.logical.output,
    -      partitionColumns = partitionColumns,
    +      allColumns = allColumns,
           dataColumns = dataColumns,
    -      bucketSpec = bucketSpec,
    +      partitionColumns = partitionColumns,
    +      bucketIdExpression = bucketIdExpression,
           path = outputSpec.outputPath,
           customPartitionLocations = outputSpec.customPartitionLocations,
           maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
             .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
         )
     
    +    // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    +    // the sort order doesn't matter
    +    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
    +    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
    +      false
    +    } else {
    +      requiredOrdering.zip(actualOrdering).forall {
    +        case (requiredOrder, childOutputOrder) =>
    +          requiredOrder.semanticEquals(childOutputOrder)
    --- End diff --
    
    it's `HashPartitioning(...).partitionIdExpression`, which returns `Pmod(new Murmur3Hash(expressions), Literal(numPartitions))`, so it may match


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100947229
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    --- End diff --
    
    It does not make sense (I thought it was intentional). This should definitely be fixed. I digged the commit logs to see that this was fixed for bucketing columns in https://github.com/apache/spark/pull/10891 but no discussion around sort columns. Will log a JIRA for this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101887954
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
           uuid = UUID.randomUUID().toString,
           serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
           outputWriterFactory = outputWriterFactory,
    -      allColumns = queryExecution.logical.output,
    -      partitionColumns = partitionColumns,
    +      allColumns = allColumns,
           dataColumns = dataColumns,
    -      bucketSpec = bucketSpec,
    +      partitionColumns = partitionColumns,
    +      bucketIdExpression = bucketIdExpression,
           path = outputSpec.outputPath,
           customPartitionLocations = outputSpec.customPartitionLocations,
           maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
             .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
         )
     
    +    // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    --- End diff --
    
    `bucketIdExpression` should be replaced by `bucketColumns`, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100697356
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val actualOrdering = queryExecution.executedPlan.outputOrdering
    +      // We can still avoid the sort if the required ordering is [partCol] and the actual ordering
    --- End diff --
    
    The comment makes it feel like its specific to partition columns but it the code below does not have anything specific to partition columns


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #73059 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73059/testReport)** for PR 16898 at commit [`728e1c8`](https://github.com/apache/spark/commit/728e1c8eef5d081b499f3e105a0dc13302d8e799).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    cc @tejasapatil how is the updated version?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    cc @viirya @gatorsmile 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101888508
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
           uuid = UUID.randomUUID().toString,
           serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
           outputWriterFactory = outputWriterFactory,
    -      allColumns = queryExecution.logical.output,
    -      partitionColumns = partitionColumns,
    +      allColumns = allColumns,
           dataColumns = dataColumns,
    -      bucketSpec = bucketSpec,
    +      partitionColumns = partitionColumns,
    +      bucketIdExpression = bucketIdExpression,
           path = outputSpec.outputPath,
           customPartitionLocations = outputSpec.customPartitionLocations,
           maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
             .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
         )
     
    +    // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    +    // the sort order doesn't matter
    +    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
    +    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
    +      false
    +    } else {
    +      requiredOrdering.zip(actualOrdering).forall {
    +        case (requiredOrder, childOutputOrder) =>
    +          requiredOrder.semanticEquals(childOutputOrder)
    --- End diff --
    
    Because `bucketIdExpression` is `HashPartitioning`, this will never match, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101938630
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -287,31 +320,16 @@ object FileFormatWriter extends Logging {
        * multiple directories (partitions) or files (bucketing).
        */
       private class DynamicPartitionWriteTask(
    -      description: WriteJobDescription,
    +      desc: WriteJobDescription,
    --- End diff --
    
    I'd like to change both to make it consistent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72767 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72767/testReport)** for PR 16898 at commit [`00e2f22`](https://github.com/apache/spark/commit/00e2f22704a32cc2ceb0712139f5c084253c48d1).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72945 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72945/testReport)** for PR 16898 at commit [`728e1c8`](https://github.com/apache/spark/commit/728e1c8eef5d081b499f3e105a0dc13302d8e799).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101888059
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -287,31 +320,16 @@ object FileFormatWriter extends Logging {
        * multiple directories (partitions) or files (bucketing).
        */
       private class DynamicPartitionWriteTask(
    -      description: WriteJobDescription,
    +      desc: WriteJobDescription,
    --- End diff --
    
    `SingleDirectoryWriteTask` is still using `description`. Change both or keep it unchanged?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72955 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72955/testReport)** for PR 16898 at commit [`728e1c8`](https://github.com/apache/spark/commit/728e1c8eef5d081b499f3e105a0dc13302d8e799).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101320333
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -363,80 +393,42 @@ object FileFormatWriter extends Logging {
             committer.newTaskTempFile(taskAttemptContext, partDir, ext)
           }
     
    -      currentWriter = description.outputWriterFactory.newInstance(
    +      currentWriter = desc.outputWriterFactory.newInstance(
             path = path,
    -        dataSchema = description.dataColumns.toStructType,
    +        dataSchema = desc.dataColumns.toStructType,
             context = taskAttemptContext)
         }
     
         override def execute(iter: Iterator[InternalRow]): Set[String] = {
    -      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    -      val sortingExpressions: Seq[Expression] =
    -        description.partitionColumns ++ bucketIdExpression ++ sortColumns
    -      val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
    -
    -      val sortingKeySchema = StructType(sortingExpressions.map {
    -        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
    -        // The sorting expressions are all `Attribute` except bucket id.
    -        case _ => StructField("bucketId", IntegerType, nullable = false)
    -      })
    +      val getPartitionColsAndBucketId = UnsafeProjection.create(
    +        desc.partitionColumns ++ bucketIdExpression, desc.allColumns)
     
    -      // Returns the data columns to be written given an input row
    -      val getOutputRow = UnsafeProjection.create(
    -        description.dataColumns, description.allColumns)
    -
    -      // Returns the partition path given a partition key.
    -      val getPartitionStringFunc = UnsafeProjection.create(
    -        Seq(Concat(partitionStringExpression)), description.partitionColumns)
    -
    -      // Sorts the data before write, so that we only need one writer at the same time.
    -      val sorter = new UnsafeKVExternalSorter(
    -        sortingKeySchema,
    -        StructType.fromAttributes(description.dataColumns),
    -        SparkEnv.get.blockManager,
    -        SparkEnv.get.serializerManager,
    -        TaskContext.get().taskMemoryManager().pageSizeBytes,
    -        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
    -          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
    +      // Generates the partition path given the row generated by `getPartitionColsAndBucketId`.
    +      val getPartPath = UnsafeProjection.create(
    +        Seq(Concat(partitionPathExpression)), desc.partitionColumns)
     
    -      while (iter.hasNext) {
    -        val currentRow = iter.next()
    -        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    -      }
    -
    -      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
    -        identity
    -      } else {
    -        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
    -          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
    -        })
    -      }
    -
    -      val sortedIterator = sorter.sortedIterator()
    +      // Returns the data columns to be written given an input row
    +      val getOutputRow = UnsafeProjection.create(desc.dataColumns, desc.allColumns)
     
           // If anything below fails, we should abort the task.
           var recordsInFile: Long = 0L
           var fileCounter = 0
    -      var currentKey: UnsafeRow = null
    +      var currentPartColsAndBucketId: UnsafeRow = null
           val updatedPartitions = mutable.Set[String]()
    -      while (sortedIterator.next()) {
    -        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
    -        if (currentKey != nextKey) {
    -          // See a new key - write to a new partition (new file).
    -          currentKey = nextKey.copy()
    -          logDebug(s"Writing partition: $currentKey")
    +      for (row <- iter) {
    +        val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
    --- End diff --
    
    if you take a look at the `GenerateUnsafeProject`, actually it will reuse the same row instance, so we need to copy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100687761
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -363,80 +393,42 @@ object FileFormatWriter extends Logging {
             committer.newTaskTempFile(taskAttemptContext, partDir, ext)
           }
     
    -      currentWriter = description.outputWriterFactory.newInstance(
    +      currentWriter = desc.outputWriterFactory.newInstance(
             path = path,
    -        dataSchema = description.dataColumns.toStructType,
    +        dataSchema = desc.dataColumns.toStructType,
             context = taskAttemptContext)
         }
     
         override def execute(iter: Iterator[InternalRow]): Set[String] = {
    -      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    -      val sortingExpressions: Seq[Expression] =
    -        description.partitionColumns ++ bucketIdExpression ++ sortColumns
    -      val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
    -
    -      val sortingKeySchema = StructType(sortingExpressions.map {
    -        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
    -        // The sorting expressions are all `Attribute` except bucket id.
    -        case _ => StructField("bucketId", IntegerType, nullable = false)
    -      })
    +      val getPartitionColsAndBucketId = UnsafeProjection.create(
    +        desc.partitionColumns ++ bucketIdExpression, desc.allColumns)
     
    -      // Returns the data columns to be written given an input row
    -      val getOutputRow = UnsafeProjection.create(
    -        description.dataColumns, description.allColumns)
    -
    -      // Returns the partition path given a partition key.
    -      val getPartitionStringFunc = UnsafeProjection.create(
    -        Seq(Concat(partitionStringExpression)), description.partitionColumns)
    -
    -      // Sorts the data before write, so that we only need one writer at the same time.
    -      val sorter = new UnsafeKVExternalSorter(
    -        sortingKeySchema,
    -        StructType.fromAttributes(description.dataColumns),
    -        SparkEnv.get.blockManager,
    -        SparkEnv.get.serializerManager,
    -        TaskContext.get().taskMemoryManager().pageSizeBytes,
    -        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
    -          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
    +      // Generates the partition path given the row generated by `getPartitionColsAndBucketId`.
    +      val getPartPath = UnsafeProjection.create(
    +        Seq(Concat(partitionPathExpression)), desc.partitionColumns)
     
    -      while (iter.hasNext) {
    -        val currentRow = iter.next()
    -        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    -      }
    -
    -      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
    -        identity
    -      } else {
    -        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
    -          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
    -        })
    -      }
    -
    -      val sortedIterator = sorter.sortedIterator()
    +      // Returns the data columns to be written given an input row
    +      val getOutputRow = UnsafeProjection.create(desc.dataColumns, desc.allColumns)
     
           // If anything below fails, we should abort the task.
           var recordsInFile: Long = 0L
           var fileCounter = 0
    -      var currentKey: UnsafeRow = null
    +      var currentPartColsAndBucketId: UnsafeRow = null
           val updatedPartitions = mutable.Set[String]()
    -      while (sortedIterator.next()) {
    -        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
    -        if (currentKey != nextKey) {
    -          // See a new key - write to a new partition (new file).
    -          currentKey = nextKey.copy()
    -          logDebug(s"Writing partition: $currentKey")
    +      for (row <- iter) {
    +        val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
    --- End diff --
    
    `getPartitionColsAndBucketId` is an unsafe projection. So `nextPartColsAndBucketId` is a new unsafe row. Do we still need a `copy` when assigning it to `currentPartColsAndBucketId`?
    
    Previously we need a copy because `getBucketingKey` can be an `identity` function. So the `nextKey` can be the same unsafe row.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Sorry, I am late. Will review it tonight. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72955/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #73059 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73059/testReport)** for PR 16898 at commit [`728e1c8`](https://github.com/apache/spark/commit/728e1c8eef5d081b499f3e105a0dc13302d8e799).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100697403
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val actualOrdering = queryExecution.executedPlan.outputOrdering
    +      // We can still avoid the sort if the required ordering is [partCol] and the actual ordering
    +      // is [partCol, anotherCol].
    +      val rdd = if (requiredOrdering == actualOrdering.take(requiredOrdering.length)) {
    --- End diff --
    
    You could do semantic equals and not object equals. I recall using object equals in  in `EnsureRequirements` was adding unnecessary SORT in some cases : https://github.com/apache/spark/pull/14841/files#diff-cdb577e36041e4a27a605b6b3063fd54


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #73061 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73061/testReport)** for PR 16898 at commit [`83053ef`](https://github.com/apache/spark/commit/83053ef59d97abe1b9d05e0cf6184a00d261da25).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    nit: typo in pr title `advoid`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100687380
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,23 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val rdd = if (requiredOrdering == queryExecution.executedPlan.outputOrdering) {
    --- End diff --
    
    or, I should check the subset, good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    @cloud-fan : LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    thanks for the review, merging to master!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100687628
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -120,9 +127,10 @@ object FileFormatWriter extends Logging {
           serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
           outputWriterFactory = outputWriterFactory,
           allColumns = queryExecution.logical.output,
    --- End diff --
    
    Directly use `allColumns` created above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100687514
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    +        .map(SortOrder(_, Ascending))
    +      val actualOrdering = queryExecution.executedPlan.outputOrdering
    +      // We can still avoid the sort if the required ordering is [partCol] and the actual ordering
    +      // is [partCol, anotherCol].
    +      val rdd = if (requiredOrdering == actualOrdering.take(requiredOrdering.length)) {
    --- End diff --
    
    We only care if partition columns are the same between requiredOrdering and actualOrdering. The sort direction doesn't matter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72764/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72942/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73061/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100700939
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -329,31 +349,41 @@ object FileFormatWriter extends Logging {
          * If bucket id is specified, we will append it to the end of the file name, but before the
    --- End diff --
    
    nit for previous line: `Open and returns a ...`
    
    this method does not return anything


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72767/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] advoid unnecessary sort in FileFormat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72767 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72767/testReport)** for PR 16898 at commit [`00e2f22`](https://github.com/apache/spark/commit/00e2f22704a32cc2ceb0712139f5c084253c48d1).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16898: [SPARK-19563][SQL] avoid unnecessary sort in FileFormatW...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16898
  
    **[Test build #72955 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72955/testReport)** for PR 16898 at commit [`728e1c8`](https://github.com/apache/spark/commit/728e1c8eef5d081b499f3e105a0dc13302d8e799).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100696521
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -108,8 +108,15 @@ object FileFormatWriter extends Logging {
         job.setOutputValueClass(classOf[InternalRow])
         FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
     
    +    val allColumns = queryExecution.logical.output
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
    +    val bucketColumns = bucketSpec.toSeq.flatMap {
    +      spec => spec.bucketColumnNames.map(c => allColumns.find(_.name == c).get)
    --- End diff --
    
    nit: `allColumns` -> `dataColumns` ?
    
    No need to look at all columns since Spark doesn't allow bucketing over partition columns.
    
    ```
    scala> df1.write.format("orc").partitionBy("i").bucketBy(8, "i").sortBy("k").saveAsTable("table70")
    org.apache.spark.sql.AnalysisException: bucketBy columns 'i' should not be part of partitionBy columns 'i';
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16898: [SPARK-19563][SQL] advoid unnecessary sort in Fil...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r100697138
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -134,8 +142,26 @@ object FileFormatWriter extends Logging {
           // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
           committer.setupJob(job)
     
    +      val bucketIdExpression = bucketSpec.map { spec =>
    +        // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
    +        // guarantee the data distribution is same between shuffle and bucketed data source, which
    +        // enables us to only shuffle one side when join a bucketed table and a normal one.
    +        HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      }
    +      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +      val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
    --- End diff --
    
    Possible over-optimization : Spark allows sorting over partition columns so `requiredOrdering` can be changed to do:
    
    `partitionColumns` + `bucketIdExpression` + (`sortColumns` which are not in` partitionColumns`)
    
    so that any extra column(s) in sort expression can be deduped.
    
    ```
    scala> df1.write.format("orc").partitionBy("i").bucketBy(8, "i").sortBy("k").saveAsTable("table70")
    org.apache.spark.sql.AnalysisException: bucketBy columns 'i' should not be part of partitionBy columns 'i';
    ```
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org