Posted to reviews@spark.apache.org by pwoody <gi...@git.apache.org> on 2017/07/05 14:06:34 UTC

[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

GitHub user pwoody opened a pull request:

    https://github.com/apache/spark/pull/18542

    [SPARK-21317][SQL] Avoid sorting on bucket expression if data is already bucketed

    ## What changes were proposed in this pull request?
    
    When saving bucketed data, each task sorts its partition by the partition id produced by the HashPartitioning of the bucket columns. If the input data is already partitioned by the bucket partitioning, then this value is constant within each partition, making the sort unnecessary.
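
    As a rough sketch of the scenario (illustrative names, using the public DataFrame API; this is not code from the patch), the sort this change avoids shows up when the input is already repartitioned by the bucket column:

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        val df = (1 to 1000).map(i => (i, s"v$i")).toDF("a", "b")

        // repartition(8, $"a") hash-partitions by `a` exactly like the bucket
        // spec below, so within each task the bucket id is constant.
        df.repartition(8, $"a")
          .write
          .bucketBy(8, "a")
          .saveAsTable("bucketed_t")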
    
    ## How was this patch tested?
    Manual tests + debugging. I'm not sure of the easiest way to plug in and verify whether the plan inside the writer contains a sort or not.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pwoody/spark SPARK-21317

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18542.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18542
    
----
commit 557837990dcfb17e19480bc3ea80ac685a8b3c32
Author: Patrick Woody <pw...@palantir.com>
Date:   2017-07-05T13:58:38Z

    Avoid sort on bucket expression if data is already bucketed

----


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125802474
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    The output partitioning doesn't guarantee that there's only one value of the partitioning columns in a partition.
    
    For example, if the output partitioning is [a], it only guarantees that rows with the same value of [a] will be in the same partition. So a partition can contain all the rows with `a = 1`, `a = 2`, ..., rather than all rows in the partition having `a = 1`.
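
    A plain-Scala sketch of this (using `hashCode` as a stand-in for Spark's Murmur3 hash; values are made up):

        val values = 1 to 10
        val numPartitions = 2
        // Rows are grouped by hash(a) % numPartitions, not by the value of
        // `a`, so one partition holds many distinct values of `a`.
        val byPartition = values.groupBy(a => math.floorMod(a.hashCode, numPartitions))
        byPartition.toSeq.sortBy(_._1).foreach { case (pid, as) =>
          println(s"partition $pid: ${as.mkString(", ")}")  // partition 0: evens, partition 1: odds
        }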


[GitHub] spark issue #18542: [SPARK-21317][SQL] Avoid sorting on bucket expression if...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18542
  
    Can one of the admins verify this patch?


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125671769
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,24 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the outputPartitioning for the plan guarantees the bucket spec, then it will have a
    +    // constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = bucketPartitioning
    +      .filterNot(plan.outputPartitioning.guarantees(_))
    +      .map(_.partitionIdExpression)
    --- End diff --
    
    Even if the outputPartitioning guarantees the bucket spec, doesn't it just guarantee that rows of the same bucket are put into the same partition? I think it doesn't guarantee that rows of the same bucket are stored contiguously.
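
    A tiny plain-Scala illustration (bucket ids are made up):

        // Arrival order of bucket ids within one partition: placement in the
        // partition is guaranteed, in-partition order is not.
        val bucketIds = Seq(1, 2, 1, 2)
        val alreadySorted = bucketIds == bucketIds.sorted
        // alreadySorted == false, so the writer would still need its sort to
        // emit each bucket's rows contiguously.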


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125838555
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    Your `numBuckets` may be different from the `numPartitions` of the output partitioning.
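
    For example (plain Scala with `hashCode` standing in for Murmur3, numbers made up): with 4 partitions but 8 buckets, rows that share a partition can still fall into different buckets:

        val (numPartitions, numBuckets) = (4, 8)
        val rows = Seq(3, 7, 11, 15)  // all satisfy hash % 4 == 3, i.e. same partition
        rows.foreach { a =>
          val p = math.floorMod(a.hashCode, numPartitions)
          val b = math.floorMod(a.hashCode, numBuckets)
          println(s"a=$a partition=$p bucket=$b")  // partition is always 3, bucket alternates 3 / 7
        }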


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by pwoody <gi...@git.apache.org>.
Github user pwoody commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125835525
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    If the outputPartitioning is the same as the bucketPartitioning, then within each partition every row already has the same value of `hash(a) % numBuckets`, regardless of the value of `a` itself.
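
    A plain-Scala sketch of why it is constant within a partition (again with `hashCode` standing in for Murmur3):

        val n = 8
        def bucketId(a: Int): Int = math.floorMod(a.hashCode, n)
        // All rows that hash-partitioning with modulus n routed to partition 3:
        val partition3 = Seq(3, 11, 19, 27)
        // The bucket id uses the same hash and the same modulus, so it is the
        // same for every row of the partition even though `a` varies.
        assert(partition3.map(bucketId).distinct == Seq(3))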


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125927443
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -150,7 +163,7 @@ object FileFormatWriter extends Logging {
         )
     
         // We should first sort by partition columns, then bucket id, and finally sorting columns.
    -    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    +    val requiredOrdering = partitionColumns ++ bucketSortExpression ++ sortColumns
    --- End diff --
    
    So when the `bucketSortExpression` is None, we should still compare `actualOrdering` with the original `requiredOrdering`, which contains the bucket id expression. Because now, when either of them matches `actualOrdering`, we don't need to sort again.
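
    Something like this sketch of the check (simplified to strings here; the real code compares `SortOrder` expressions with `semanticEquals`):

        def satisfies(actual: Seq[String], required: Seq[String]): Boolean =
          required.length <= actual.length &&
            required.zip(actual).forall { case (r, a) => r == a }

        def needsSort(actual: Seq[String],
                      partitionCols: Seq[String],
                      bucketIdExpr: Option[String],
                      sortCols: Seq[String]): Boolean = {
          val withBucket    = partitionCols ++ bucketIdExpr ++ sortCols
          val withoutBucket = partitionCols ++ sortCols
          // Sort only when neither required ordering is already satisfied.
          !satisfies(actual, withBucket) && !satisfies(actual, withoutBucket)
        }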
    



[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125841619
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    If the bucket expressions are the same as the partition expressions and the number of buckets is the same as the number of partitions, this may work. In this case, the bucket id is actually the partition id.
    
    Actually, I doubt whether this is useful. You will have one file per partition because all rows in a partition are in the same bucket. So basically you don't get any advantage from bucketing, right?



[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125865118
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    OK, that makes sense. Although it seems you still can't avoid the sort on the partition columns.


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125846726
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    And I guess in that case the table should be partitioned by the same columns, otherwise the data distribution might be wrong. Does it make sense to create a table partitioned by and bucketed by the same columns?


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by pwoody <gi...@git.apache.org>.
Github user pwoody commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125718916
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,24 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the outputPartitioning for the plan guarantees the bucket spec, then it will have a
    +    // constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = bucketPartitioning
    +      .filterNot(plan.outputPartitioning.guarantees(_))
    +      .map(_.partitionIdExpression)
    --- End diff --
    
    Ah true, my mistake - we don't get that guarantee from partitioning alone. I can edit this to use strict equality, which should capture most of the utility anyway (pipelines over the bucket spec, or explicit partitioning in transform logic).
    
    As an aside, do we have a case with the current Partitioning implementations where something that guarantees a HashPartitioning isn't going to have contiguous values?


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by pwoody <gi...@git.apache.org>.
Github user pwoody commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125840979
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    I don't think I'm quite following. If `bucketPartitioning.semanticEquals(outputPartitioning)`, then they both must have the same `numPartitions` in their expression?


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by pwoody <gi...@git.apache.org>.
Github user pwoody commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125868099
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -150,7 +163,7 @@ object FileFormatWriter extends Logging {
         )
     
         // We should first sort by partition columns, then bucket id, and finally sorting columns.
    -    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    +    val requiredOrdering = partitionColumns ++ bucketSortExpression ++ sortColumns
    --- End diff --
    
    Yeah - if the bucketSortExpression is None, it will just consider the partition columns and sort columns for the ordering.


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125865937
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -150,7 +163,7 @@ object FileFormatWriter extends Logging {
         )
     
         // We should first sort by partition columns, then bucket id, and finally sorting columns.
    -    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    +    val requiredOrdering = partitionColumns ++ bucketSortExpression ++ sortColumns
    --- End diff --
    
    Once we skip the sort on bucket id, the matching actual ordering can be either the ordering with or without the bucket id, right?


[GitHub] spark issue #18542: [SPARK-21317][SQL] Avoid sorting on bucket expression if...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/18542
  
    And I think we should also add tests for this.


[GitHub] spark issue #18542: [SPARK-21317][SQL] Avoid sorting on bucket expression if...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18542
  
    We always need more test cases for these changes.


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125927953
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -150,7 +163,7 @@ object FileFormatWriter extends Logging {
         )
     
         // We should first sort by partition columns, then bucket id, and finally sorting columns.
    -    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
    +    val requiredOrdering = partitionColumns ++ bucketSortExpression ++ sortColumns
    --- End diff --
    
    When neither of them matches, we just need to sort by the `requiredOrdering` without the bucket id expression.


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125841472
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    oh, right. NVM.


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by pwoody <gi...@git.apache.org>.
Github user pwoody commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125849527
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    The advantage of bucketing is that on read the query planner retains the bucketing information, avoiding potentially very expensive shuffles when downstream Spark jobs aggregate or join on the same key(s).
    
    If I have just finished a join on that key in my original job, it doesn't make sense that I would need to re-sort the whole partition just to write it out.
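
    For example (illustrative table names, assuming an active `spark` session and two tables both written with `bucketBy(8, "a")`):

        val t1 = spark.table("bucketed_t1")
        val t2 = spark.table("bucketed_t2")
        // With matching bucket specs on the join key, the planner can run a
        // sort-merge join without an Exchange on either side.
        t1.join(t2, "a").explain()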


[GitHub] spark pull request #18542: [SPARK-21317][SQL] Avoid sorting on bucket expres...

Posted by pwoody <gi...@git.apache.org>.
Github user pwoody commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18542#discussion_r125867815
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -116,13 +116,26 @@ object FileFormatWriter extends Logging {
         val partitionSet = AttributeSet(partitionColumns)
         val dataColumns = allColumns.filterNot(partitionSet.contains)
     
    -    val bucketIdExpression = bucketSpec.map { spec =>
    +    val bucketPartitioning = bucketSpec.map { spec =>
           val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
    +      HashPartitioning(bucketColumns, spec.numBuckets)
    +    }
    +
    +    val bucketIdExpression = bucketPartitioning.map { partitioning =>
           // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
           // guarantee the data distribution is same between shuffle and bucketed data source, which
           // enables us to only shuffle one side when join a bucketed table and a normal one.
    -      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    +      partitioning.partitionIdExpression
         }
    +
    +    // If the plan's outputPartitioning is the same as the bucket spec, then each row will have
    +    // a constant bucket id. We possibly can avoid the sort altogether.
    +    val bucketSortExpression = plan.outputPartitioning match {
    +      case output: HashPartitioning =>
    +        bucketPartitioning.filterNot(_.semanticEquals(output)).map(_.partitionIdExpression)
    --- End diff --
    
    Correct, this shouldn't affect that though.

