Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2017/01/23 08:54:29 UTC

[GitHub] spark pull request #16677: [WIP][SQL] Use map output statistics to improve ...

GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/16677

    [WIP][SQL] Use map output statistics to improve global limit's parallelism

    ## What changes were proposed in this pull request?
    
    A logical `Limit` is actually performed by two physical operators, `LocalLimit` and `GlobalLimit`.
    
    Most of the time, before `GlobalLimit`, we perform a shuffle exchange to move the data into a single partition. When the limit is very large, this sends a lot of data to a single partition and significantly reduces parallelism, in addition to incurring the cost of the shuffle itself.
    
    This change tries to perform `GlobalLimit` without shuffling the data into a single partition. Instead, we run only the map stage of the shuffle and collect statistics on the number of rows in each partition. The shuffled data is then read back locally rather than fetched from remote executors.
    
    Once we have the number of output rows in each partition, we take only the required number of rows from the locally shuffled data.
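
The row-selection step described in this section can be sketched in plain Scala, outside Spark. This is a minimal illustration and not the PR's code: `SequentialLimitSketch`, `takeAmounts`, and `rowsPerPartition` are hypothetical names, and the per-partition row counts are assumed to be already known from the map stage. Partitions are consumed in order until the limit is reached.

```scala
object SequentialLimitSketch {
  /**
   * For each partition, compute how many rows to keep so that the total
   * number of kept rows equals min(limit, total rows).
   * Hypothetical helper for illustration only.
   */
  def takeAmounts(rowsPerPartition: Seq[Long], limit: Long): Seq[Long] = {
    // Number of rows that precede each partition (prefix sums starting at 0).
    val rowsBefore = rowsPerPartition.scanLeft(0L)(_ + _)
    rowsPerPartition.zip(rowsBefore).map { case (rows, before) =>
      // Budget left when this partition is reached, clamped to [0, rows].
      math.max(0L, math.min(rows, limit - before))
    }
  }
}
```

With counts (100, 100, 50) and a limit of 120, the first partition keeps all 100 rows, the second keeps 20, and the third keeps none.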
    
    ## How was this patch tested?
    
    Jenkins tests.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 improve-global-limit-parallelism

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16677.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16677
    
----
commit e067b10274179c1307a04e2a94c141147867c58f
Author: Liang-Chi Hsieh <vi...@gmail.com>
Date:   2017-01-23T08:21:04Z

    Use map output statistics to improve global limit's parallelism.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @hvanhovell @mridulm Your comments were addressed. Please have a look. Thanks.




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197607227
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,94 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute limited rows across data
    +    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
    +    // Besides, if child output has certain ordering, we can't evenly pick up rows from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    +
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!flatGlobalLimit) {
    +      var numRowTaken = 0
    +      val takeAmounts = numberOfOutput.map { num =>
    --- End diff --
    
    map -> foreach
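
The comment is terse, and the thread does not spell out the reasoning. One plausible reading is that the closure passed to `map` mutates `numRowTaken`, so an explicit `foreach` that appends to a buffer makes the side effect visible. The sketch below shows that shape in plain Scala; `ForeachTakeSketch` and the loop body are assumptions, since the quoted diff is cut off before the body of the closure.

```scala
import scala.collection.mutable.ArrayBuffer

object ForeachTakeSketch {
  /** Sequential take amounts, written with an explicit side-effecting loop. */
  def takeAmounts(numberOfOutput: Seq[Long], limit: Long): Seq[Long] = {
    var numRowTaken = 0L                      // rows selected so far
    val amounts = ArrayBuffer.empty[Long]     // one entry per partition
    numberOfOutput.foreach { num =>
      // Take the whole partition, or only what is left of the limit.
      val toTake = math.min(num, limit - numRowTaken)
      numRowTaken += toTake
      amounts += toTake
    }
    amounts.toSeq
  }
}
```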




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92237 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92237/testReport)** for PR 16677 at commit [`f24171e`](https://github.com/apache/spark/commit/f24171e4364aa5d429dd76a22dc9991f7fff77f8).




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r212805707
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute limited rows across data
    +    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
    +    // Besides, if child output has certain ordering, we can't evenly pick up rows from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    --- End diff --
    
    If we remove it, we may need to feature flag it first since people may rely on the old behavior. Anyway all of this is up for debate.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @mridulm are you ok with the current changes?




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2985/
    Test PASSed.




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to ...

Posted by scwf <gi...@git.apache.org>.
Github user scwf commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r97700670
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = FakePartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      childRDD
    --- End diff --
    
    I think here we should use the shuffled RDD to read the data directly from disk.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/515/
    Test PASSed.




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r103140028
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputStatistics.scala ---
    @@ -23,5 +23,9 @@ package org.apache.spark
      * @param shuffleId ID of the shuffle
      * @param bytesByPartitionId approximate number of output bytes for each map output partition
      *   (may be inexact due to use of compressed map statuses)
    + * @param numberOfOutput number of output for each pre-map output partition
      */
    -private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
    +private[spark] class MapOutputStatistics(
    +    val shuffleId: Int,
    +    val bytesByPartitionId: Array[Long],
    +    val numberOfOutput: Array[Int])
    --- End diff --
    
    Ok. Use `Long` now
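
The review comment that prompted this change is not part of this excerpt, so the motivation is not stated here. One likely reason is overflow: an earlier revision quoted in this thread sums `Seq[Int]` counts and compares the result with `limit`, and an `Int` sum wraps around once the total exceeds about 2.1 billion rows. The sketch below demonstrates the effect in plain Scala; the object name and the row counts are made up for illustration.

```scala
object RowCountOverflowSketch {
  // Hypothetical per-partition row counts: two partitions of 1.5 billion rows.
  val countsAsInt: Seq[Int] = Seq(1500000000, 1500000000)
  val countsAsLong: Seq[Long] = countsAsInt.map(_.toLong)

  // Int addition wraps around, so the total becomes negative.
  val intTotal: Int = countsAsInt.sum
  // Long addition holds the true total of 3,000,000,000.
  val longTotal: Long = countsAsLong.sum

  // A check such as `sum <= limit` would then wrongly succeed with Int.
  val limit: Int = 100
  val intCheckPasses: Boolean = intTotal <= limit
  val longCheckPasses: Boolean = longTotal <= limit
}
```

With `Int` counts the "everything fits under the limit" branch would be taken by mistake, while the `Long` total gives the correct answer.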




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92339 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92339/testReport)** for PR 16677 at commit [`2d522b4`](https://github.com/apache/spark/commit/2d522b4d05cd544617bb7d78362aaf28bd15d62a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class LocalPartitioning(orgPartition: Partitioning) extends Partitioning `




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92207 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92207/testReport)** for PR 16677 at commit [`1ff1fa5`](https://github.com/apache/spark/commit/1ff1fa56735722887c8eb0d6be42bf42fa580fa1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    ping @cloud-fan @jiangxb1987 




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #73518 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73518/testReport)** for PR 16677 at commit [`45a1fcb`](https://github.com/apache/spark/commit/45a1fcb729cd551fbba1c633af713152fd63c24c).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94195/
    Test FAILed.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Let me take an example from the PR description
    > For example, we have three partitions with rows (100, 100, 50) respectively. In global limit of 100 rows, we may take (34, 33, 33) rows for each partition locally. After global limit we still have three partitions.
    
    Without this patch, we need to take the first 100 rows from each partition, and then perform a shuffle to send all data into one partition and take the first 100 rows.
    
    So if the limit is large, this patch is very useful; if the limit is small, it is less useful but should not be slower.
    
    The only overhead I can think of is that `MapStatus` needs to carry the numRecords metric. That should be a small overhead, as `MapStatus` already carries a lot of information.
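
The even split in the quoted example, (34, 33, 33) for partitions of (100, 100, 50) rows and a limit of 100, can be reproduced with a small plain-Scala sketch. This is an assumption-laden illustration, not the PR's algorithm: `EvenLimitSketch` is a hypothetical name, and the thread does not say how a partition with fewer rows than its share is handled, so the redistribution loop below is only one possible choice. It assumes a non-negative limit.

```scala
object EvenLimitSketch {
  /** Spread `limit` rows as evenly as possible across partitions. */
  def takeAmounts(rowsPerPartition: Seq[Long], limit: Long): Seq[Long] = {
    val taken = Array.fill(rowsPerPartition.length)(0L)
    // Never try to take more rows than exist.
    var remaining = math.min(limit, rowsPerPartition.sum)
    while (remaining > 0) {
      // Partitions that still have rows we have not selected.
      val open = rowsPerPartition.indices.filter(i => taken(i) < rowsPerPartition(i))
      val share = remaining / open.length
      val extra = remaining % open.length
      open.zipWithIndex.foreach { case (i, rank) =>
        // The first `extra` open partitions get one additional row.
        val want = share + (if (rank < extra) 1L else 0L)
        val got = math.min(want, rowsPerPartition(i) - taken(i))
        taken(i) += got
        remaining -= got
      }
    }
    taken.toSeq
  }
}
```

When a partition is too small to supply its share, as in (100, 100, 10) with a limit of 100, the shortfall is split among the remaining partitions on the next pass, giving (46, 44, 10).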




[GitHub] spark issue #16677: [WIP][SQL] Use map output statistics to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71936 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71936/testReport)** for PR 16677 at commit [`9d4cadb`](https://github.com/apache/spark/commit/9d4cadb782afcba52b8081402f5dd89cb0a27ae5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning `
      * `case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport `
      * `case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode `




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #75688 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75688/testReport)** for PR 16677 at commit [`b8a2275`](https://github.com/apache/spark/commit/b8a22755bfdef8f1ab78016aea6914155ada67c1).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    I'm convinced. There are 2 major issues:
    1. Abusing shuffle. We need a new mechanism for the driver to analyze some statistics about the data (records per map task).
    2. Too many small tasks. We need a better algorithm to decide the parallelism of limit.




[GitHub] spark issue #16677: [WIP][SQL] Use map output statistics to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71901 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71901/testReport)** for PR 16677 at commit [`3dec117`](https://github.com/apache/spark/commit/3dec1173da9ec8d3d10e6aabf95bc33cc88a1587).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning `
      * `case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport `
      * `case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode `




[GitHub] spark issue #16677: [WIP][SQL] Use map output statistics to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71833 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71833/testReport)** for PR 16677 at commit [`e067b10`](https://github.com/apache/spark/commit/e067b10274179c1307a04e2a94c141147867c58f).




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2994/
    Test FAILed.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #90306 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90306/testReport)** for PR 16677 at commit [`062b8fd`](https://github.com/apache/spark/commit/062b8fd58ae13f252b1e6f61c70b69ed05521715).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/406/
    Test PASSed.




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @sujith71955 Thanks for the test! The test numbers look promising!




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #93789 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93789/testReport)** for PR 16677 at commit [`d05c144`](https://github.com/apache/spark/commit/d05c144aecdd57f4ee3d179a240ccafa6c02bb66).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92206 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92206/testReport)** for PR 16677 at commit [`59a3029`](https://github.com/apache/spark/commit/59a30294e5cc043844d49dc2e2315f36723b5e5f).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r198106419
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
       }
     }
     
    +/**
    + * Represents a partitioning where rows are only serialized/deserialized locally. The number
    + * of partitions are not changed and also the distribution of rows. This is mainly used to
    + * obtain some statistics of map tasks such as number of outputs.
    + */
    +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning {
    --- End diff --
    
    One more thing, can you make `LocalPartitioning` use `orgPartition.numPartitions` instead of adding it as a separate field?
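
A minimal sketch of the suggestion above, applied to the `LocalPartitioning` class from the diff. The `Partitioning` trait and `HashLikePartitioning` below are simplified, hypothetical stand-ins so the example compiles on its own; they are not Spark's Catalyst classes, and the real class has more members.

```scala
object LocalPartitioningSketch {
  // Simplified stand-in for Catalyst's Partitioning (assumption, for illustration only).
  trait Partitioning {
    def numPartitions: Int
  }

  // Hypothetical concrete partitioning used only to have something to wrap.
  case class HashLikePartitioning(numPartitions: Int) extends Partitioning

  // The partition count is derived from the wrapped partitioning instead of
  // being passed in as a second constructor argument.
  case class LocalPartitioning(orgPartition: Partitioning) extends Partitioning {
    override val numPartitions: Int = orgPartition.numPartitions
  }

  val local: LocalPartitioning = LocalPartitioning(HashLikePartitioning(8))
}
```

With a single constructor argument the two values cannot drift apart, although this only works if the wrapped partitioning always reports the partition count the caller needs.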


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197596253
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -247,6 +253,10 @@ object ShuffleExchangeExec {
             val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
             row => projection(row).getInt(0)
           case RangePartitioning(_, _) | SinglePartition => identity
    +      case LocalPartitioning(_, _) =>
    +        (row: InternalRow) => {
    +          TaskContext.get().partitionId()
    --- End diff --
    
    Fixed.


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197606877
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
    @@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
         // included in the shuffle write time.
         writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
     
    +    long numOfRecords = 0;
         while (records.hasNext()) {
           final Product2<K, V> record = records.next();
           final K key = record._1();
           partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    +      numOfRecords += 1;
    --- End diff --
    
    Here and elsewhere, simply use `writeMetrics._recordsWritten` instead of adding `numOfRecords`?
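
A rough sketch of the idea behind this comment: read the record count back from a metrics object that is already updated on every write, rather than keeping a second local counter. `WriteMetrics` and `writeAll` are hypothetical stand-ins written for this example; they are not Spark's shuffle classes, and the sketch assumes the metrics object is incremented exactly once per record.

```scala
object RecordCounting {
  // Hypothetical stand-in for a shuffle write metrics object (not Spark's class).
  final class WriteMetrics {
    private var _recordsWritten = 0L
    def incRecordsWritten(n: Long): Unit = _recordsWritten += n
    def recordsWritten: Long = _recordsWritten
  }

  // Writes every record through `write` and returns the count taken from the
  // metrics object, so no separate `numOfRecords` variable is needed.
  def writeAll[K, V](records: Iterator[(K, V)], metrics: WriteMetrics)(
      write: (K, V) => Unit): Long = {
    records.foreach { case (k, v) =>
      write(k, v)
      metrics.incRecordsWritten(1)
    }
    metrics.recordsWritten
  }
}
```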


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71842 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71842/testReport)** for PR 16677 at commit [`5dae2da`](https://github.com/apache/spark/commit/5dae2da0d83a4ae34b4d2138156eea5ea69fc3ea).
     * This patch **fails from timeout after a configured wait of `250m`**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning `
      * `case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport `
      * `case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode `


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1257/
    Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #75699 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75699/testReport)** for PR 16677 at commit [`b8a2275`](https://github.com/apache/spark/commit/b8a22755bfdef8f1ab78016aea6914155ada67c1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r115667172
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,102 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    +      var numTakenRow = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (numTakenRow + num < limit) {
    +          numTakenRow += num.toInt
    +          takeAmounts += ((index, num.toInt))
    +        } else {
    +          val toTake = limit - numTakenRow
    +          numTakenRow += toTake
    +          takeAmounts += ((index, toTake))
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        broadMap.value.get(index).map { size =>
    +          iter.take(size)
    +        }.get
    +      }
    +    } else {
    +      // We try to distribute the required limit number of rows across all child rdd's partitions.
    +      var numTakenRow = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      val nonEmptyParts = numberOfOutput.filter(_ > 0).size
    +      val takePerPart = limit / nonEmptyParts
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (num >= takePerPart) {
    +          numTakenRow += takePerPart
    +          takeAmounts += ((index, takePerPart))
    +        } else {
    +          numTakenRow += num.toInt
    --- End diff --
    
    Use `Long` for `numTakenRow`?
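
To illustrate the question above, here is a standalone sketch of the sequential branch of the diff (take rows partition by partition until the limit is reached) with the running total kept as a `Long`. It is plain Scala with no Spark dependency; `TakeAmounts.sequential` and the name `rowCounts` are made up for this example, and `rowCounts` stands in for the per-partition row counts that the PR reads from map output statistics.

```scala
object TakeAmounts {
  // For each partition index, compute how many rows to keep so that the total
  // kept equals min(limit, total rows), filling partitions in index order.
  def sequential(rowCounts: Seq[Long], limit: Int): Map[Int, Int] = {
    // Kept as Long so per-partition counts are never narrowed before comparison.
    var numTakenRow = 0L
    rowCounts.zipWithIndex.map { case (num, index) =>
      val remaining = limit - numTakenRow      // Long arithmetic, never negative
      val toTake = math.min(num, remaining)
      numTakenRow += toTake
      // Narrowing is safe here: toTake <= remaining <= limit, and limit is an Int.
      index -> toTake.toInt
    }.toMap
  }
}
```

For example, `TakeAmounts.sequential(Seq(3L, 0L, 5L, 4L), 6)` yields `Map(0 -> 3, 1 -> 0, 2 -> 3, 3 -> 0)`. The only `toInt` conversion happens on a value already bounded by `limit`.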


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73518/
    Test FAILed.


---


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71936 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71936/testReport)** for PR 16677 at commit [`9d4cadb`](https://github.com/apache/spark/commit/9d4cadb782afcba52b8081402f5dd89cb0a27ae5).


---


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Looks like unrelated test failure at `VersionsSuite`...


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92215 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92215/testReport)** for PR 16677 at commit [`b0cca1a`](https://github.com/apache/spark/commit/b0cca1ae0672076fd096cfdc0d4a634481fcf216).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75688/
    Test FAILed.


---


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by scwf <gi...@git.apache.org>.
Github user scwf commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r97700863
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = FakePartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      childRDD
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    +      var countForRows = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (countForRows + num < limit) {
    +          countForRows += num
    +          takeAmounts += ((index, num))
    +        } else {
    +          val toTake = limit - countForRows
    +          countForRows += toTake
    +          takeAmounts += ((index, toTake))
    +        }
    +      }
    +      val shuffled = new ShuffledRowRDD(shuffleDependency)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        takeAmounts.get(index).map { size =>
    +          iter.take(size)
    +        }.getOrElse(iter)
    +      }
    +    } else {
    +      // We try to distribute the required limit number of rows across all child rdd's partitions.
    +      var numToReduce = (sumOfOutput - limit)
    +      val reduceAmounts = new mutable.HashMap[Int, Int]()
    --- End diff --
    
    It's better to broadcast `reduceAmounts`.


---


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71842/
    Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #83247 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83247/testReport)** for PR 16677 at commit [`7598337`](https://github.com/apache/spark/commit/759833712a9be4b3f3f65cf4722ddd33851726e8).


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by scwf <gi...@git.apache.org>.
Github user scwf commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r97700568
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -230,6 +230,21 @@ case object SinglePartition extends Partitioning {
     }
     
     /**
    + * Represents a partitioning where rows are only serialized/deserialized locally. The number
    + * of partitions are not changed and also the distribution of rows. This is mainly used to
    + * obtain some statistics of map tasks such as number of outputs.
    + */
    +case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning {
    --- End diff --
    
    How about `LocalPartitioning`?


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #72373 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72373/testReport)** for PR 16677 at commit [`b049cc4`](https://github.com/apache/spark/commit/b049cc4a4c83b308eb9aefa225435471b90a5f02).


---


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r115645927
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,102 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    +      var numTakenRow = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (numTakenRow + num < limit) {
    +          numTakenRow += num.toInt
    +          takeAmounts += ((index, num.toInt))
    +        } else {
    +          val toTake = limit - numTakenRow
    +          numTakenRow += toTake
    +          takeAmounts += ((index, toTake))
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        broadMap.value.get(index).map { size =>
    +          iter.take(size)
    +        }.get
    +      }
    +    } else {
    +      // We try to distribute the required limit number of rows across all child rdd's partitions.
    +      var numTakenRow = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      val nonEmptyParts = numberOfOutput.filter(_ > 0).size
    --- End diff --
    
    Nit: filter(_ > 0).size -> count(_ > 0)
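
The nit above can be checked in plain Scala: `count(p)` gives the same number as `filter(p).size` but skips the intermediate collection. This is only an illustrative sketch; the object name and the sample row counts are made up and do not come from the PR.

```scala
object CountNit {
  // Hypothetical per-partition row counts; not taken from the PR.
  val numberOfOutput: Seq[Long] = Seq(0L, 12L, 0L, 7L, 3L)

  // Builds an intermediate Seq of the matching elements, then measures it.
  val viaFilter: Int = numberOfOutput.filter(_ > 0).size

  // Counts the matching elements in a single pass, with no intermediate Seq.
  val viaCount: Int = numberOfOutput.count(_ > 0)

  def main(args: Array[String]): Unit =
    println(s"filter.size = $viaFilter, count = $viaCount")
}
```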


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92367/
    Test FAILed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71972 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71972/testReport)** for PR 16677 at commit [`def10e6`](https://github.com/apache/spark/commit/def10e696bed279a240b4454154ce4aea713cf47).


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r99463628
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -216,7 +216,7 @@ class PlannerSuite extends SharedSQLContext {
               ).queryExecution.executedPlan.collect {
                 case exchange: ShuffleExchange => exchange
               }.length
    -          assert(numExchanges === 5)
    +          assert(numExchanges === 3)
    --- End diff --
    
    I applied the change from this PR locally, but this test case still fails for me: `numExchanges` is still 5.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #90293 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90293/testReport)** for PR 16677 at commit [`062b8fd`](https://github.com/apache/spark/commit/062b8fd58ae13f252b1e6f61c70b69ed05521715).


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Thank you! @hvanhovell 


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #94452 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94452/testReport)** for PR 16677 at commit [`69513d1`](https://github.com/apache/spark/commit/69513d166ee56587d7b039b5d9645299785dcb77).


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1967/
    Test PASSed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3101/
    Test PASSed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92357 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92357/testReport)** for PR 16677 at commit [`9792220`](https://github.com/apache/spark/commit/97922200f9d898fe4fca1930f3bea8a3d12325e8).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class LocalPartitioning(childRDD: RDD[InternalRow]) extends Partitioning `


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90311/
    Test FAILed.


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r218614872
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
    @@ -44,18 +45,23 @@ private[spark] sealed trait MapStatus {
        * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
        */
       def getSizeForBlock(reduceId: Int): Long
    +
    +  /**
    +   * The number of outputs for the map task.
    +   */
    +  def numberOfOutput: Long
    --- End diff --
    
    what does this mean? output blocks? output files?


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r218665902
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute the limited rows across data
    +    // partitions. If disabled, scan data partitions sequentially until the limit is reached.
    +    // Also, if the child output has an ordering, we can't pick rows evenly from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    +
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!flatGlobalLimit) {
    +      var numRowTaken = 0
    +      val takeAmounts = numberOfOutput.map { num =>
    +        if (numRowTaken + num < limit) {
    +          numRowTaken += num.toInt
    +          num.toInt
    +        } else {
    +          val toTake = limit - numRowTaken
    +          numRowTaken += toTake
    +          toTake
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        iter.take(broadMap.value(index).toInt)
    +      }
    +    } else {
    +      // Spread the requested `limit` rows evenly across all of the child RDD's partitions.
    +      var rowsNeedToTake: Long = limit
    +      val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L)
    +      val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*)
    +
    +      while (rowsNeedToTake > 0) {
    +        val nonEmptyParts = remainingRowsByPartition.count(_ > 0)
    +        // If fewer rows are needed than there are non-empty partitions, take one row from
    +        // each non-empty partition until we reach `limit` rows.
    +        // Otherwise, divide the needed rows evenly among the non-empty partitions.
    +        val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts)
    +        remainingRowsByPartition.zipWithIndex.foreach { case (num, index) =>
    +          // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during
    +          // the traversal, so we need to add this check.
    +          if (rowsNeedToTake > 0 && num > 0) {
    +            if (num >= takePerPart) {
    +              rowsNeedToTake -= takePerPart
    +              takeAmountByPartition(index) += takePerPart
    +              remainingRowsByPartition(index) -= takePerPart
    +            } else {
    +              rowsNeedToTake -= num
    +              takeAmountByPartition(index) += num
    +              remainingRowsByPartition(index) -= num
    +            }
    +          }
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmountByPartition)
    --- End diff --
    
    But tasks are already broadcast.
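
The even-distribution loop in the diff above can be exercised outside Spark. The following is a minimal sketch of the same idea in plain Scala, not the PR's code: `EvenLimit` and `takeAmounts` are hypothetical names, and the requested amount is capped at the total row count so that the loop terminates without relying on the caller's earlier `sumOfOutput <= limit` check.

```scala
object EvenLimit {
  /** Per-partition take amounts for `limit` rows, given per-partition row counts. */
  def takeAmounts(rowCounts: Seq[Long], limit: Long): Array[Long] = {
    val taken = Array.fill[Long](rowCounts.length)(0L)
    val remaining = rowCounts.toArray
    // Never ask for more rows than exist, so the loop always terminates.
    var needed = math.min(limit, rowCounts.sum)
    while (needed > 0) {
      val nonEmpty = remaining.count(_ > 0)
      // At least one row per pass, otherwise an even share of what is still needed.
      val perPart = math.max(1L, needed / nonEmpty)
      remaining.indices.foreach { i =>
        // `needed` can reach zero in the middle of a pass, hence the check.
        if (needed > 0 && remaining(i) > 0) {
          val n = math.min(perPart, remaining(i))
          taken(i) += n
          remaining(i) -= n
          needed -= n
        }
      }
    }
    taken
  }

  def main(args: Array[String]): Unit =
    println(takeAmounts(Seq(10L, 0L, 3L, 20L), 12L).mkString(", "))
}
```

With row counts 10, 0, 3, 20 and a limit of 12, the first pass takes 4, 0, 3, 4 and the second pass takes the one remaining row from the first partition.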


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197410930
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +98,95 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute the limited rows across data
    +    // partitions. If disabled, scan data partitions sequentially until the limit is reached.
    +    // Also, if the child output has an ordering, we can't pick rows evenly from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    +
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!flatGlobalLimit) {
    +      var numRowTaken = 0
    +      val takeAmounts = mutable.ArrayBuffer.fill[Long](numberOfOutput.length)(0L)
    --- End diff --
    
    I might be dumb, but why do you need an `ArrayBuffer` here?
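
One way to compute the sequential take amounts without any mutable buffer is a running prefix sum. This is a sketch in plain Scala under that assumption, not the code that ended up in the PR; `SequentialLimit` and `sequentialTakeAmounts` are hypothetical names.

```scala
object SequentialLimit {
  /** Take rows partition by partition, in order, until `limit` rows are covered. */
  def sequentialTakeAmounts(rowCounts: Seq[Long], limit: Long): Seq[Long] = {
    // rowsBefore(i) is the total number of rows in partitions 0 until i.
    val rowsBefore = rowCounts.scanLeft(0L)(_ + _)
    rowCounts.zip(rowsBefore).map { case (num, before) =>
      // Take the unfilled part of the limit, capped by the partition size
      // and floored at zero once the limit has been reached.
      math.max(0L, math.min(num, limit - before))
    }
  }

  def main(args: Array[String]): Unit =
    println(sequentialTakeAmounts(Seq(4L, 0L, 5L, 6L), 7L).mkString(", "))
}
```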


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/410/
    Test PASSed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93486/
    Test FAILed.


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r218651545
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute the limited rows across data
    +    // partitions. If disabled, scan data partitions sequentially until the limit is reached.
    +    // Also, if the child output has an ordering, we can't pick rows evenly from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    +
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!flatGlobalLimit) {
    +      var numRowTaken = 0
    +      val takeAmounts = numberOfOutput.map { num =>
    +        if (numRowTaken + num < limit) {
    +          numRowTaken += num.toInt
    +          num.toInt
    +        } else {
    +          val toTake = limit - numRowTaken
    +          numRowTaken += toTake
    +          toTake
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        iter.take(broadMap.value(index).toInt)
    +      }
    +    } else {
    +      // Spread the requested `limit` rows evenly across all of the child RDD's partitions.
    +      var rowsNeedToTake: Long = limit
    +      val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L)
    +      val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*)
    +
    +      while (rowsNeedToTake > 0) {
    +        val nonEmptyParts = remainingRowsByPartition.count(_ > 0)
    +        // If fewer rows are needed than there are non-empty partitions, take one row from
    +        // each non-empty partition until we reach `limit` rows.
    +        // Otherwise, divide the needed rows evenly among the non-empty partitions.
    +        val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts)
    +        remainingRowsByPartition.zipWithIndex.foreach { case (num, index) =>
    +          // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during
    +          // the traversal, so we need to add this check.
    +          if (rowsNeedToTake > 0 && num > 0) {
    +            if (num >= takePerPart) {
    +              rowsNeedToTake -= takePerPart
    +              takeAmountByPartition(index) += takePerPart
    +              remainingRowsByPartition(index) -= takePerPart
    +            } else {
    +              rowsNeedToTake -= num
    +              takeAmountByPartition(index) += num
    +              remainingRowsByPartition(index) -= num
    +            }
    +          }
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmountByPartition)
    --- End diff --
    
    Broadcast is more efficient when the data is large, because of `TorrentBroadcast`. What data size do we expect here?
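
As a rough way to reason about the size question, the broadcast value in the diff is an `Array[Long]` with one entry per partition, so its raw payload is 8 bytes per partition. The sketch below only does that arithmetic; the object name and the partition counts are hypothetical, and it ignores the JVM array header and any serialization overhead.

```scala
object BroadcastSizeEstimate {
  // Raw payload of an Array[Long] with one entry per partition, ignoring the
  // JVM array header and any serialization overhead.
  def payloadBytes(numPartitions: Int): Long =
    numPartitions.toLong * java.lang.Long.BYTES

  def main(args: Array[String]): Unit =
    // Hypothetical partition counts, chosen only to show the order of magnitude.
    Seq(200, 10000, 1000000).foreach { n =>
      println(s"$n partitions -> ${payloadBytes(n)} bytes")
    }
}
```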


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71955 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71955/testReport)** for PR 16677 at commit [`7f89c30`](https://github.com/apache/spark/commit/7f89c305f8ddd595fd752f7a8c238d23ec796895).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning `
      * `case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport `
      * `case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode `


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94216/
    Test FAILed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @cloud-fan Compared with the sizes of each block, we only send back an Int for each partition. I think the overhead should be very low.


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r212811618
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute limited rows across data
    +    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
    +    // Besides, if child output has certain ordering, we can't evenly pick up rows from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    --- End diff --
    
    For a query like `select * from table order by a limit 10`, I think the expected semantics is to return the top 10 elements, not any 10 elements. I added this check so that this behavior does not change.
    
    > Moreover checking child.outputOrdering only checks the order of the partition and not the order of the frame as a whole. You should also add the child.outputPartitioning.
    
    I think you are correct. We need to check `child.outputPartitioning` as well, specifically whether it is a `RangePartitioning`. The check should be that the child has range partitioning and also has some output ordering. WDYT?
    
    > I am not entirely sure that we should guarantee that you should get the lowest elements of a dataset if you perform a limit in the middle of a query (a top level sort-limit does have this guarantee). I also don't think the SQL standard supports/mandates this.
    > I would be slightly in favor of removing the child.outputOrdering check.
    
    I am not sure we can ignore this for a limit in the middle of a query. When such a query has a sort, don't we need to return the top `limit` elements?
    
    cc @cloud-fan too.
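
The check proposed in this comment (range partitioning plus some output ordering) could be sketched roughly as follows. This is only an illustration under stated assumptions: `Partitioning`, `RangePartitioning` and `UnknownPartitioning` here are simplified stand-ins defined locally, not Spark's real classes, and `canUseFlatLimit` is a hypothetical helper name that does not appear in the PR.

```scala
object FlatLimitGuard {
  // Simplified, hypothetical stand-ins for Spark's physical partitioning types.
  sealed trait Partitioning
  case object UnknownPartitioning extends Partitioning
  final case class RangePartitioning(sortKeys: Seq[String]) extends Partitioning

  /** Returns true when rows may be picked evenly from every partition.
    * The even ("flat") strategy is rejected only when the child output looks
    * globally sorted: range partitioned and ordered within each partition. */
  def canUseFlatLimit(
      confEnabled: Boolean,
      outputOrdering: Seq[String],
      partitioning: Partitioning): Boolean = {
    val globallySorted = partitioning match {
      case RangePartitioning(_) => outputOrdering.nonEmpty
      case _ => false
    }
    confEnabled && !globallySorted
  }
}
```

With this shape, a child that is only sorted within each partition would still qualify for the even strategy, which matches the reviewer's point that `child.outputOrdering` alone says nothing about the order of the dataset as a whole.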
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #74086 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74086/testReport)** for PR 16677 at commit [`2d37598`](https://github.com/apache/spark/commit/2d37598d20f8a2c536eab4bd6a7e4da26f8bdc61).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #94195 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94195/testReport)** for PR 16677 at commit [`69513d1`](https://github.com/apache/spark/commit/69513d166ee56587d7b039b5d9645299785dcb77).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71977/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r204221009
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -204,6 +204,13 @@ object SQLConf {
         .intConf
         .createWithDefault(4)
     
    +  val LIMIT_FLAT_GLOBAL_LIMIT = buildConf("spark.sql.limit.flatGlobalLimit")
    +    .internal()
    +    .doc("During global limit, try to evenly distribute limited rows across data " +
    +      "partitions. If disabled, scanning data partitions sequentially until reaching limit number.")
    +    .booleanConf
    +    .createWithDefault(true)
    --- End diff --
    
    @hvanhovell Should we set it to false by default?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Mainly, I think we are trying to interpolate the number of partitions.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by scwf <gi...@git.apache.org>.
Github user scwf commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r97783672
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,101 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    --- End diff --
    
    Hmm, I think the old single-partition approach did not work like this. Why do we need this branch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71833/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r97786175
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,101 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    --- End diff --
    
    Actually it does, although it does not look similar at first glance.
    
    In the previous single-partition approach, global limit fetches rows from the first partition, then the second partition, and so on, until it reaches the limit number of rows.
    
    This branch does the same: it takes rows from the first partition, then the second partition, and so on, until it reaches the limit number of rows.
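
The sequential strategy described in this comment can be illustrated with a small standalone sketch. It is not the code from the PR: `SequentialLimit` and `takeAmounts` are hypothetical names, and plain collections stand in for the per-partition row counts that the PR reads from the map output statistics.

```scala
object SequentialLimit {
  /** For each partition, in index order, returns how many rows to keep so
    * that the total equals min(limit, rowCounts.sum). Earlier partitions
    * are drained completely before later ones contribute anything. */
  def takeAmounts(rowCounts: Seq[Long], limit: Long): Seq[Long] = {
    var remaining = math.max(0L, limit)
    // toVector forces a strict collection, so the mutable counter is
    // updated in partition order.
    rowCounts.toVector.map { count =>
      val toTake = math.min(count, remaining)
      remaining -= toTake
      toTake
    }
  }
}
```

For row counts of 3, 5 and 4 with a limit of 6, this keeps all 3 rows of the first partition, 3 rows of the second, and none of the third.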
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r218652707
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute limited rows across data
    +    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
    +    // Besides, if child output has certain ordering, we can't evenly pick up rows from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    +
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!flatGlobalLimit) {
    +      var numRowTaken = 0
    +      val takeAmounts = numberOfOutput.map { num =>
    +        if (numRowTaken + num < limit) {
    +          numRowTaken += num.toInt
    +          num.toInt
    +        } else {
    +          val toTake = limit - numRowTaken
    +          numRowTaken += toTake
    +          toTake
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        iter.take(broadMap.value(index).toInt)
    +      }
    +    } else {
    +      // We try to evenly require the asked limit number of rows across all child rdd's partitions.
    +      var rowsNeedToTake: Long = limit
    +      val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L)
    +      val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*)
    +
    +      while (rowsNeedToTake > 0) {
    +        val nonEmptyParts = remainingRowsByPartition.count(_ > 0)
    +        // If the rows needed to take are less the number of non-empty partitions, take one row from
    +        // each non-empty partitions until we reach `limit` rows.
    +        // Otherwise, evenly divide the needed rows to each non-empty partitions.
    +        val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts)
    +        remainingRowsByPartition.zipWithIndex.foreach { case (num, index) =>
    +          // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during
    +          // the traversal, so we need to add this check.
    +          if (rowsNeedToTake > 0 && num > 0) {
    +            if (num >= takePerPart) {
    +              rowsNeedToTake -= takePerPart
    +              takeAmountByPartition(index) += takePerPart
    +              remainingRowsByPartition(index) -= takePerPart
    +            } else {
    +              rowsNeedToTake -= num
    +              takeAmountByPartition(index) += num
    +              remainingRowsByPartition(index) -= num
    +            }
    +          }
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmountByPartition)
    --- End diff --
    
    The size depends on the number of partitions. Each partition uses an int. If this is too small to be worth broadcasting, we can remove the broadcast.
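
For readers following the diff above, the even ("flat") distribution loop can be reproduced outside Spark as a minimal sketch. The names `FlatLimit` and `takeAmounts` are hypothetical, and the sketch caps the requested rows at the total available, which the diff achieves instead through its earlier `sumOfOutput <= limit` branch.

```scala
object FlatLimit {
  /** Spreads `limit` rows as evenly as possible over the partitions.
    * Returns, per partition, the number of rows to take. */
  def takeAmounts(rowCounts: Seq[Long], limit: Long): Array[Long] = {
    val remaining = rowCounts.toArray
    val taken = Array.fill(remaining.length)(0L)
    // Never ask for more rows than exist, so the loop always terminates.
    var needed = math.min(math.max(0L, limit), remaining.sum)
    while (needed > 0) {
      val nonEmpty = remaining.count(_ > 0)
      // If fewer rows are needed than there are non-empty partitions,
      // take one row at a time until the request is satisfied.
      val perPart = math.max(1L, needed / nonEmpty)
      var i = 0
      while (i < remaining.length && needed > 0) {
        val n = math.min(perPart, remaining(i))
        taken(i) += n
        remaining(i) -= n
        needed -= n
        i += 1
      }
    }
    taken
  }
}
```

With row counts of 10, 2 and 10 and a limit of 9, the first pass takes 3, 2 and 3 rows, and the second pass takes the one remaining row from the first partition, giving 4, 2 and 3. The resulting array holds one number per partition, which is the payload the broadcast in the diff carries.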


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92339 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92339/testReport)** for PR 16677 at commit [`2d522b4`](https://github.com/apache/spark/commit/2d522b4d05cd544617bb7d78362aaf28bd15d62a).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/404/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92383 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92383/testReport)** for PR 16677 at commit [`9792220`](https://github.com/apache/spark/commit/97922200f9d898fe4fca1930f3bea8a3d12325e8).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94452/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1490/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92237/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90452/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #83251 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83251/testReport)** for PR 16677 at commit [`e53648e`](https://github.com/apache/spark/commit/e53648e7f58f439bb09a702521c2f84cf2e344bd).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    > If there is some codepath not updating shuffle write metrics (introduced for sql), that would be a bug.
    
    I'll investigate this further to see if there is a bug. However, I think that is orthogonal to this PR's change.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92168 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92168/testReport)** for PR 16677 at commit [`5594bf9`](https://github.com/apache/spark/commit/5594bf9f13aa83d05a433bad0fd366daabd2d034).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @hvanhovell @cloud-fan Would we consider including this feature?


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @jiangxb1987 Sure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #83249 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83249/testReport)** for PR 16677 at commit [`e53648e`](https://github.com/apache/spark/commit/e53648e7f58f439bb09a702521c2f84cf2e344bd).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81619/
    Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    actually looking at the design - this could cause perf regressions in some cases too right? it introduces a barrier that was previously non-existent. if the number of records to take isn't substantially less than the actual records on each partition, perf would be much worse. also it feels to me this isn't shuffle at all, and we are piggybacking on the wrong infrastructure. what you really want is a way to buffer blocks temporarily, and can launch a 2nd wave of tasks to rerun some of them.


---



[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71905/
    Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #94216 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94216/testReport)** for PR 16677 at commit [`69513d1`](https://github.com/apache/spark/commit/69513d166ee56587d7b039b5d9645299785dcb77).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #76809 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76809/testReport)** for PR 16677 at commit [`867a93d`](https://github.com/apache/spark/commit/867a93d01a7f83f4ea0e43a77bebee09727b96e1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #81619 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81619/testReport)** for PR 16677 at commit [`f2a7aac`](https://github.com/apache/spark/commit/f2a7aacc22caf27c1a8af612c9432586e6a86d17).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1811/
    Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90306/
    Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71977 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71977/testReport)** for PR 16677 at commit [`4e31bb7`](https://github.com/apache/spark/commit/4e31bb7959cb774b51d6d8662f53a3ad96b4dc49).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning `
      * `case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport `
      * `case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode `


---


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r97710396
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = FakePartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      childRDD
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    +      var countForRows = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (countForRows + num < limit) {
    +          countForRows += num
    +          takeAmounts += ((index, num))
    +        } else {
    +          val toTake = limit - countForRows
    +          countForRows += toTake
    +          takeAmounts += ((index, toTake))
    +        }
    +      }
    +      val shuffled = new ShuffledRowRDD(shuffleDependency)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        takeAmounts.get(index).map { size =>
    +          iter.take(size)
    +        }.getOrElse(iter)
    --- End diff --
    
    Actually we won't reach here, but the change is ok.
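
The sequential branch quoted in the diff above can be sketched in plain Scala, without any Spark dependency. This is only an illustration of the bookkeeping: the object and method names are hypothetical, and `rowsPerPartition` stands in for the per-partition row counts that the diff reads from the map output statistics. It assumes `limit` is non-negative.

```scala
object SequentialLimitSketch {
  // rowsPerPartition: number of rows produced by each partition (assumed input).
  // Returns how many rows to keep from each partition when partitions are
  // scanned in index order until `limit` rows have been collected.
  def takeAmounts(rowsPerPartition: Seq[Long], limit: Int): Seq[Int] = {
    val result = Seq.newBuilder[Int]
    var remaining = limit
    rowsPerPartition.foreach { num =>
      // Keep the whole partition if it fits, otherwise only what is still needed.
      val toTake = math.min(num, remaining.toLong).toInt
      remaining -= toTake
      result += toTake
    }
    result.result()
  }
}
```

With row counts `Seq(3L, 5L, 2L)` and a limit of 6, the first partition is kept whole, the second contributes 3 rows, and the third contributes none. Because every partition index receives an entry, a lookup by index always succeeds, which is consistent with the remark that the `getOrElse` fallback is not reached.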


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71848 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71848/testReport)** for PR 16677 at commit [`5dae2da`](https://github.com/apache/spark/commit/5dae2da0d83a4ae34b4d2138156eea5ea69fc3ea).


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #94415 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94415/testReport)** for PR 16677 at commit [`69513d1`](https://github.com/apache/spark/commit/69513d166ee56587d7b039b5d9645299785dcb77).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92369 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92369/testReport)** for PR 16677 at commit [`9792220`](https://github.com/apache/spark/commit/97922200f9d898fe4fca1930f3bea8a3d12325e8).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class LocalPartitioning(childRDD: RDD[InternalRow]) extends Partitioning `


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92420 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92420/testReport)** for PR 16677 at commit [`8b5102f`](https://github.com/apache/spark/commit/8b5102fe14c6defb33622a1310c31d0acdfc6b8f).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #94227 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94227/testReport)** for PR 16677 at commit [`69513d1`](https://github.com/apache/spark/commit/69513d166ee56587d7b039b5d9645299785dcb77).


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r212792753
  
    --- Diff: sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql ---
    @@ -1,6 +1,9 @@
     -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery
     -- It includes correlated cases.
     
    +-- Disable global limit optimization
    --- End diff --
    
    The golden results require specific rows to be returned by the limit queries. This disables the optimization so that the queries return the expected rows.
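
A small plain-Scala sketch may help show why a golden file can be sensitive to this. It assumes two hypothetical ways of distributing a `LIMIT 3` over two partitions: taking everything from the first partition, or spreading the take across both. The names are illustrative and nothing here uses Spark itself.

```scala
object LimitResultSketch {
  // Apply a per-partition take count and concatenate the surviving rows.
  def applyTakes[A](partitions: Seq[Seq[A]], takes: Seq[Int]): Seq[A] =
    partitions.zip(takes).flatMap { case (rows, n) => rows.take(n) }
}

object LimitResultDemo {
  // Two partitions of three rows each (made-up data for illustration).
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6))

  // Sequential scan: fill the limit from the first partition only.
  val sequential: Seq[Int] = LimitResultSketch.applyTakes(partitions, Seq(3, 0))

  // Spread take: draw rows from both partitions.
  val spread: Seq[Int] = LimitResultSketch.applyTakes(partitions, Seq(2, 1))
}
```

Both results contain three rows and are valid answers to an unordered `LIMIT 3`, but they are different row sets, so a test that compares against one fixed expected output can only pass under one of the strategies.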


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93771/
    Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92205/
    Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #93502 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93502/testReport)** for PR 16677 at commit [`d05c144`](https://github.com/apache/spark/commit/d05c144aecdd57f4ee3d179a240ccafa6c02bb66).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    I'm not sure where it could cause perf regressions. Basically, this just changes the way we retrieve records from partitions when performing a limit. It doesn't shuffle them all into a single partition.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    I understand the two major concerns regarding this change. I'm going to submit a PR to revert it, and I will look into this idea further with a new design.


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197366478
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +98,101 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    // If child output has certain ordering, we can't evenly pick up rows from each parititon.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit &&
    +      child.outputOrdering != Nil
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // Some tests like hive compatibility tests assume that the rows are returned by a specified
    +      // order that the partitions are scaned sequentially until we reach the required number of
    +      // rows. However, logically a Limit operator should not care the row scan order.
    +      // Thus we take the rows of each partition until we reach the required limit number.
    +      var numTakenRow = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    --- End diff --
    
    Yes. Going to update it.
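
The quoted diff stops before the branch that keeps the child's parallelism, so the following is only a guess at what "evenly pick up rows from each partition" could look like. It is a plain-Scala sketch of one possible even-spread allocation, not the code from the PR; the object and method names are hypothetical, and `limit` is assumed to be non-negative.

```scala
object EvenLimitSketch {
  // rowsPerPartition: number of rows available in each partition (assumed input).
  // Returns per-partition take counts that spread `limit` as evenly as possible.
  def evenTakeAmounts(rowsPerPartition: Seq[Long], limit: Int): Seq[Int] = {
    val takes = Array.fill(rowsPerPartition.length)(0)
    var remaining = limit
    var progressed = true
    while (remaining > 0 && progressed) {
      // Partitions that still have rows not yet claimed.
      val open = rowsPerPartition.indices.filter(i => takes(i) < rowsPerPartition(i))
      progressed = open.nonEmpty
      if (progressed) {
        // Offer each open partition an equal share, at least one row per round.
        val share = math.max(1, remaining / open.size)
        open.foreach { i =>
          val available = rowsPerPartition(i) - takes(i)
          val extra = math.min(math.min(share.toLong, available), remaining.toLong).toInt
          takes(i) += extra
          remaining -= extra
        }
      }
    }
    takes.toSeq
  }
}
```

For row counts `Seq(10L, 1L, 10L)` and a limit of 7, the small partition gives its single row and the other two give 3 rows each. Compared with a sequential scan, no single partition has to supply most of the rows, but the set of returned rows depends on the allocation, which is why this kind of scheme cannot be used when the child output has a required ordering.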


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92420 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92420/testReport)** for PR 16677 at commit [`8b5102f`](https://github.com/apache/spark/commit/8b5102fe14c6defb33622a1310c31d0acdfc6b8f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by dilipbiswal <gi...@git.apache.org>.
Github user dilipbiswal commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #90311 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90311/testReport)** for PR 16677 at commit [`a691e88`](https://github.com/apache/spark/commit/a691e885b1f304ba4037964e2fba09c540503e1a).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92206/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93789/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r115657718
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,102 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    +      var numTakenRow = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (numTakenRow + num < limit) {
    +          numTakenRow += num.toInt
    +          takeAmounts += ((index, num.toInt))
    +        } else {
    +          val toTake = limit - numTakenRow
    +          numTakenRow += toTake
    +          takeAmounts += ((index, toTake))
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        broadMap.value.get(index).map { size =>
    +          iter.take(size)
    +        }.get
    +      }
    +    } else {
    +      // We try to distribute the required limit number of rows across all child rdd's partitions.
    +      var numTakenRow = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      val nonEmptyParts = numberOfOutput.filter(_ > 0).size
    +      val takePerPart = limit / nonEmptyParts
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (num >= takePerPart) {
    +          numTakenRow += takePerPart
    +          takeAmounts += ((index, takePerPart))
    +        } else {
    +          numTakenRow += num.toInt
    +          takeAmounts += ((index, num.toInt))
    +        }
    +      }
    +      var remainingRow = limit - numTakenRow
    +      while (remainingRow > 0) {
    +        numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +          val toTake = if (remainingRow / nonEmptyParts > 0) {
    --- End diff --
    
    remainingRow > nonEmptyParts?
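
A small check of the condition under discussion, as a sketch in plain Scala (the object and method names are hypothetical). For a non-negative `remainingRow` and a positive `nonEmptyParts`, the integer-division test in the diff is the same as `remainingRow >= nonEmptyParts`, so the strict comparison differs from it only when the two values are equal.

```scala
object RemainderCheck {
  // Condition as written in the quoted diff.
  def byDivision(remainingRow: Int, nonEmptyParts: Int): Boolean =
    remainingRow / nonEmptyParts > 0

  // Equivalent comparison for non-negative remainingRow and positive nonEmptyParts.
  def byAtLeast(remainingRow: Int, nonEmptyParts: Int): Boolean =
    remainingRow >= nonEmptyParts

  // The strict comparison suggested in the review comment.
  def byStrict(remainingRow: Int, nonEmptyParts: Int): Boolean =
    remainingRow > nonEmptyParts
}
```

In the equal case, the division branch would take `remainingRow / nonEmptyParts`, which is 1 row per partition, while the other branch would try to take all of `remainingRow` from a single partition.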


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #75699 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75699/testReport)** for PR 16677 at commit [`b8a2275`](https://github.com/apache/spark/commit/b8a22755bfdef8f1ab78016aea6914155ada67c1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71931 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71931/testReport)** for PR 16677 at commit [`4fb5e40`](https://github.com/apache/spark/commit/4fb5e40d6aa77dafc0eb715730f5048a74d461d6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77079/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16677


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197430376
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
       }
     }
     
    +/**
    + * Represents a partitioning where rows are only serialized/deserialized locally. Neither the
    + * number of partitions nor the distribution of rows is changed. This is mainly used to
    + * obtain some statistics of map tasks, such as the number of output rows.
    + */
    +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning {
    --- End diff --
    
    Ah, I see. Thanks for clarifying. I agree that we might need a specialized shuffle writing path at some point. For now, I think that even when we hit the sort-based shuffle, performance should not be worse than the previous global limit operation. If you agree, I'd like to leave that to a follow-up.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    cc @rxin @wzhfy @scwf 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #90301 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90301/testReport)** for PR 16677 at commit [`062b8fd`](https://github.com/apache/spark/commit/062b8fd58ae13f252b1e6f61c70b69ed05521715).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1277/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #83247 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83247/testReport)** for PR 16677 at commit [`7598337`](https://github.com/apache/spark/commit/759833712a9be4b3f3f65cf4722ddd33851726e8).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r204580788
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala ---
    @@ -22,6 +22,7 @@ import scala.util.Random
     import org.apache.spark.sql.{DataFrame, Row}
     import org.apache.spark.sql.catalyst.dsl.expressions._
     import org.apache.spark.sql.catalyst.expressions.Literal
    +import org.apache.spark.sql.internal.SQLConf
    --- End diff --
    
    Oops, forgot to revert it. Thanks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93502/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r115644428
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala ---
    @@ -221,6 +221,12 @@ object ShuffleExchange {
               override def numPartitions: Int = 1
               override def getPartition(key: Any): Int = 0
             }
    +      case LocalPartitioning(prev, numParts) =>
    --- End diff --
    
    Nit: LocalPartitioning(_, numParts)
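
The nit refers to Scala's wildcard pattern: a field that is not used on the right-hand side of a `case` can be matched with `_` instead of being bound to a name. A minimal sketch follows, using stand-in types that only mirror the names in the diff; they are not Spark's classes, and `numPartitionsOf` is a hypothetical helper.

```scala
// Stand-ins for illustration only; these are not Spark's Partitioning classes.
sealed trait Partitioning
case object SinglePartition extends Partitioning
final case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int)
  extends Partitioning

object PartitionCount {
  def numPartitionsOf(p: Partitioning): Int = p match {
    case SinglePartition => 1
    // The first field is not needed here, so it is matched with `_`
    // rather than bound to an unused name such as `prev`.
    case LocalPartitioning(_, numParts) => numParts
  }
}
```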


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/405/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #77079 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77079/testReport)** for PR 16677 at commit [`55ee6b0`](https://github.com/apache/spark/commit/55ee6b0fb3bc9e6998b4098a369c54a15824e414).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #73662 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73662/testReport)** for PR 16677 at commit [`1a56252`](https://github.com/apache/spark/commit/1a5625222febf99bb89732d2cedd773923d5f6c6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71901 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71901/testReport)** for PR 16677 at commit [`3dec117`](https://github.com/apache/spark/commit/3dec1173da9ec8d3d10e6aabf95bc33cc88a1587).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r97712511
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = FakePartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      childRDD
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    +      var countForRows = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (countForRows + num < limit) {
    +          countForRows += num
    +          takeAmounts += ((index, num))
    +        } else {
    +          val toTake = limit - countForRows
    +          countForRows += toTake
    +          takeAmounts += ((index, toTake))
    +        }
    +      }
    +      val shuffled = new ShuffledRowRDD(shuffleDependency)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        takeAmounts.get(index).map { size =>
    +          iter.take(size)
    +        }.getOrElse(iter)
    +      }
    +    } else {
    +      // We try to distribute the required limit number of rows across all child rdd's partitions.
    +      var numToReduce = (sumOfOutput - limit)
    +      val reduceAmounts = new mutable.HashMap[Int, Int]()
    +      val nonEmptyParts = numberOfOutput.filter(_ > 0).size
    +      val reducePerPart = numToReduce / nonEmptyParts
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (num >= reducePerPart) {
    +          numToReduce -= reducePerPart
    +          reduceAmounts += ((index, reducePerPart))
    +        } else {
    +          numToReduce -= num
    +          reduceAmounts += ((index, num))
    +        }
    +      }
    +      while (numToReduce > 0) {
    +        numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +          val toReduce = if (numToReduce / nonEmptyParts > 0) {
    +            numToReduce / nonEmptyParts
    +          } else {
    +            numToReduce
    +          }
    +          if (num - reduceAmounts(index) >= toReduce) {
    +            reduceAmounts(index) = reduceAmounts(index) + toReduce
    +            numToReduce -= toReduce
    +          } else if (num - reduceAmounts(index) > 0) {
    +            reduceAmounts(index) = reduceAmounts(index) + 1
    +            numToReduce -= 1
    +          }
    +        }
    +      }
    +
    +      val shuffled = new ShuffledRowRDD(shuffleDependency)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        reduceAmounts.get(index).map { size =>
    +          iter.drop(size)
    +        }.getOrElse(iter)
    +      }
    +    }
    +  }
    --- End diff --
    
    I am more conservative about this. Currently no other operators use this feature, so I would prefer not to change `ShuffleExchange` right now.
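
The last branch of the diff quoted above spreads the limit across the child's partitions. As a rough illustration of that idea, and not the patch's actual algorithm, the following plain-Scala sketch computes per-partition take amounts in rounds. `EvenTake` and `takeAmounts` are hypothetical names, and `limit` is assumed to be non-negative.

```scala
object EvenTake {
  /** Spreads `limit` rows as evenly as possible across partitions, given
    * each partition's row count. Returns the take amount per partition index.
    */
  def takeAmounts(rowsPerPartition: Seq[Long], limit: Int): Vector[Int] = {
    val rows = rowsPerPartition.toVector
    val taken = Array.fill(rows.length)(0L)
    // Never ask for more rows than exist in total.
    var remaining = math.min(limit.toLong, rows.sum)
    while (remaining > 0) {
      // Partitions that still have rows left to give.
      val open = rows.indices.filter(i => rows(i) > taken(i))
      // Share per open partition this round; at least 1 so the loop progresses.
      val share = math.max(remaining / open.size, 1L)
      open.foreach { i =>
        val toTake = math.min(math.min(share, rows(i) - taken(i)), remaining)
        taken(i) += toTake
        remaining -= toTake
      }
    }
    // Each entry is at most `limit`, so it fits in an Int.
    taken.map(_.toInt).toVector
  }
}
```

With row counts 10, 1, 10 and a limit of 9, the first round takes 3, 1 and 3 rows, and the second round takes one more row from each of the two partitions that still have rows.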


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r102891969
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
    @@ -39,16 +40,18 @@ private[spark] sealed trait MapStatus {
        * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
        */
       def getSizeForBlock(reduceId: Int): Long
    +
    +  def numberOfOutput: Int
    --- End diff --
    
    Could the number of output records be greater than 2G? If so, it would not fit in an `Int`.
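
    To make the concern concrete, here is a minimal, Spark-free sketch of what happens when record counts are accumulated in an `Int`. The object name and the per-task counts are hypothetical and only illustrate the arithmetic; they are not taken from the PR.

    ```scala
    object RecordCountOverflow {
      // Hypothetical per-map-task record counts (not from the PR).
      val perTaskCounts: Seq[Long] = Seq(1500000000L, 1500000000L)

      // Summing as Int silently wraps around once the total passes Int.MaxValue.
      def sumAsInt(counts: Seq[Long]): Int = counts.map(_.toInt).sum

      // Summing as Long keeps the exact total.
      def sumAsLong(counts: Seq[Long]): Long = counts.sum

      def main(args: Array[String]): Unit = {
        println(s"Int sum:  ${sumAsInt(perTaskCounts)}")
        println(s"Long sum: ${sumAsLong(perTaskCounts)}")
      }
    }
    ```

    Two tasks with 1.5 billion records each already push the `Int` total negative, while the `Long` total stays at 3 billion.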


---


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r204362254
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -231,6 +231,12 @@ object ShuffleExchangeExec {
               override def numPartitions: Int = 1
               override def getPartition(key: Any): Int = 0
             }
    +      case l: LocalPartitioning =>
    +        new Partitioner {
    +          override def numPartitions: Int = l.numPartitions
    +          override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    +        }
    +
    --- End diff --
    
    nit: extra space


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #93478 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93478/testReport)** for PR 16677 at commit [`d05c144`](https://github.com/apache/spark/commit/d05c144aecdd57f4ee3d179a240ccafa6c02bb66).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #81627 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81627/testReport)** for PR 16677 at commit [`f2a7aac`](https://github.com/apache/spark/commit/f2a7aacc22caf27c1a8af612c9432586e6a86d17).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71936/
    Test PASSed.


---


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistices to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #90301 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90301/testReport)** for PR 16677 at commit [`062b8fd`](https://github.com/apache/spark/commit/062b8fd58ae13f252b1e6f61c70b69ed05521715).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    I am not sure if I am missing something: the counts obtained are map-side output counts per (map-side) partition, while the limit is computed on the reduce side (after some arbitrary partitioning/shuffle has been applied). The number of records per partition obtained on the map side need not match what is on the reduce side anymore.
    
    Of course, I am not very familiar with Spark SQL, so I could be wrong in my understanding.
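
    A small simulation may help frame this question. It is only a sketch and does not use Spark: `MapVsReduceCounts`, `reduceSideCounts` and the sample rows are made up for illustration. It compares an arbitrary repartitioning with an identity-style partitioner like the `LocalPartitioning` case in this PR, where `getPartition` simply returns the key, on the assumption that the key is the index of the map-side partition.

    ```scala
    object MapVsReduceCounts {
      // Simulate a shuffle over `mapParts` (one inner Seq per map-side partition).
      // `target(mapIndex, row)` picks the reduce partition for a row. Returns the
      // number of rows that land in each reduce partition.
      def reduceSideCounts(mapParts: Seq[Seq[String]], numReducers: Int)(
          target: (Int, String) => Int): Seq[Int] = {
        val counts = Array.fill(numReducers)(0)
        for ((rows, mapIndex) <- mapParts.zipWithIndex; row <- rows) {
          counts(target(mapIndex, row)) += 1
        }
        counts.toSeq
      }

      def main(args: Array[String]): Unit = {
        val mapParts = Seq(Seq("a", "b", "c", "d"), Seq("e"), Seq.empty[String])
        val mapSide = mapParts.map(_.size)

        // Arbitrary repartitioning: the target depends on the row content only.
        val repartitioned = reduceSideCounts(mapParts, 3)((_, row) => row.length % 3)
        // Identity partitioning: every row stays in the partition it came from.
        val local = reduceSideCounts(mapParts, 3)((mapIndex, _) => mapIndex)

        println(s"map side:      $mapSide")
        println(s"repartitioned: $repartitioned")
        println(s"local:         $local")
      }
    }
    ```

    In this toy setup the map-side counts only carry over to the reduce side in the identity case, which is the situation the concern above does not apply to. With any other partitioner the two sets of counts can differ freely.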


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/407/
    Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Thanks @hvanhovell 


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92339/
    Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92211 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92211/testReport)** for PR 16677 at commit [`a737573`](https://github.com/apache/spark/commit/a7375736b38b0955b47dc48e710114b3ae97234b).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    ping @cloud-fan Any suggestions on this? Thanks.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    also cc @cloud-fan and @hvanhovell 


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75694/
    Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92369 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92369/testReport)** for PR 16677 at commit [`9792220`](https://github.com/apache/spark/commit/97922200f9d898fe4fca1930f3bea8a3d12325e8).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistices to improve...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Although this is a good improvement to the `Limit` operation, it looks like there is not much interest from the committers...


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @hvanhovell Shall we consider including this in 2.4?


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r218639483
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute limited rows across data
    +    // partitions. If disabled, scan data partitions sequentially until the limit is reached.
    +    // Besides, if the child output has a certain ordering, we can't evenly pick up rows from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    +
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!flatGlobalLimit) {
    +      var numRowTaken = 0
    +      val takeAmounts = numberOfOutput.map { num =>
    +        if (numRowTaken + num < limit) {
    +          numRowTaken += num.toInt
    +          num.toInt
    +        } else {
    +          val toTake = limit - numRowTaken
    +          numRowTaken += toTake
    +          toTake
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        iter.take(broadMap.value(index).toInt)
    +      }
    +    } else {
    +      // We try to take the requested `limit` rows evenly across all of the child RDD's partitions.
    +      var rowsNeedToTake: Long = limit
    +      val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L)
    +      val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*)
    +
    +      while (rowsNeedToTake > 0) {
    +        val nonEmptyParts = remainingRowsByPartition.count(_ > 0)
    +        // If fewer rows are needed than there are non-empty partitions, take one row from
    +        // each non-empty partition until we reach `limit` rows.
    +        // Otherwise, evenly divide the needed rows among the non-empty partitions.
    +        val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts)
    +        remainingRowsByPartition.zipWithIndex.foreach { case (num, index) =>
    +          // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during
    +          // the traversal, so we need to add this check.
    +          if (rowsNeedToTake > 0 && num > 0) {
    +            if (num >= takePerPart) {
    +              rowsNeedToTake -= takePerPart
    +              takeAmountByPartition(index) += takePerPart
    +              remainingRowsByPartition(index) -= takePerPart
    +            } else {
    +              rowsNeedToTake -= num
    +              takeAmountByPartition(index) += num
    +              remainingRowsByPartition(index) -= num
    +            }
    +          }
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmountByPartition)
    --- End diff --
    
    Because we want the map to be sent to each node only once?
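
    For readers who want to try the even-distribution loop from the diff outside Spark, here is a rough standalone adaptation. `FlatGlobalLimit` and `takeAmounts` are names invented for this sketch, and unlike the diff it caps the requested rows at the total, because the `sumOfOutput <= limit` branch is not reproduced here. In the diff the resulting array is wrapped in `sparkContext.broadcast`; broadcast variables are cached once per executor instead of being shipped with every task.

    ```scala
    object FlatGlobalLimit {
      // For each partition, compute how many rows to take so that the total is
      // min(limit, counts.sum), spread as evenly as the per-partition counts allow.
      def takeAmounts(counts: Seq[Long], limit: Long): Array[Long] = {
        val taken = Array.fill[Long](counts.length)(0L)
        val remaining = counts.toArray
        // Assumption of this sketch: never ask for more rows than exist,
        // so the loop below always terminates.
        var needed = math.min(limit, counts.sum)
        while (needed > 0) {
          val nonEmpty = remaining.count(_ > 0)
          // Take at least one row per non-empty partition in each pass.
          val perPart = math.max(1L, needed / nonEmpty)
          for (i <- remaining.indices) {
            // `needed` can reach zero in the middle of a pass, so re-check it.
            if (needed > 0 && remaining(i) > 0) {
              val n = math.min(perPart, remaining(i))
              taken(i) += n
              remaining(i) -= n
              needed -= n
            }
          }
        }
        taken
      }

      def main(args: Array[String]): Unit = {
        println(takeAmounts(Seq(10L, 1L, 0L, 5L), 8L).mkString(", "))
      }
    }
    ```

    With counts (10, 1, 0, 5) and a limit of 8, this yields (4, 1, 0, 3): small partitions are drained first and the remainder is handed out one row at a time.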


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @rxin Thanks for the comment. I will improve the documentation in a PR.


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197117604
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
       }
     }
     
    +/**
    + * Represents a partitioning where rows are only serialized/deserialized locally. The number
    + * of partitions are not changed and also the distribution of rows. This is mainly used to
    + * obtain some statistics of map tasks such as number of outputs.
    + */
    +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning {
    --- End diff --
    
    This might be expensive as soon as we hit the sort-based shuffle. Perhaps we should carve out some specialized shuffle writing path for this at some point. You basically only need to write to a single file and you're done.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/505/
    Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92206 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92206/testReport)** for PR 16677 at commit [`59a3029`](https://github.com/apache/spark/commit/59a30294e5cc043844d49dc2e2315f36723b5e5f).


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r198746910
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
    @@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
         // included in the shuffle write time.
         writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
     
    +    long numOfRecords = 0;
         while (records.hasNext()) {
           final Product2<K, V> record = records.next();
           final K key = record._1();
           partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    +      numOfRecords += 1;
    --- End diff --
    
    To verify it, I ran a test locally. It looks like `writeMetrics_recordsWritten` is updated correctly in the `SortShuffleWriter` path too, so I will replace `numOfRecords` with `writeMetrics_recordsWritten`.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    If there is some codepath not updating shuffle write metrics (introduced
    for sql), that would be a bug.
    
    On Sat, Jun 23, 2018 at 7:27 AM Liang-Chi Hsieh <no...@github.com>
    wrote:
    
    > *@viirya* commented on this pull request.
    > ------------------------------
    >
    > In
    > core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
    > <https://github.com/apache/spark/pull/16677#discussion_r197613554>:
    >
    > >      while (records.hasNext()) {
    >        final Product2<K, V> record = records.next();
    >        final K key = record._1();
    >        partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    > +      numOfRecords += 1;
    >
    > Hmm, I think it is fine. However, maybe I missed it, but I can't find where
    > SortShuffleWriter updates writeMetrics_recordsWritten.



---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197613004
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -204,6 +204,13 @@ object SQLConf {
         .intConf
         .createWithDefault(4)
     
    +  val LIMIT_FLAT_GLOBAL_LIMIT = buildConf("spark.sql.limit.flatGlobalLimit")
    +    .internal()
    +    .doc("During global limit, try to evenly distribute limited rows across data " +
    +      "partitions. If disabled, scan data partitions sequentially until reaching the limit.")
    +    .booleanConf
    +    .createWithDefault(true)
    --- End diff --
    
    I set this to true, partly to see whether it passes the existing tests. If we don't feel confident, or are worried about the behavior change, we can set it to false before merging.


---



[GitHub] spark issue #16677: [WIP][SQL] Use map output statistics to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71848/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r100033968
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,101 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    +      var countForRows = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (countForRows + num < limit) {
    +          countForRows += num
    +          takeAmounts += ((index, num))
    +        } else {
    +          val toTake = limit - countForRows
    +          countForRows += toTake
    +          takeAmounts += ((index, toTake))
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        broadMap.value.get(index).map { size =>
    +          iter.take(size)
    +        }.get
    +      }
    +    } else {
    +      // We try to distribute the required limit number of rows across all child rdd's partitions.
    +      var numToReduce = (sumOfOutput - limit)
    +      val reduceAmounts = new mutable.HashMap[Int, Int]()
    +      val nonEmptyParts = numberOfOutput.filter(_ > 0).size
    +      val reducePerPart = numToReduce / nonEmptyParts
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (num >= reducePerPart) {
    +          numToReduce -= reducePerPart
    +          reduceAmounts += ((index, reducePerPart))
    +        } else {
    +          numToReduce -= num
    +          reduceAmounts += ((index, num))
    +        }
    +      }
    +      while (numToReduce > 0) {
    +        numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +          val toReduce = if (numToReduce / nonEmptyParts > 0) {
    +            numToReduce / nonEmptyParts
    +          } else {
    +            numToReduce
    +          }
    +          if (num - reduceAmounts(index) >= toReduce) {
    +            reduceAmounts(index) = reduceAmounts(index) + toReduce
    +            numToReduce -= toReduce
    +          } else if (num - reduceAmounts(index) > 0) {
    +            reduceAmounts(index) = reduceAmounts(index) + 1
    +            numToReduce -= 1
    +          }
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(reduceAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        broadMap.value.get(index).map { size =>
    +          iter.drop(size)
    --- End diff --
    
    @viirya I just need one clarification. As per the above logic, we always compute a reduce amount and drop that many rows from the respective iterators. But what if the limit is small and the record count is large? Then we need to drop more data from each iterator, right? In this scenario I think `take()` would be better. What is your view on this?
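
The trade-off raised in this comment can be illustrated with plain Scala iterators, outside Spark. This is only a sketch under stated assumptions: `DropVsTake`, `keepByDrop`, `keepByTake` and `CountingIterator` are hypothetical names for illustration and are not part of the patch. Both helpers keep the same number of rows from a partition, but `drop` has to read past every discarded row and keeps the tail, while `take` keeps the head and can stop once it has enough.

```scala
object DropVsTake {
  // Keep `total - toDrop` rows by discarding the first `toDrop` rows,
  // which mirrors the `iter.drop(size)` call in the diff above.
  def keepByDrop[T](rows: Iterator[T], toDrop: Int): Iterator[T] =
    rows.drop(toDrop)

  // Keep the same number of rows by taking them from the front instead.
  def keepByTake[T](rows: Iterator[T], total: Int, toDrop: Int): Iterator[T] =
    rows.take(total - toDrop)

  // Wrapper that counts how many rows were actually read from the source.
  final class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    var consumed = 0
    def hasNext: Boolean = underlying.hasNext
    def next(): T = { consumed += 1; underlying.next() }
  }
}
```

With 1000 rows in a partition and 990 of them to discard, the `drop` variant reads all 1000 rows from the source, whereas the `take` variant reads only 10. The two variants return different rows (tail versus head), which only matters when the child output has a required ordering.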


---


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r99465564
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -216,7 +216,7 @@ class PlannerSuite extends SharedSQLContext {
               ).queryExecution.executedPlan.collect {
                 case exchange: ShuffleExchange => exchange
               }.length
    -          assert(numExchanges === 5)
    +          assert(numExchanges === 3)
    --- End diff --
    
    Did you apply all the changes? I can't reproduce it, and the Jenkins tests pass too.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #90452 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90452/testReport)** for PR 16677 at commit [`5594bf9`](https://github.com/apache/spark/commit/5594bf9f13aa83d05a433bad0fd366daabd2d034).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r218631745
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute limited rows across data
    +    // partitions. If disabled, scan data partitions sequentially until reaching the limit.
    +    // Besides, if the child output has a certain ordering, we can't evenly pick up rows from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    +
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
    +
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!flatGlobalLimit) {
    +      var numRowTaken = 0
    +      val takeAmounts = numberOfOutput.map { num =>
    +        if (numRowTaken + num < limit) {
    +          numRowTaken += num.toInt
    +          num.toInt
    +        } else {
    +          val toTake = limit - numRowTaken
    +          numRowTaken += toTake
    +          toTake
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        iter.take(broadMap.value(index).toInt)
    +      }
    +    } else {
    +      // We try to evenly require the asked limit number of rows across all child rdd's partitions.
    +      var rowsNeedToTake: Long = limit
    +      val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L)
    +      val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*)
    +
    +      while (rowsNeedToTake > 0) {
    +        val nonEmptyParts = remainingRowsByPartition.count(_ > 0)
    +        // If the rows still needed are fewer than the number of non-empty partitions, take one
    +        // row from each non-empty partition until we reach `limit` rows.
    +        // Otherwise, evenly divide the needed rows among the non-empty partitions.
    +        val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts)
    +        remainingRowsByPartition.zipWithIndex.foreach { case (num, index) =>
    +          // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during
    +          // the traversal, so we need to add this check.
    +          if (rowsNeedToTake > 0 && num > 0) {
    +            if (num >= takePerPart) {
    +              rowsNeedToTake -= takePerPart
    +              takeAmountByPartition(index) += takePerPart
    +              remainingRowsByPartition(index) -= takePerPart
    +            } else {
    +              rowsNeedToTake -= num
    +              takeAmountByPartition(index) += num
    +              remainingRowsByPartition(index) -= num
    +            }
    +          }
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmountByPartition)
    --- End diff --
    
    btw why do we need to broadcast this?
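
For reference, the even-distribution loop in the diff above can be read as a pure function over per-partition row counts. The following is a standalone sketch rather than the patch itself: `FlatLimitSketch` and `takeAmounts` are hypothetical names, and the sketch caps the target at the total row count instead of relying on the earlier `sumOfOutput <= limit` branch.

```scala
object FlatLimitSketch {
  // Decide how many rows to take from each partition so that the total is
  // min(limit, rowsPerPartition.sum), spread as evenly as the counts allow.
  def takeAmounts(rowsPerPartition: Seq[Long], limit: Long): Array[Long] = {
    val remaining = rowsPerPartition.toArray
    val taken = Array.fill(remaining.length)(0L)
    var needed = math.min(limit, remaining.sum)

    while (needed > 0) {
      // needed > 0 implies at least one partition still has rows.
      val nonEmpty = remaining.count(_ > 0)
      // Ask for an equal share per round, but never less than one row.
      val perPart = math.max(1L, needed / nonEmpty)
      for (i <- remaining.indices) {
        // When needed < nonEmpty we can run out in the middle of a round.
        if (needed > 0 && remaining(i) > 0) {
          val n = math.min(perPart, remaining(i))
          taken(i) += n
          remaining(i) -= n
          needed -= n
        }
      }
    }
    taken
  }
}
```

The result holds one `Long` per partition, so each task only needs to look up its own index, for example `iter.take(amounts(index).toInt)`.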


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @sujith71955 Thanks, I see. The case is somewhat different from the problem this PR wants to solve, but I think it is a reasonable use case. Would you like to create a ticket so we can track it?


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r115925532
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputStatistics.scala ---
    @@ -23,5 +23,9 @@ package org.apache.spark
      * @param shuffleId ID of the shuffle
      * @param bytesByPartitionId approximate number of output bytes for each map output partition
      *   (may be inexact due to use of compressed map statuses)
    + * @param numberOfOutput number of output for each pre-map output partition
    --- End diff --
    
    It is different from `bytesByPartitionId`. It is not the number of records for each reducer; rather, it is the number of records for each map task.
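
The distinction between the two indexings can be made concrete with a small table of record counts. This is a hedged sketch in plain Scala: `MapOutputCounts`, `perMapTask` and `perReducer` are hypothetical names, and the input is an assumed matrix where `records(m)(r)` is the number of records map task `m` writes for reducer `r`. Summing along rows gives a per-map-task count, which is what the comment describes; summing along columns gives a per-reducer count, which is how `bytesByPartitionId` is indexed.

```scala
object MapOutputCounts {
  // records(m)(r): records written by map task m for reducer r (assumed layout).

  // One entry per map task: the total number of records that task produced.
  def perMapTask(records: Seq[Seq[Long]]): Seq[Long] =
    records.map(_.sum)

  // One entry per reducer: the total number of records destined for it.
  def perReducer(records: Seq[Seq[Long]]): Seq[Long] =
    if (records.isEmpty) Nil else records.transpose.map(_.sum)
}
```

For two map tasks writing `Seq(1, 2, 3)` and `Seq(4, 5, 6)` records to three reducers, the per-map-task view has two entries and the per-reducer view has three, so the two sequences generally differ in both length and content.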


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @hvanhovell @cloud-fan We have seen the value of this PR in our customer scenarios, which is why we started a discussion on the dev list earlier. Thanks to @viirya for discussing it with us and implementing it.  
    It has been a while (more than two months) since your last comments. Do you have time to review this?


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r197613554
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
    @@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
         // included in the shuffle write time.
         writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
     
    +    long numOfRecords = 0;
         while (records.hasNext()) {
           final Product2<K, V> record = records.next();
           final K key = record._1();
           partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    +      numOfRecords += 1;
    --- End diff --
    
    Hmm, I think that is fine. However, maybe I'm missing it, but I can't find where `SortShuffleWriter` updates `writeMetrics_recordsWritten`.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90293/
    Test FAILed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to ...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r115642947
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputStatistics.scala ---
    @@ -23,5 +23,9 @@ package org.apache.spark
      * @param shuffleId ID of the shuffle
      * @param bytesByPartitionId approximate number of output bytes for each map output partition
      *   (may be inexact due to use of compressed map statuses)
    + * @param numberOfOutput number of output for each pre-map output partition
    --- End diff --
    
    What does "pre-map output partition" mean? `numberOfOutput` is the same name as the one in `MapStatus`; how about renaming it to `recordsByPartitionId`?


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #77054 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77054/testReport)** for PR 16677 at commit [`55ee6b0`](https://github.com/apache/spark/commit/55ee6b0fb3bc9e6998b4098a369c54a15824e414).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73662/
    Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #94220 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94220/testReport)** for PR 16677 at commit [`69513d1`](https://github.com/apache/spark/commit/69513d166ee56587d7b039b5d9645299785dcb77).


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1789/
    Test PASSed.


---



[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74086/
    Test PASSed.


---


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistics to improve global ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71842 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71842/testReport)** for PR 16677 at commit [`5dae2da`](https://github.com/apache/spark/commit/5dae2da0d83a4ae34b4d2138156eea5ea69fc3ea).


---


[GitHub] spark issue #16677: [WIP][SQL] Use map output statistics to improve global ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


---


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #72373 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72373/testReport)** for PR 16677 at commit [`b049cc4`](https://github.com/apache/spark/commit/b049cc4a4c83b308eb9aefa225435471b90a5f02).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @viirya Could you please bring this up to date?


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71955 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71955/testReport)** for PR 16677 at commit [`7f89c30`](https://github.com/apache/spark/commit/7f89c305f8ddd595fd752f7a8c238d23ec796895).


[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r103134610
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -90,25 +95,101 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +
    +  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(child.outputPartitioning,
    +      childRDD.getNumPartitions)
    +    val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().numberOfOutput.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +    // Try to keep child plan's original data parallelism or not. It is enabled by default.
    +    val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +    val shuffled = new ShuffledRowRDD(shuffleDependency)
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    val sumOfOutput = numberOfOutput.sum
    +    if (sumOfOutput <= limit) {
    +      shuffled
    +    } else if (!respectChildParallelism) {
    +      // This is mainly for tests.
    +      // We take the rows of each partition until we reach the required limit number.
    +      var countForRows = 0
    +      val takeAmounts = new mutable.HashMap[Int, Int]()
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (countForRows + num < limit) {
    +          countForRows += num
    +          takeAmounts += ((index, num))
    +        } else {
    +          val toTake = limit - countForRows
    +          countForRows += toTake
    +          takeAmounts += ((index, toTake))
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(takeAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        broadMap.value.get(index).map { size =>
    +          iter.take(size)
    +        }.get
    +      }
    +    } else {
    +      // We try to distribute the required limit number of rows across all child rdd's partitions.
    +      var numToReduce = (sumOfOutput - limit)
    +      val reduceAmounts = new mutable.HashMap[Int, Int]()
    +      val nonEmptyParts = numberOfOutput.filter(_ > 0).size
    +      val reducePerPart = numToReduce / nonEmptyParts
    +      numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +        if (num >= reducePerPart) {
    +          numToReduce -= reducePerPart
    +          reduceAmounts += ((index, reducePerPart))
    +        } else {
    +          numToReduce -= num
    +          reduceAmounts += ((index, num))
    +        }
    +      }
    +      while (numToReduce > 0) {
    +        numberOfOutput.zipWithIndex.foreach { case (num, index) =>
    +          val toReduce = if (numToReduce / nonEmptyParts > 0) {
    +            numToReduce / nonEmptyParts
    +          } else {
    +            numToReduce
    +          }
    +          if (num - reduceAmounts(index) >= toReduce) {
    +            reduceAmounts(index) = reduceAmounts(index) + toReduce
    +            numToReduce -= toReduce
    +          } else if (num - reduceAmounts(index) > 0) {
    +            reduceAmounts(index) = reduceAmounts(index) + 1
    +            numToReduce -= 1
    +          }
    +        }
    +      }
    +      val broadMap = sparkContext.broadcast(reduceAmounts)
    +      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
    +        broadMap.value.get(index).map { size =>
    +          iter.drop(size)
    --- End diff --
    
    Sounds good. If we can add the logic for the take/drop decision without making this section of code more complex, we can give it a try.
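
As a rough illustration of the row-count bookkeeping in the quoted diff, here is a minimal sketch in plain Scala, without Spark. It is not the PR's implementation: `LimitSplit`, `takeInOrder`, and `takeEvenly` are hypothetical names, and the per-partition row counts are assumed to be already available as a `Seq[Int]` (in the diff they come from the map output statistics). The sketch computes how many rows each partition keeps, either by filling partitions in order or by spreading the limit evenly across non-empty partitions.

```scala
// Hypothetical helper, for illustration only; not part of the PR.
object LimitSplit {

  /** Fill partitions in index order until `limit` rows are kept. */
  def takeInOrder(rowCounts: Seq[Int], limit: Int): Seq[Int] =
    rowCounts.foldLeft((Vector.empty[Int], math.max(limit, 0))) {
      case ((acc, remaining), num) =>
        // Keep as many rows as the partition has, capped by what is still needed.
        val keep = math.min(num, remaining)
        (acc :+ keep, remaining - keep)
    }._1

  /** Spread `limit` rows as evenly as possible over partitions that have rows. */
  def takeEvenly(rowCounts: Seq[Int], limit: Int): Seq[Int] = {
    val keep = Array.fill(rowCounts.length)(0)
    // Never ask for more rows than exist in total.
    var remaining = math.min(math.max(limit, 0).toLong, rowCounts.map(_.toLong).sum)
    while (remaining > 0) {
      // Partitions that still have rows not yet assigned.
      val open = rowCounts.indices.filter(i => keep(i) < rowCounts(i))
      // Equal share per open partition, at least one row so the loop makes progress.
      val share = math.max(remaining / open.size, 1L)
      open.foreach { i =>
        if (remaining > 0) {
          val available = (rowCounts(i) - keep(i)).toLong
          val extra = math.min(math.min(share, remaining), available).toInt
          keep(i) += extra
          remaining -= extra
        }
      }
    }
    keep.toSeq
  }
}
```

With per-partition keep counts in hand, a partition holding `num` rows could apply either `iter.take(keep)` or `iter.drop(num - keep)`; both leave the same number of rows, though they select different rows from the partition.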


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by watermen <gi...@git.apache.org>.
Github user watermen commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    @viirya We'd better not modify the API. `TaskMetrics` already has `resultSize`, so we could add a `resultNum` in the same way.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #92205 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92205/testReport)** for PR 16677 at commit [`ca00701`](https://github.com/apache/spark/commit/ca00701ee1dbd18fe5b45f2fbfca80242da366f2).


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92421/


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test FAILed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #90311 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90311/testReport)** for PR 16677 at commit [`a691e88`](https://github.com/apache/spark/commit/a691e885b1f304ba4037964e2fba09c540503e1a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    I set up a test PR for `VersionsSuite` at #22046.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #71977 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71977/testReport)** for PR 16677 at commit [`4e31bb7`](https://github.com/apache/spark/commit/4e31bb7959cb774b51d6d8662f53a3ad96b4dc49).


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #76809 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/76809/testReport)** for PR 16677 at commit [`867a93d`](https://github.com/apache/spark/commit/867a93d01a7f83f4ea0e43a77bebee09727b96e1).


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #83249 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83249/testReport)** for PR 16677 at commit [`e53648e`](https://github.com/apache/spark/commit/e53648e7f58f439bb09a702521c2f84cf2e344bd).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72373/


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/78390/


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    **[Test build #93789 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93789/testReport)** for PR 16677 at commit [`d05c144`](https://github.com/apache/spark/commit/d05c144aecdd57f4ee3d179a240ccafa6c02bb66).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/16677
  
    retest this please.

