Posted to reviews@spark.apache.org by gczsjdy <gi...@git.apache.org> on 2017/11/16 04:21:41 UTC

[GitHub] spark pull request #19763: [SPARK-22537] Aggregation of map output statistic...

GitHub user gczsjdy opened a pull request:

    https://github.com/apache/spark/pull/19763

    [SPARK-22537] Aggregation of map output statistics on driver faces single point bottleneck

    ## What changes were proposed in this pull request?
    
    In adaptive execution, the map output statistics of all mappers are aggregated after the previous stage completes successfully. The driver performs this aggregation, which becomes slow when the product `mappers * shuffle partitions` is large, because it uses only a single thread. This PR uses multiple threads to remove this single-point bottleneck.
    
    ## How was this patch tested?
    
    Test cases are in `MapOutputTrackerSuite.scala`


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gczsjdy/spark single_point_mapstatistics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19763.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19763
    
----
commit 5dd04872e983de861a301c22a124dd8923ccc8c6
Author: GuoChenzhao <ch...@intel.com>
Date:   2017-11-16T02:58:22Z

    Use multi-thread to solve single point bottleneck

commit 819774fc7087c51a4b7b03213bfb330331d6f108
Author: GuoChenzhao <ch...@intel.com>
Date:   2017-11-16T03:01:21Z

    Add test case

commit da028258bd172b6d3ff89504097fb6651f5c05c0
Author: GuoChenzhao <ch...@intel.com>
Date:   2017-11-16T03:24:47Z

    Style

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    My question is: how many times have we seen this statistics-collection operation be the bottleneck?


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    cc @zsxwing 


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    **[Test build #84134 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84134/testReport)** for PR 19763 at commit [`72c3d97`](https://github.com/apache/spark/commit/72c3d97e6e2f2c50504c5e4d8b80ea595797b044).


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152908363
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    From the expression above, `statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1`, it looks like we need at least twice this threshold to enable parallel aggregation?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152921579
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    --- End diff --
    
    Yeah, I left that comment earlier: https://github.com/apache/spark/pull/19763#discussion_r152914613. I think adding more explanation to the config entry's doc is good enough.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152912363
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    It's ok. I misread the equation. Nvm.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152921091
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    --- End diff --
    
    I think that code would confuse people, and we would need more comments to explain it, which doesn't seem worthwhile.
    In most cases the default value is enough, so would it be good to just add an assertion and an explanation in the docs?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152911829
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    Say `statuses.length.toLong * totalSizes.length` is `10010000`, for example:
    
    ```scala
    scala> 10010000 / 10000001
    res0: Int = 1
    ```
    
    Now, it is more than the threshold, but the parallel aggregation is not enabled...



---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152017736
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    There is also a `spark.shuffle.mapOutput.dispatcher.numThreads` in this file without a config entry. Do I need to add one?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152827467
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    @zsxwing Actually I built using sbt/mvn, no errors...


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151962448
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    `spark.adaptive.map.statistics.cores` should also be a config entry like this


---



[GitHub] spark issue #19763: [SPARK-22537] Aggregation of map output statistics on dr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151921740
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutputStatisticsMultithreadThreshold")
    --- End diff --
    
    Yes, it's better!


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    LGTM


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152911325
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    `statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1 >= 2` is equivalent to `statuses.length.toLong * totalSizes.length >= parallelAggThreshold`, so the product doesn't need to be twice the threshold; it only needs to be at least the threshold.
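
    A minimal sketch of the arithmetic in the expression above, for anyone who wants to check the boundary cases. The object name, the `cores` parameter (standing in for `Runtime.getRuntime.availableProcessors()`), and the threshold value used below are assumptions for illustration only, not code from the PR:

    ```scala
    object ParallelismSketch {
      // Mirrors the quoted expression. `cores` replaces availableProcessors()
      // so the result does not depend on the machine running the sketch.
      def parallelism(numMappers: Int, numPartitions: Int, threshold: Long, cores: Int): Int = {
        require(threshold > 0, "threshold must be positive")
        // Converting to Long first avoids Int overflow in the product.
        val byThreshold = numMappers.toLong * numPartitions / threshold + 1
        math.min(cores.toLong, byThreshold).toInt
      }
    }
    ```

    With a hypothetical threshold of 10000000 and 8 cores, 999 mappers * 10000 partitions stays single-threaded (integer division yields 0, plus 1), while 1000 * 10000 already gives a parallelism of 2.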


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152016240
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      if (statuses.length * totalSizes.length <=
    +        conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD)) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 8)
    --- End diff --
    
    I thought only adaptive execution code would call this. But it actually seems this is called after all `ShuffleMapTask`s of a stage have completed, right?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151801786
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,17 +474,36 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 8)
    +
    +      val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    --- End diff --
    
    Doing this is not cheap. I would add a config and only run this in multiple threads when `#mapper * #shuffle_partitions` is large.
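
    The suggestion above can be sketched roughly as follows, using only the Scala standard library. This is not the PR's code: `sizes(m)(p)` stands in for `statuses(m).getSizeForBlock(p)`, the object and method names are hypothetical, and the simple ceiling-based chunking replaces `equallyDivide` for brevity:

    ```scala
    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object GatedAggregationSketch {
      // Sums per-partition sizes across all mappers, using threads only when
      // the caller has decided that parallelism > 1 is worth the overhead.
      def totalSizes(sizes: Array[Array[Long]], numPartitions: Int, parallelism: Int): Array[Long] = {
        val totals = new Array[Long](numPartitions)
        if (parallelism <= 1) {
          for (s <- sizes; p <- 0 until numPartitions) totals(p) += s(p)
        } else {
          // Created before `try` so that `finally` shuts down this same pool.
          val pool = Executors.newFixedThreadPool(parallelism)
          try {
            implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
            // Each task owns a disjoint slice of partition ids, so no two
            // threads ever write to the same slot of `totals`.
            val chunk = math.max(1, (numPartitions + parallelism - 1) / parallelism)
            val tasks = (0 until numPartitions by chunk).map { start =>
              Future {
                val end = math.min(start + chunk, numPartitions)
                for (s <- sizes; p <- start until end) totals(p) += s(p)
              }
            }
            Await.result(Future.sequence(tasks), Duration.Inf)
          } finally {
            pool.shutdown()
          }
        }
        totals
      }
    }
    ```

    Splitting by partition id rather than by mapper keeps the writes disjoint, which is why the sketch needs no locking around `totals`.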


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152035574
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    yea let's add it. BTW shall we also use `mapOutput` instead of `mapOutputStatistics`?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152911936
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    Oh, I see...nvm. I misread the `+ 1`...


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152914613
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    After rethinking this, I think it is better to document that this threshold also determines the number of threads used for parallel aggregation, so it should not be set to zero or a negative number.
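
    One possible way to enforce that constraint is sketched below in plain Scala. The helper is hypothetical and does not use Spark's config API. The reasoning, assuming the quoted expression divides by the threshold: a value of zero would fail with a division-by-zero error, and a negative value would make the quotient non-positive, so the computed parallelism would never exceed 1.

    ```scala
    object ThresholdGuardSketch {
      // Hypothetical validator: rejects values that would break or silently
      // disable the `product / threshold + 1` computation.
      def validatedThreshold(value: Long): Long = {
        require(value > 0, s"parallel aggregation threshold must be positive, got $value")
        value
      }
    }
    ```

    Failing fast at configuration time gives a clearer error than an exception or a silent single-threaded fallback inside the aggregation itself.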


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152907079
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    Sorry, I didn't follow what you meant.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152870781
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    ah good catch! I misread it...


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152920483
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    Yeah, I will add some.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152193763
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    cc @zsxwing Do we really need to shut down the thread pool every time? This method may be called many times, so would it be better to cache the thread pool, like the dispatcher thread pool?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152005905
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    Like https://github.com/gczsjdy/spark/blob/11b60af737a04d931356aa74ebf3c6cf4a6b08d6/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L204-L204 


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152888380
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    @cloud-fan Regarding `We can shut down the pool after some certain idle time, but not sure if it's worth the complexity`: I know we don't need to do this now, but if we did, how would we do it?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152020601
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    --- End diff --
    
    Can you add a comment describing the algorithm? I'd expect something like:
    ```
    to equally divide n elements into m buckets
    each bucket should have n/m elements
    for the remaining n%m elements
    pick the first n%m buckets and add one more element to each
    ```
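
    For illustration only (this is not the PR's code): a minimal sketch of the scheme described above, assuming the remaining n%m elements are handed out one each to the first n%m buckets. The object name `EquallyDivideSketch` is hypothetical.

    ```scala
    object EquallyDivideSketch {
      /** Split `0 until n` into at most `m` contiguous buckets whose sizes differ by at most one. */
      def equallyDivide(n: Int, m: Int): Seq[Range] = {
        require(m > 0, "number of buckets must be positive")
        val base = n / m   // every bucket gets at least this many elements
        val extra = n % m  // the first `extra` buckets get one more element
        (0 until m).map { b =>
          // Buckets before `b` contributed `base` each, plus one for each of the first `extra`.
          val start = b * base + math.min(b, extra)
          val size = base + (if (b < extra) 1 else 0)
          start until (start + size)
        }.filter(_.nonEmpty) // drop empty buckets when n < m
      }
    }
    ```

    Because each bucket is a `Range`, building the buckets never walks the individual elements, which is the cost the `grouped`-based version pays.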


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    Seems like this is not a big deal for end-to-end performance?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152907606
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    It looks like parallel aggregation is enabled only when `parallelism` >= 2. Is that equivalent to `the number of mappers * shuffle partitions >= this threshold`?
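
    For illustration only: a small sketch of the formula in the diff, `min(availableProcessors, mappers * partitions / threshold + 1)`. The object and parameter names are hypothetical, and the threshold is taken as a `Long` here purely to keep the arithmetic overflow-free.

    ```scala
    object ParallelismSketch {
      /** min(cores, mappers * partitions / threshold + 1), computed in Long. */
      def parallelism(mappers: Int, partitions: Int, threshold: Long, cores: Int): Int =
        math.min(cores.toLong, mappers.toLong * partitions / threshold + 1).toInt
    }
    ```

    With integer division, `mappers * partitions / threshold + 1 >= 2` holds exactly when `mappers * partitions >= threshold`, so the two conditions agree as long as at least two processors are available. On a single-core machine the `min` caps the result at 1 and the sequential path is always taken.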


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    This doesn't look like a significant difference.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152896399
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    Is this condition to enable parallel aggregation still true?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152912720
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    I don't think we need to describe how the value is calculated in the config description. The current one is enough.


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    > We can shut down the pool after some certain idle time, but not sure if it's worth the complexity
    
    Yeah, that's just what the cached thread pool does :)
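
    For illustration only: a minimal sketch of a pool that releases its threads after an idle period, built directly on `java.util.concurrent.ThreadPoolExecutor`. The object name, method name, and parameters are hypothetical; this is not a claim about how Spark's own thread-pool helpers are implemented.

    ```scala
    import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

    object IdleTimeoutPoolSketch {
      /** A pool of up to `maxThreads` threads; each thread exits after `keepAliveSeconds` idle. */
      def newPool(maxThreads: Int, keepAliveSeconds: Long): ThreadPoolExecutor = {
        // keepAliveSeconds must be > 0, otherwise allowCoreThreadTimeOut(true) throws.
        val pool = new ThreadPoolExecutor(
          maxThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS,
          new LinkedBlockingQueue[Runnable]())
        // Let core threads time out too, so an unused pool holds no live threads.
        pool.allowCoreThreadTimeOut(true)
        pool
      }
    }
    ```

    A pool built this way can be kept in a field and reused across calls: threads are created on demand and disappear on their own when the pool goes quiet, so no explicit shutdown is needed per call.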


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152702214
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    --- End diff --
    
    `statuses.length.toLong`. It's easy to overflow here.
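
    For illustration only: a tiny sketch of why the widening matters. The object and method names are hypothetical.

    ```scala
    object OverflowSketch {
      // Int arithmetic wraps around silently once the product exceeds Int.MaxValue.
      def productAsInt(mappers: Int, partitions: Int): Int = mappers * partitions

      // Widening one operand first makes the whole multiplication run in Long.
      def productAsLong(mappers: Int, partitions: Int): Long = mappers.toLong * partitions
    }
    ```

    With 100000 mappers and 100000 partitions the true product is 10000000000, which does not fit in an `Int`; the `Int` version wraps to 1410065408, while the `Long` version returns the correct value.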


---



[GitHub] spark issue #19763: [SPARK-22537] Aggregation of map output statistics on dr...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    cc @cloud-fan @viirya @gatorsmile @chenghao-intel 


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152084018
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    if (remaining == 0) {
    +      0.until(numElements).grouped(elementsPerBucket)
    +    } else {
    +      val splitPoint = (elementsPerBucket + 1) * remaining
    +      0.to(splitPoint).grouped(elementsPerBucket + 1) ++
    +        (splitPoint + 1).until(numElements).grouped(elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)
    +      if (statuses.length * totalSizes.length < parallelAggThreshold) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        val parallelism = conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM)
    +        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-statistics")
    --- End diff --
    
    Please put `threadPool.shutdown()` in a `finally` block so the thread pool is always shut down.
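
    For illustration only: a minimal sketch of the pattern, using a plain `java.util.concurrent` fixed pool rather than Spark's `ThreadUtils`. The object and method names are hypothetical. Note that the pool is created before `try`, so it is in scope inside `finally`.

    ```scala
    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object ShutdownInFinallySketch {
      /** Run `body` on a temporary pool and shut the pool down afterwards, even on failure. */
      def runWithPool[T](threads: Int)(body: ExecutionContext => Future[T]): T = {
        // Declared outside `try` so that `finally` can reference it.
        val pool = Executors.newFixedThreadPool(threads)
        try {
          val ec = ExecutionContext.fromExecutor(pool)
          Await.result(body(ec), Duration.Inf)
        } finally {
          pool.shutdown() // runs whether the body succeeds or throws
        }
      }
    }
    ```

    A call such as `ShutdownInFinallySketch.runWithPool(2) { implicit ec => Future(1 + 1) }` returns the result and leaves no pool threads behind.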


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151901370
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutputStatisticsMultithreadThreshold")
    --- End diff --
    
    `spark.shuffle.mapOutputStatistics.parallelAggregationThreshold`?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151322438
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
       }
     
       /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
    +  /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    -        }
    +      val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
    +      var taskSlices = parallelism
    +
    +      equallyDivide(totalSizes.length, taskSlices).foreach {
    +        reduceIds =>
    +          mapStatusSubmitTasks += threadPoolMapStats.submit(
    +            new Runnable {
    +              override def run(): Unit = {
    +                for (s <- statuses; i <- reduceIds) {
    +                  totalSizes(i) += s.getSizeForBlock(i)
    +                }
    +              }
    +            }
    +          )
           }
    +      mapStatusSubmitTasks.foreach(_.get())
    --- End diff --
    
    This part can be simplified by using Scala's `Future`:
    
    ```scala
    val futureArray = equallyDivide(totalSizes.length, taskSlices).map { reduceIds =>
      Future {
        // whatever you want to do here
      }
    }
    Await.result(Future.sequence(futureArray), Duration.Inf) // or some timeout value you prefer
    ```
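
    For illustration only: a self-contained sketch of the same idea, with a plain `Array[Array[Long]]` standing in for the map statuses. The object name, method name, and parameters are hypothetical. Each `Future` writes to a disjoint slice of the output array, so the tasks never touch the same index.

    ```scala
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object ParallelSumSketch {
      /** Sum sizes(mapper)(reducer) per reducer; each Future owns one slice of reducer ids. */
      def totals(sizes: Array[Array[Long]], slices: Seq[Range])
                (implicit ec: ExecutionContext): Array[Long] = {
        val numReducers = if (sizes.isEmpty) 0 else sizes.head.length
        val out = new Array[Long](numReducers)
        val tasks = slices.map { reduceIds =>
          Future {
            for (row <- sizes; i <- reduceIds) out(i) += row(i)
          }
        }
        // Block until every slice has finished; only then is `out` fully populated.
        Await.result(Future.sequence(tasks), Duration.Inf)
        out
      }
    }
    ```

    The slices must be disjoint and together cover every reducer id; overlapping slices would race on the shared array.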


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152008181
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    `spark.adaptive.map.statistics.cores` needs a config entry, but I thought the adaptive.xxx items had already been put under `spark.sql.`, so it might be inconsistent. Now I think it's no big deal.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151332369
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
       }
     
       /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
    +  /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    -        }
    +      val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
    +      var taskSlices = parallelism
    +
    +      equallyDivide(totalSizes.length, taskSlices).foreach {
    +        reduceIds =>
    +          mapStatusSubmitTasks += threadPoolMapStats.submit(
    +            new Runnable {
    +              override def run(): Unit = {
    +                for (s <- statuses; i <- reduceIds) {
    +                  totalSizes(i) += s.getSizeForBlock(i)
    +                }
    +              }
    +            }
    +          )
           }
    +      mapStatusSubmitTasks.foreach(_.get())
    --- End diff --
    
    Good idea, thx!


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    LGTM


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19763


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152863418
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    @gczsjdy Oh, sorry. I didn't realize there is already a `threadpool` field in `MapOutputTrackerMaster`. That's why there is no compile error. Here you are shutting down the wrong thread pool.
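
The scoping problem described above can be sketched as follows. This is a minimal illustration, not the PR's code: `PoolShutdownSketch`, `sharedPool`, `aggregationPool`, and `sumInParallel` are hypothetical names, and a plain `java.util.concurrent` pool stands in for Spark's `ThreadUtils`. The local pool is declared before `try` so that `finally` can see it, and its name cannot be confused with the long-lived field.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

object PoolShutdownSketch {
  // Stand-in for a long-lived field (like the existing `threadpool` field);
  // it must keep running after an aggregation call returns.
  val sharedPool: ExecutorService = Executors.newFixedThreadPool(1)

  def sumInParallel(values: Array[Long], parallelism: Int): Long = {
    // Declared outside `try`: a val declared inside the `try` block would be
    // out of scope in `finally`.
    val aggregationPool: ExecutorService = Executors.newFixedThreadPool(parallelism)
    try {
      val sliceSize = math.max(1, (values.length + parallelism - 1) / parallelism)
      val futures = values.grouped(sliceSize).toList.map { slice =>
        aggregationPool.submit(new Callable[Long] {
          override def call(): Long = slice.sum
        })
      }
      // Block until every slice has been summed.
      futures.map(_.get()).sum
    } finally {
      // Shuts down the short-lived local pool only, never `sharedPool`.
      aggregationPool.shutdown()
    }
  }
}
```

With a distinct name for the local pool, a typo such as `threadpool` versus `threadPool` can no longer resolve silently to an unrelated field.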


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152693924
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    @gczsjdy could you fix the compile error?


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    retest this please


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152007204
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      if (statuses.length * totalSizes.length <=
    +        conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD)) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 8)
    --- End diff --
    
    how is this related to `adaptive`?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152007061
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    I don't get it. You showed me that `spark.sql.adaptive.xxx` has config entries, so why doesn't `spark.adaptive.map.statistics.cores` need a config entry?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152914684
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    --- End diff --
    
    For a zero or negative threshold, see my comment above: https://github.com/apache/spark/pull/19763#discussion_r152914613.
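
The referenced comment is not reproduced in this part of the thread. As a rough sketch of why a non-positive threshold matters, the following applies the formula from the diff to such values. The names `ThresholdEdgeCases`, `parallelism`, and `checkedParallelism` are hypothetical, the processor count is passed in as a parameter so the result is reproducible, and the `require` guard is only one possible remedy, not necessarily the one the reviewer proposed.

```scala
object ThresholdEdgeCases {
  // Same arithmetic as in the diff: work / threshold + 1, capped by processors.
  def parallelism(numMappers: Int, numPartitions: Int, threshold: Long, processors: Int): Int =
    math.min(processors.toLong, numMappers.toLong * numPartitions / threshold + 1).toInt

  // Hypothetical guard: reject a threshold that cannot be meaningful.
  def checkedParallelism(numMappers: Int, numPartitions: Int, threshold: Long, processors: Int): Int = {
    require(threshold > 0, s"threshold must be positive, got $threshold")
    parallelism(numMappers, numPartitions, threshold, processors)
  }
}
```

Under these assumptions, a threshold of zero makes the unguarded formula throw an `ArithmeticException` (integer division by zero), while a negative threshold yields a value of at most 1 and so always selects the single-threaded path.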


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151331447
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
       }
     
       /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
    +  /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    -        }
    +      val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
    +      var taskSlices = parallelism
    --- End diff --
    
    Why `var`?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152906960
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    --- End diff --
    
    I don't think we need to fully utilize all available processors. `parallelAggThreshold` defaults to 10^7, which means each task is relatively small, so in most cases the tasks don't need to be split any further.
    For cases where each split is still a big task, `parallelAggThreshold` should be tuned. This is less direct because there is no `xx.parallelism` config to set, but the benefit is that we introduce fewer configs.
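
To make the sizing rule concrete, here is a small sketch of the arithmetic in the diff. `ParallelismSketch` and its parameter names are hypothetical, and the processor count is passed in rather than read from `Runtime` so that the numbers are reproducible.

```scala
object ParallelismSketch {
  // One thread per `threshold` units of work, plus one, capped at the
  // number of processors.
  def parallelism(numMappers: Int, numPartitions: Int, threshold: Int, processors: Int): Int = {
    // Widen to Long before multiplying: mappers * partitions can exceed Int.MaxValue.
    val work = numMappers.toLong * numPartitions
    math.min(processors.toLong, work / threshold + 1).toInt
  }
}
```

With the default threshold of 10^7, 1,500 mappers and 10,000 partitions give 1.5 * 10^7 units of work, so the formula picks 2 threads even when 8 processors are available. The cap only takes effect once the work exceeds roughly 7 * 10^7 units.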


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151339166
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
       }
     
       /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
    +  /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    -        }
    +      val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
    +      var taskSlices = parallelism
    +
    +      equallyDivide(totalSizes.length, taskSlices).foreach {
    +        reduceIds =>
    +          mapStatusSubmitTasks += threadPoolMapStats.submit(
    +            new Runnable {
    +              override def run(): Unit = {
    +                for (s <- statuses; i <- reduceIds) {
    +                  totalSizes(i) += s.getSizeForBlock(i)
    +                }
    +              }
    +            }
    +          )
           }
    +      mapStatusSubmitTasks.foreach(_.get())
    --- End diff --
    
    Should I use the `scala.concurrent.ExecutionContext.Implicits.global` ExecutionContext?
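
Later revisions quoted in this thread build the `ExecutionContext` from a dedicated pool rather than using the global one. A minimal sketch of that shape follows. It is an illustration under stated assumptions, not the merged code: `FutureAggregationSketch` and `aggregate` are hypothetical names, a plain `Array[Array[Long]]` stands in for the map statuses, and a `java.util.concurrent` pool replaces Spark's `ThreadUtils`.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object FutureAggregationSketch {
  // sizes(m)(r): bytes written by mapper m for reduce partition r.
  def aggregate(sizes: Array[Array[Long]], numPartitions: Int, parallelism: Int): Array[Long] = {
    val totalSizes = new Array[Long](numPartitions)
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val sliceSize = math.max(1, (numPartitions + parallelism - 1) / parallelism)
      // Each task owns a disjoint range of reduce ids, so no two tasks ever
      // write to the same element of totalSizes.
      val tasks = (0 until numPartitions by sliceSize).toList.map { start =>
        val reduceIds = start until math.min(start + sliceSize, numPartitions)
        Future {
          for (s <- sizes; i <- reduceIds) totalSizes(i) += s(i)
        }
      }
      Await.result(Future.sequence(tasks), Duration.Inf)
      totalSizes
    } finally {
      pool.shutdown()
    }
  }
}
```

A dedicated pool keeps the thread count under the caller's control and lets the pool be released as soon as the aggregation finishes.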


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152016336
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,20 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions exceeds this " +
    +        "threshold.")
    +      .intConf
    +      .createWithDefault(10000000)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_CORES =
    +    ConfigBuilder("spark.shuffle.mapOutputStatistics.cores")
    --- End diff --
    
    nit: `cores` -> `parallelism`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    retest this please


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151962620
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions exceeds this " +
    +        "threshold")
    +      .intConf
    +      .createWithDefault(100000000)
    --- End diff --
    
    Wow, 100 million is a really large threshold. How did you pick this number?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152006310
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    I don't think that's a big problem. Adaptive execution needs both core and SQL code, so both confs are needed.


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152087573
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    if (remaining == 0) {
    +      0.until(numElements).grouped(elementsPerBucket)
    +    } else {
    +      val splitPoint = (elementsPerBucket + 1) * remaining
    +      0.to(splitPoint).grouped(elementsPerBucket + 1) ++
    --- End diff --
    
    `grouped` is expensive here. I saw that it generates a `Vector` for each group rather than a `Range`:
    ```
    scala> (1 to 100).grouped(10).foreach(g => println(g.getClass))
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    class scala.collection.immutable.Vector
    ```
    This means we need to generate all of the numbers between 0 and `numElements`. Could you implement a special `grouped` for `Range` instead?
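
One way to group without materializing the elements is to compute only the slice boundaries and return `Range` objects. The sketch below is an illustration, not the helper that was eventually merged: `RangeSlicingSketch`, `slices`, and this variant of `equallyDivide` are hypothetical, and they assume ranges that start at 0 with step 1.

```scala
object RangeSlicingSketch {
  // Consecutive sub-ranges of at most `size` elements covering 0 until numElements.
  // Only the boundaries are computed; no element is ever copied.
  def slices(numElements: Int, size: Int): Seq[Range] = {
    require(size > 0, "size must be positive")
    (0 until numElements by size).map { start =>
      start until math.min(start + size, numElements)
    }
  }

  // Divide 0 until numElements into at most numBuckets ranges whose sizes
  // differ by at most one; the first (numElements % numBuckets) buckets get
  // the extra element.
  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Range] = {
    require(numBuckets > 0, "numBuckets must be positive")
    val perBucket = numElements / numBuckets
    val remaining = numElements % numBuckets
    (0 until numBuckets).map { b =>
      val start = b * perBucket + math.min(b, remaining)
      val size = perBucket + (if (b < remaining) 1 else 0)
      start until (start + size)
    }.filter(_.nonEmpty)
  }
}
```

Because each slice is a `Range`, its cost is independent of the number of elements it covers, so splitting a very large partition range stays cheap.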


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    thanks, merging to master!


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    **[Test build #84161 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84161/testReport)** for PR 19763 at commit [`0f87dd6`](https://github.com/apache/spark/commit/0f87dd6989ed092a3f06b1a308930265a6bc0a30).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152002262
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    `spark.sql.adaptive.xxx` already exists. Will this be a problem?


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84134/


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    @cloud-fan It seems Jenkins hasn't started?


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    **[Test build #84161 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84161/testReport)** for PR 19763 at commit [`0f87dd6`](https://github.com/apache/spark/commit/0f87dd6989ed092a3f06b1a308930265a6bc0a30).


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152896879
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    --- End diff --
    
    It seems the value of `parallelism` means we don't always fully utilize all processors? E.g., if `availableProcessors` returns 8 but the computed `parallelism` is 2, we use only 2 threads.
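
    To make the behaviour in question concrete, here is a minimal sketch of the formula from the diff. The helper name `parallelism`, its parameter list, and the 10^7 threshold used in the checks are assumptions for illustration; the processor count is passed in explicitly so the result does not depend on the machine.

    ```scala
    object ParallelismSketch {
      // Hypothetical helper mirroring the expression in the diff:
      // min(available processors, mappers * partitions / threshold + 1).
      def parallelism(
          numMappers: Int,
          numPartitions: Int,
          threshold: Long,
          availableProcessors: Int): Int = {
        // Widen to Long before multiplying so mappers * partitions cannot overflow an Int.
        val work = numMappers.toLong * numPartitions
        math.min(availableProcessors.toLong, work / threshold + 1).toInt
      }
    }
    ```

    With 8 processors and a threshold of 10^7, a workload of 1000 mappers * 10976 partitions yields 2 threads, and the count only reaches 8 once the workload is at least 7 times the threshold.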


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    **[Test build #84134 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84134/testReport)** for PR 19763 at commit [`72c3d97`](https://github.com/apache/spark/commit/72c3d97e6e2f2c50504c5e4d8b80ea595797b044).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152496569
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    We could shut down the pool after a certain idle time, but I'm not sure it's worth the complexity.
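
    One way to approximate the idle-time idea with the plain JDK API is to let the pool's threads time out instead of shutting the pool down. This is only a sketch under that assumption; `IdleTimeoutPoolSketch` and `newPool` are hypothetical names and not part of this PR.

    ```scala
    import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

    object IdleTimeoutPoolSketch {
      // Hypothetical factory: a fixed-size pool whose threads exit after being
      // idle for `keepAliveSeconds`, so an unused pool holds no live threads.
      def newPool(nThreads: Int, keepAliveSeconds: Long): ThreadPoolExecutor = {
        val pool = new ThreadPoolExecutor(
          nThreads, nThreads, keepAliveSeconds, TimeUnit.SECONDS,
          new LinkedBlockingQueue[Runnable]())
        // Requires a keep-alive time greater than zero.
        pool.allowCoreThreadTimeOut(true)
        pool
      }
    }
    ```

    The pool object itself stays around, but its threads are created on demand and released when idle, which avoids an explicit shutdown timer.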


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84161/
    Test PASSed.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152888257
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    My fault!


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152084974
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,48 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    if (remaining == 0) {
    +      0.until(numElements).grouped(elementsPerBucket)
    +    } else {
    +      val splitPoint = (elementsPerBucket + 1) * remaining
    +      0.to(splitPoint).grouped(elementsPerBucket + 1) ++
    +        (splitPoint + 1).until(numElements).grouped(elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)
    +      if (statuses.length * totalSizes.length < parallelAggThreshold) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        val parallelism = conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM)
    --- End diff --
    
    How about setting `parallelism = math.min(Runtime.getRuntime.availableProcessors(), statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1)` rather than introducing a new config, such as:
    ```
    // math.min(Int, Long) yields a Long, so convert back to Int for the pool size.
    val parallelism = math.min(
      Runtime.getRuntime.availableProcessors(),
      statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
    if (parallelism <= 1) {
      ...
    } else {
      ...
    }
    ```
    



---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152002860
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions exceeds this " +
    +        "threshold")
    +      .intConf
    +      .createWithDefault(100000000)
    --- End diff --
    
    Now I also think it's a little large... In the case I mentioned, the 5s gap occurs when the number of mappers * shuffle partitions is about 10^8. Maybe 10^7 is a better default?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152021708
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    --- End diff --
    
    Sure : )


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    This happens a lot in our TPC-DS 100TB test. The master runs on an Intel Xeon E5-2699 v4 @ 2.2GHz CPU, which affects the driver's performance. We set `spark.sql.shuffle.partitions` to 10976. The driver's workload scales with shuffle partitions * number of mappers.
    
    Let's take TPC-DS q67 as an example:
    Without this PR, there's a 47:39-(41:16+6.3min) ~ 5s gap between the map and reduce stages, most of which is spent aggregating map statistics on one thread.
    <img width="927" alt="single_thread_q67" src="https://user-images.githubusercontent.com/7685352/32893095-49216a4a-cb13-11e7-82fe-ccb552a6a625.PNG">
    With this PR, there's 25:32-(18:58+6.6min) ~ 0s gap:
    <img width="926" alt="multi-thread_q67" src="https://user-images.githubusercontent.com/7685352/32893264-beb31b82-cb13-11e7-954f-a893f6a9966f.PNG">
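
    The gap arithmetic above can be checked with a small sketch. It assumes the figures are the start time of the next stage, the start time of the map stage, and the map stage duration in minutes; `StageGapSketch` and its helpers are hypothetical names.

    ```scala
    object StageGapSketch {
      // Converts an "mm:ss" clock reading to seconds.
      def toSeconds(mmss: String): Int = {
        val Array(m, s) = mmss.split(":").map(_.toInt)
        m * 60 + s
      }

      // Seconds between the end of the map stage and the start of the next stage.
      def gapSeconds(nextStart: String, mapStart: String, mapDurationMinutes: Double): Long =
        toSeconds(nextStart) - (toSeconds(mapStart) + math.round(mapDurationMinutes * 60))
    }
    ```

    For the single-thread run this gives 2859 - (2476 + 378) = 5 seconds. For the multi-thread run it gives 1532 - (1138 + 396) = -2 seconds, which is within the rounding of a duration reported to one decimal of a minute, i.e. roughly no gap.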
    



---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152185531
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    Actually there are 3 confs like that... do they all need it?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152022126
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    --- End diff --
    
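    My proposal:
    ```
    def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = {
      val elementsPerBucket = numElements / numBuckets
      val remaining = numElements % numBuckets
      // The first `remaining` buckets each take one extra element.
      val splitPoint = (elementsPerBucket + 1) * remaining
      // Half-open ranges keep the two halves from overlapping or skipping an element.
      val larger = 0.until(splitPoint).grouped(elementsPerBucket + 1)
      if (elementsPerBucket == 0) {
        // grouped(0) would throw, and there is nothing left to group anyway.
        larger
      } else {
        larger ++ splitPoint.until(numElements).grouped(elementsPerBucket)
      }
    }
    ```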
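
    For comparison, the same split can be computed directly from bucket indices. This is a different formulation from the `grouped`-based one above, shown only as a sketch; the object name is hypothetical, and dropping empty buckets when there are fewer elements than buckets is an assumption about the desired behaviour.

    ```scala
    object EquallyDivideSketch {
      // Splits 0 until numElements into at most numBuckets contiguous ranges whose
      // sizes differ by at most one, with the larger ranges first.
      def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
        require(numBuckets > 0, "numBuckets must be positive")
        val elementsPerBucket = numElements / numBuckets
        val remaining = numElements % numBuckets
        (0 until numBuckets).map { b =>
          // Earlier buckets that received an extra element shift this start by one each.
          val start = b * elementsPerBucket + math.min(b, remaining)
          val size = elementsPerBucket + (if (b < remaining) 1 else 0)
          start until (start + size)
        }.filter(_.nonEmpty)
      }
    }
    ```

    Dividing 10 elements into 3 buckets gives the ranges 0 to 3, 4 to 6 and 7 to 9, so every reduce id is covered exactly once.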


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152913671
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    --- End diff --
    
    Maybe a little picky, but should we do:
    ```scala
    val parallelAggThreshold = conf.get(
      SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD) + 1
    ...
    val parallelism = math.min(
      Runtime.getRuntime.availableProcessors(),
      (statuses.length.toLong * totalSizes.length + 1) / parallelAggThreshold + 1).toInt
    ```
    
    In case of the threshold being set to zero?
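
    A small sketch of why the guard matters, assuming the workload is passed as a single `Long`. `ThresholdGuardSketch`, `unguarded` and `guarded` are hypothetical names used only for this illustration.

    ```scala
    object ThresholdGuardSketch {
      // Formula as in the diff: integer division throws ArithmeticException
      // when the threshold is zero.
      def unguarded(work: Long, threshold: Long, cores: Int): Int =
        math.min(cores.toLong, work / threshold + 1).toInt

      // Variant suggested above: adding one to both the workload and the
      // threshold keeps the divisor positive for any non-negative threshold.
      def guarded(work: Long, threshold: Long, cores: Int): Int =
        math.min(cores.toLong, (work + 1) / (threshold + 1) + 1).toInt
    }
    ```

    With the guard, a threshold of zero simply selects the maximum parallelism, while a workload equal to the threshold still crosses into the multi-threaded path.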


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    Actually, the time gap is O(number of mappers * shuffle partitions). In this case the number of mappers is not very large; users are more likely to be slowed down when they run on a large data set.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r151801570
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
       }
     
       /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
    +  /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    -        }
    +      val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
    +      var taskSlices = parallelism
    +
    +      equallyDivide(totalSizes.length, taskSlices).foreach {
    +        reduceIds =>
    +          mapStatusSubmitTasks += threadPoolMapStats.submit(
    +            new Runnable {
    +              override def run(): Unit = {
    +                for (s <- statuses; i <- reduceIds) {
    +                  totalSizes(i) += s.getSizeForBlock(i)
    +                }
    +              }
    +            }
    +          )
           }
    +      mapStatusSubmitTasks.foreach(_.get())
    --- End diff --
    
    Don't use `scala.concurrent.ExecutionContext.Implicits.global`. You need to create a thread pool.
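
    A minimal sketch of what a dedicated pool could look like, using only the JDK and Scala standard library rather than Spark's `ThreadUtils`. The `sizes` matrix stands in for `statuses(m).getSizeForBlock(r)`, and `DedicatedPoolSketch` and `aggregate` are hypothetical names.

    ```scala
    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object DedicatedPoolSketch {
      // sizes(m)(r) is the output size of mapper m for reduce partition r.
      def aggregate(sizes: Array[Array[Long]], numPartitions: Int, parallelism: Int): Array[Long] = {
        val totalSizes = new Array[Long](numPartitions)
        val pool = Executors.newFixedThreadPool(parallelism)
        try {
          // Futures run on this pool only, not on the shared global context.
          implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
          val sliceSize = math.max(1, (numPartitions + parallelism - 1) / parallelism)
          // Each task owns a disjoint slice of reduce ids, so no two threads
          // ever write to the same slot of totalSizes.
          val tasks = (0 until numPartitions).grouped(sliceSize).toVector.map { reduceIds =>
            Future {
              for (row <- sizes; i <- reduceIds) totalSizes(i) += row(i)
            }
          }
          Await.result(Future.sequence(tasks), Duration.Inf)
        } finally {
          pool.shutdown()
        }
        totalSizes
      }
    }
    ```

    Keeping the pool private to this work means a long aggregation cannot starve unrelated tasks that rely on the global execution context.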


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152912084
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
    +    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
    +      .internal()
    +      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
    +        "or equal to this threshold.")
    --- End diff --
    
    Do you think it's necessary to describe how the actual parallelism is calculated here?


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152004301
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -485,4 +485,13 @@ package object config {
             "array in the sorter.")
           .intConf
           .createWithDefault(Integer.MAX_VALUE)
    +
    +  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
    --- End diff --
    
    Really? I grepped the code base but couldn't find it.


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152493779
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    I agree with you. If we put the thread pool in the class, the only loss is that the pool still exists even when only a single thread is used. The gain is that we avoid creating the pool after every shuffle.


---



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19763
  
    OK to test


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152693851
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Grouped function of Range, this is to avoid traverse of all elements of Range using
    +   * IterableLike's grouped function.
    +   */
    +  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
    +    val start = range.start
    +    val step = range.step
    +    val end = range.end
    +    for (i <- start.until(end, size * step)) yield {
    +      i.until(i + size * step, step)
    +    }
    +  }
    +
    +  /**
    +   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
    +   * for the remaining n%m elements, add one more element to the first n%m buckets each.
    +   */
    +  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
    +    val elementsPerBucket = numElements / numBuckets
    +    val remaining = numElements % numBuckets
    +    val splitPoint = (elementsPerBucket + 1) * remaining
    +    if (elementsPerBucket == 0) {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
    +    } else {
    +      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
    +        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      val parallelAggThreshold = conf.get(
    +        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
    +      val parallelism = math.min(
    +        Runtime.getRuntime.availableProcessors(),
    +        statuses.length * totalSizes.length / parallelAggThreshold + 1)
    +      if (parallelism <= 1) {
    +        for (s <- statuses) {
    +          for (i <- 0 until totalSizes.length) {
    +            totalSizes(i) += s.getSizeForBlock(i)
    +          }
    +        }
    +      } else {
    +        try {
    +          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    +          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
    +          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    +            reduceIds => Future {
    +              for (s <- statuses; i <- reduceIds) {
    +                totalSizes(i) += s.getSizeForBlock(i)
    +              }
    +            }
    +          }
    +          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
    +        } finally {
    +          threadpool.shutdown()
    --- End diff --
    
    I'm fine with creating a thread pool every time, since this code path does not seem to run very frequently, and because:
    - Using a shared cached thread pool is much like creating a new thread pool: its threads sit idle for a long time between calls and are likely to be killed before the next call.
    - Using a shared fixed thread pool is a waste for most use cases.
    - The cost of creating threads is trivial compared to the total time of a job.
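
    A minimal sketch of the per-call pool pattern discussed above, under stated assumptions: `sizes(m)(p)` is a hypothetical stand-in for `statuses(m).getSizeForBlock(p)`, the JDK's `Executors.newFixedThreadPool` replaces Spark's `ThreadUtils.newDaemonFixedThreadPool`, and partitions are chunked with `grouped` rather than with the PR's `equallyDivide`. The pool is created before `try`, because a `val` declared inside `try` is not in scope in `finally`.

    ```scala
    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    // Hypothetical illustration; names and signature are not from the PR.
    object ParallelAggregateSketch {
      /** Returns, for every partition p, the sum of sizes(m)(p) over all mappers m. */
      def aggregate(sizes: Array[Array[Long]], numPartitions: Int, parallelism: Int): Array[Long] = {
        require(parallelism > 0, "parallelism must be positive")
        val totalSizes = new Array[Long](numPartitions)
        // Created outside `try` so that `finally` can shut it down.
        val threadPool = Executors.newFixedThreadPool(parallelism)
        try {
          implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
          val chunk = math.max(1, (numPartitions + parallelism - 1) / parallelism)
          val tasks = (0 until numPartitions).grouped(chunk).toList.map { reduceIds =>
            Future {
              // Each task owns a disjoint slice of totalSizes, so no locking is needed.
              for (mapper <- sizes; i <- reduceIds) totalSizes(i) += mapper(i)
            }
          }
          Await.result(Future.sequence(tasks), Duration.Inf)
        } finally {
          threadPool.shutdown()
        }
        totalSizes
      }
    }
    ```

    The pool lives only for the duration of one call, which is the trade-off accepted in the comment above: thread creation is paid on each aggregation, but nothing stays allocated between shuffles.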


---



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19763#discussion_r152016632
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
       }
     
    +  /**
    +   * Try to equally divide Range(0, num) to divisor slices
    +   */
    +  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    +    assert(divisor > 0, "Divisor should be positive")
    +    val (each, remain) = (num / divisor, num % divisor)
    +    val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
    +    if (each != 0) {
    +      smaller.grouped(each) ++ bigger.grouped(each + 1)
    +    } else {
    +      bigger.grouped(each + 1)
    +    }
    +  }
    +
       /**
        * Return statistics about all of the outputs for a given shuffle.
        */
       def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
         shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
           val totalSizes = new Array[Long](dep.partitioner.numPartitions)
    -      for (s <- statuses) {
    -        for (i <- 0 until totalSizes.length) {
    -          totalSizes(i) += s.getSizeForBlock(i)
    +      if (statuses.length * totalSizes.length <=
    +        conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)) {
    --- End diff --
    
    nit:
    ```
    val parallelAggThreshold = ...
    if (statuses.length * totalSizes.length < parallelAggThreshold)
    ```
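
    The later revision of the diff quoted earlier in this thread turns the same product into a thread count: `min(availableProcessors, mappers * partitions / threshold + 1)`. A small sketch of that arithmetic follows. The object and parameter names are hypothetical, and widening to `Long` before multiplying is a precaution added for this example (the product of two `Int`s can exceed `Int.MaxValue`), not something the quoted diff does.

    ```scala
    // Hypothetical helper mirroring the parallelism formula from the quoted diff.
    object ParallelismSketch {
      def parallelism(
          numMappers: Int,
          numPartitions: Int,
          parallelAggThreshold: Long,
          availableCores: Int): Int = {
        require(parallelAggThreshold > 0, "threshold must be positive")
        // Widen to Long first so a large mappers * partitions product cannot overflow.
        val byWork = numMappers.toLong * numPartitions / parallelAggThreshold + 1
        math.min(availableCores.toLong, byWork).toInt
      }
    }
    ```

    With a result of 1 the single-threaded loop is kept; anything larger selects the multi-threaded path, capped by the number of cores.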


---
