Posted to reviews@spark.apache.org by jimjh <gi...@git.apache.org> on 2014/09/16 21:39:52 UTC

[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

GitHub user jimjh opened a pull request:

    https://github.com/apache/spark/pull/2416

    SPARK-2761 refactor #maybeSpill into Spillable

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jimjh/spark SPARK-2761

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2416
    
----
commit 7270e0d45e55113e8a3de0e08c93d31c4c93efba
Author: Jim Lim <ji...@quixey.com>
Date:   2014-09-12T18:37:01Z

    SPARK-2761 refactor #maybeSpill into Spillable

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17923603
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -232,35 +224,18 @@ private[spark] class ExternalSorter[K, V, C](
           return
         }
     
    -    val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
    -
    -    // TODO: factor this out of both here and ExternalAppendOnlyMap
    -    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    -        collection.estimateSize() >= myMemoryThreshold)
    -    {
    -      // Claim up to double our current memory from the shuffle memory pool
    -      val currentMemory = collection.estimateSize()
    -      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    -      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    -      myMemoryThreshold += granted
    -      if (myMemoryThreshold <= currentMemory) {
    -        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    -        // or we already had more memory than myMemoryThreshold); spill the current collection
    -        spill(currentMemory, usingMap)  // Will also release memory back to ShuffleMemoryManager
    -      }
    +    if (usingMap) {
    +      map = maybeSpill(map)
    --- End diff --
    
    It's a little confusing for one `maybeSpill` (in `ExternalSorter`) to call a different `maybeSpill` (in the parent `Spillable` trait). We should rename one of them (not too sure which one) to `maybeSpillCollection` or something similar.
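A minimal sketch of the suggested rename, assuming a simplified, hypothetical `ReturningSpillable` trait whose `maybeSpill` returns either the same collection or the fresh one produced by `spill` (the shape used in this revision of the diff). `ToySorter` is a toy stand-in for `ExternalSorter`, not Spark code; its own entry point is called `maybeSpillCollection`, so the call site no longer reads as a method calling itself. The 8-bytes-per-element size estimate is made up for illustration.

```scala
// Hypothetical, simplified stand-in for the trait in this revision: maybeSpill
// returns either the same collection or the fresh one produced by spill.
trait ReturningSpillable[C] {
  protected def limitBytes: Long
  protected def sizeOf(collection: C): Long
  protected def spill(collection: C): C // writes data out, returns an empty collection

  protected def maybeSpill(collection: C): C =
    if (sizeOf(collection) >= limitBytes) spill(collection) else collection
}

// Toy owner standing in for ExternalSorter. Its own entry point is named
// maybeSpillCollection, so it is visibly distinct from the inherited maybeSpill.
class ToySorter(val limitBytes: Long) extends ReturningSpillable[List[Int]] {
  var usingMap: Boolean = true
  var map: List[Int] = Nil
  var buffer: List[Int] = Nil
  var spills: Int = 0

  protected def sizeOf(c: List[Int]): Long = c.length * 8L // pretend 8 bytes per element
  protected def spill(c: List[Int]): List[Int] = { spills += 1; Nil }

  def insert(x: Int): Unit = {
    if (usingMap) map = x :: map else buffer = x :: buffer
    maybeSpillCollection()
  }

  private def maybeSpillCollection(): Unit =
    if (usingMap) map = maybeSpill(map) else buffer = maybeSpill(buffer)
}
```

With a 32-byte limit, inserting 1 through 10 spills after the 4th and 8th elements, leaving two elements in `map`.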


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17922832
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + *
    + * @tparam C collection type that provides a size estimate
    + */
    +private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
    +
    +  // Number of elements read from input since last spill
    +  protected[this] var elementsRead: Long
    +
    +  // Memory manager that can be used to acquire/release memory
    +  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
    +
    +  // What threshold of elementsRead we start estimating collection size at
    +  private[this] val trackMemoryThreshold = 1000
    +
    +  // How much of the shared memory pool this collection has claimed
    +  private[this] var myMemoryThreshold = 0L
    +
    +  // Number of bytes spilled in total
    +  private[this] var _memoryBytesSpilled = 0L
    +
    +  // Number of spills
    +  private[this] var _spillCount = 0
    +
    +  /**
    +   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    +   * memory before spilling.
    +   *
    +   * @tparam A type of collection to be spilled
    +   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    +   */
    +  protected[this] def maybeSpill[A <: C](collection: A): A = {
    +    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    +        collection.estimateSize() >= myMemoryThreshold) {
    +      // Claim up to double our current memory from the shuffle memory pool
    +      val currentMemory = collection.estimateSize()
    +      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    +      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    +      myMemoryThreshold += granted
    +      if (myMemoryThreshold <= currentMemory) {
    +        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    +        // or we already had more memory than myMemoryThreshold); spill the current collection
    +        _spillCount += 1
    +        val empty = spill[A](collection)
    +        _memoryBytesSpilled += currentMemory
    +        release()
    +        return empty
    +      }
    +    }
    +    collection
    +  }
    +
    +  /**
    +   * Spills the current in-memory collection to disk, and releases the memory.
    +   *
    +   * @param collection collection to spill to disk
    +   * @return new, empty collection
    +   */
    +  protected[this] def spill[A <: C](collection: A): A
    +
    +  /**
    +   * @return total number of times this collection was spilled
    +   */
    +  protected[this] def spillCount: Int = _spillCount
    +
    +  /**
    +   * @return number of bytes spilled in total
    +   */
    +  def memoryBytesSpilled: Long = _memoryBytesSpilled
    +
    +  /**
    +   * Release our memory back to the shuffle pool so that other threads can grab it.
    +   */
    +  private[this] def release(): Unit = {
    --- End diff --
    
    It would be better to call this `releaseMemory` or `releaseMemoryForThisThread` to be more explicit. We do something similar in `MemoryStore.scala`.
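The body of `release()` is cut off in the quoted diff. A minimal sketch of what an explicitly named `releaseMemoryForThisThread` could do, assuming a hypothetical `ToyMemoryPool` that tracks granted bytes per thread (it is loosely modeled on the `tryToAcquire` call in the diff and is not Spark's actual `ShuffleMemoryManager`): hand back everything the collection has claimed and reset its threshold to zero.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a shared memory pool (not Spark's API).
// Tracks how many bytes each thread currently holds.
class ToyMemoryPool(val capacity: Long) {
  private val held = mutable.Map.empty[Long, Long]

  def tryToAcquire(bytes: Long): Long = synchronized {
    val free = capacity - held.values.sum
    val granted = math.max(0L, math.min(bytes, free))
    val tid = Thread.currentThread().getId
    held(tid) = held.getOrElse(tid, 0L) + granted
    granted
  }

  def release(bytes: Long): Unit = synchronized {
    val tid = Thread.currentThread().getId
    held(tid) = math.max(0L, held.getOrElse(tid, 0L) - bytes)
  }

  def heldByCurrentThread: Long = synchronized {
    held.getOrElse(Thread.currentThread().getId, 0L)
  }
}

// Hypothetical collection: the explicit name says what is released and for whom.
class ToyCollection(pool: ToyMemoryPool) {
  var myMemoryThreshold = 0L

  def grow(bytes: Long): Unit = myMemoryThreshold += pool.tryToAcquire(bytes)

  // Give everything this collection claimed back to the pool, then reset.
  def releaseMemoryForThisThread(): Unit = {
    pool.release(myMemoryThreshold)
    myMemoryThreshold = 0L
  }
}
```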


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56991531
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20862/


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57072024
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20924/


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-55839236
  
    Did the tests run?


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r18121825
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + */
    +private[spark] trait Spillable[C] {
    +
    +  this: Logging =>
    +
    +  // Number of elements read from input since last spill
    +  protected var elementsRead: Long
    +
    +  // Memory manager that can be used to acquire/release memory
    +  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
    +
    +  // What threshold of elementsRead we start estimating collection size at
    +  private[this] val trackMemoryThreshold = 1000
    +
    +  // How much of the shared memory pool this collection has claimed
    +  private[this] var myMemoryThreshold = 0L
    +
    +  // Number of bytes spilled in total
    +  private[this] var _memoryBytesSpilled = 0L
    +
    +  // Number of spills
    +  private[this] var _spillCount = 0
    +
    +  /**
    +   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    +   * memory before spilling.
    +   *
    +   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    +   */
    +  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    +    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    +        currentMemory >= myMemoryThreshold) {
    +      // Claim up to double our current memory from the shuffle memory pool
    +      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    +      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    +      myMemoryThreshold += granted
    +      if (myMemoryThreshold <= currentMemory) {
    +        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    +        // or we already had more memory than myMemoryThreshold); spill the current collection
    +        _spillCount += 1
    +        logSpillage(currentMemory)
    +
    +        spill(collection)
    +
    +        // Keep track of spills, and release memory
    +        _memoryBytesSpilled += currentMemory
    +        releaseMemoryForThisThread()
    +        return true
    +      }
    +    }
    +    false
    +  }
    +
    +  /**
    +   * Spills the current in-memory collection to disk, and releases the memory.
    +   *
    +   * @param collection collection to spill to disk
    +   * @return new, empty collection
    --- End diff --
    
    `spill` doesn't return anything...
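The Scaladoc in this revision still describes the earlier, collection-returning API, although `maybeSpill` now returns a `Boolean` and `spill` returns `Unit`. A minimal sketch of the revised trait with documentation that matches the signatures, assuming a hypothetical `FixedPool` in place of `ShuffleMemoryManager` and a concrete `elementsRead` counter so the logic can be driven directly. The thresholds (start checking after 1000 elements, check on every 32nd element, request up to twice the current size) are copied from the quoted diff; the spill bookkeeping is simplified.

```scala
// Hypothetical stand-in for the shared pool: grants at most what is still free.
class FixedPool(var free: Long) {
  def tryToAcquire(bytes: Long): Long = {
    val granted = math.max(0L, math.min(bytes, free))
    free -= granted
    granted
  }
  def release(bytes: Long): Unit = free += bytes
}

trait SpillableSketch[C] {
  protected def pool: FixedPool
  // Number of elements read since the last spill (concrete here for testing).
  protected var elementsRead: Long = 0L
  private val trackMemoryThreshold = 1000
  private var myMemoryThreshold = 0L
  var spillCount = 0

  /**
   * Spills the current in-memory collection to disk if needed. Attempts to acquire
   * more memory before spilling.
   *
   * @param collection    collection to spill to disk
   * @param currentMemory estimated size of the collection, in bytes
   * @return true if `collection` was spilled, false otherwise
   */
  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
        currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      myMemoryThreshold += pool.tryToAcquire(amountToRequest)
      if (myMemoryThreshold <= currentMemory) {
        // Too little memory to grow further: spill, then give back what we held
        spillCount += 1
        spill(collection)
        pool.release(myMemoryThreshold)
        myMemoryThreshold = 0L
        return true
      }
    }
    false
  }

  /**
   * Spills the current in-memory collection to disk. Returns nothing: the caller
   * is responsible for replacing it with a new, empty collection.
   *
   * @param collection collection to spill to disk
   */
  protected def spill(collection: C): Unit
}

// Test probe that drives maybeSpill with chosen inputs.
class Probe(val pool: FixedPool) extends SpillableSketch[String] {
  var spilled: List[String] = Nil
  protected def spill(c: String): Unit = spilled = c :: spilled
  def check(read: Long, bytes: Long): Boolean = {
    elementsRead = read
    maybeSpill("batch", bytes)
  }
}
```

For example, with 1,024 elements read and a 100-byte collection, the first check requests `2 * 100 - 0 = 200` bytes. If the pool grants all 200, the threshold (200) exceeds the current size and nothing spills; if it grants 0, the threshold stays at 0, which is `<= 100`, so the collection spills.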


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57102492
  
    @rxin You made a good point, and I understand that traits with default implementations are not Java-compatible and can cause binary compatibility issues. Like you said, this might not be a big deal since we will always compile `Spillable` and everything that uses `Spillable` together. Nothing outside the `spark` package can use `Spillable`.
    
    Another concern comes to mind: if we made `Spillable` an abstract class, would it be more inconvenient to use in other collections in the future, since each class may extend at most one abstract class?
    
    I am actually ambivalent on this issue: each approach has pros and cons, and none of the cons appears to be a huge problem. Let me know if you want me to change it.
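A small sketch of the trade-off described above, using hypothetical types loosely named after the classes in this thread (none of them are Spark's): as a trait with a `Logging` self-type, as in the later revision, the spill logic can be mixed into a class that already extends a base class, whereas an abstract class would compete for the single superclass slot.

```scala
// Hypothetical logging mixin (not org.apache.spark.Logging).
trait Logging {
  val logs = scala.collection.mutable.ArrayBuffer.empty[String]
  def logInfo(msg: String): Unit = logs += msg
}

// As a trait, the spill logic can be mixed in alongside other traits.
// The self-type requires whoever mixes it in to also provide Logging.
trait SpillableMixin {
  this: Logging =>
  var spillCount = 0
  def recordSpill(bytes: Long): Unit = {
    spillCount += 1
    logInfo(s"spilling in-memory map of $bytes bytes to disk")
  }
}

// An existing base class the collection already extends.
abstract class SizeTrackingBase {
  def estimateSize(): Long
}

// One superclass plus any number of traits. If SpillableMixin were an abstract
// class, this class could not extend both it and SizeTrackingBase.
class ToySpillableMap extends SizeTrackingBase with SpillableMixin with Logging {
  def estimateSize(): Long = 1024L
}
```

The cost on the other side is binary compatibility: before Scala 2.12, a trait with concrete methods compiles to an interface plus a separate implementation class, with forwarders generated in each class that mixes it in, so changing the trait generally requires recompiling its users. Because `Spillable` is `private[spark]` and built together with everything that uses it, that risk stays inside Spark.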


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56976270
  
    Hey, thanks for your review! I am going to work on this over the next few days.


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57116595
  
    YEAH. Thanks @rxin and @andrewor14 for the feedback.


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-55935637
  
    Are we good to merge?


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-55808306
  
    ok to test


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57070772
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20924/consoleFull) for   PR 2416 at commit [`cf8be9a`](https://github.com/apache/spark/commit/cf8be9a59f1dbca3d0dcfbd973c3858b6fa50d50).
     * This patch merges cleanly.


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57038079
  
    Huh, PySpark?


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56990060
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20861/


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57072022
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20924/consoleFull) for   PR 2416 at commit [`cf8be9a`](https://github.com/apache/spark/commit/cf8be9a59f1dbca3d0dcfbd973c3858b6fa50d50).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17922543
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + *
    + * @tparam C collection type that provides a size estimate
    + */
    +private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
    +
    +  // Number of elements read from input since last spill
    +  protected[this] var elementsRead: Long
    +
    +  // Memory manager that can be used to acquire/release memory
    +  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
    +
    +  // What threshold of elementsRead we start estimating collection size at
    +  private[this] val trackMemoryThreshold = 1000
    +
    +  // How much of the shared memory pool this collection has claimed
    +  private[this] var myMemoryThreshold = 0L
    +
    +  // Number of bytes spilled in total
    +  private[this] var _memoryBytesSpilled = 0L
    +
    +  // Number of spills
    +  private[this] var _spillCount = 0
    +
    +  /**
    +   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    +   * memory before spilling.
    +   *
    +   * @tparam A type of collection to be spilled
    +   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    +   */
    +  protected[this] def maybeSpill[A <: C](collection: A): A = {
    +    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    +        collection.estimateSize() >= myMemoryThreshold) {
    +      // Claim up to double our current memory from the shuffle memory pool
    +      val currentMemory = collection.estimateSize()
    +      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    +      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    +      myMemoryThreshold += granted
    +      if (myMemoryThreshold <= currentMemory) {
    +        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    +        // or we already had more memory than myMemoryThreshold); spill the current collection
    +        _spillCount += 1
    +        val empty = spill[A](collection)
    +        _memoryBytesSpilled += currentMemory
    +        release()
    +        return empty
    +      }
    +    }
    +    collection
    +  }
    +
    +  /**
    +   * Spills the current in-memory collection to disk, and releases the memory.
    +   *
    +   * @param collection collection to spill to disk
    +   * @return new, empty collection
    +   */
    +  protected[this] def spill[A <: C](collection: A): A
    --- End diff --
    
    This should just return unit, and let the caller create a new collection.
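A minimal sketch of the suggested split of responsibilities, assuming a hypothetical, size-limit-only `UnitSpillable` trait (no memory pool) and a toy `ToyAggregator` owner: `spill` only writes the data out (here it appends to an in-memory buffer standing in for disk) and returns `Unit`, while the caller, the only code that knows the concrete collection type, allocates the empty replacement when `maybeSpill` reports `true`. The 16-bytes-per-entry estimate is made up.

```scala
import scala.collection.mutable

// Hypothetical, simplified trait: spill is a side effect only (returns Unit),
// and maybeSpill just reports whether it happened.
trait UnitSpillable[C] {
  protected def limitBytes: Long
  protected def spill(collection: C): Unit

  protected def maybeSpill(collection: C, currentMemory: Long): Boolean =
    if (currentMemory >= limitBytes) { spill(collection); true } else false
}

// Hypothetical owner: it alone knows how to build a fresh, empty collection.
class ToyAggregator(val limitBytes: Long) extends UnitSpillable[mutable.HashMap[String, Int]] {
  var map = new mutable.HashMap[String, Int]
  val spilledFiles = mutable.ArrayBuffer.empty[Map[String, Int]] // stands in for disk

  protected def spill(c: mutable.HashMap[String, Int]): Unit = spilledFiles += c.toMap

  def insert(key: String): Unit = {
    map(key) = map.getOrElse(key, 0) + 1
    // Pretend each entry costs 16 bytes.
    if (maybeSpill(map, map.size * 16L)) {
      map = new mutable.HashMap[String, Int] // the caller allocates the replacement
    }
  }
}
```

With a 48-byte limit, inserting `a, b, a, c, d, e` spills `{a -> 2, b -> 1, c -> 1}` once the third distinct key arrives, and `map` then holds only `d` and `e`.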


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57039561
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20898/consoleFull) for   PR 2416 at commit [`f94d522`](https://github.com/apache/spark/commit/f94d522f85274c3562c25235936e74536cd3daa7).
     * This patch merges cleanly.


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56992523
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20863/consoleFull) for   PR 2416 at commit [`f94d522`](https://github.com/apache/spark/commit/f94d522f85274c3562c25235936e74536cd3daa7).
     * This patch merges cleanly.


[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r18121816
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + */
    +private[spark] trait Spillable[C] {
    +
    +  this: Logging =>
    +
    +  // Number of elements read from input since last spill
    +  protected var elementsRead: Long
    +
    +  // Memory manager that can be used to acquire/release memory
    +  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
    +
    +  // What threshold of elementsRead we start estimating collection size at
    +  private[this] val trackMemoryThreshold = 1000
    +
    +  // How much of the shared memory pool this collection has claimed
    +  private[this] var myMemoryThreshold = 0L
    +
    +  // Number of bytes spilled in total
    +  private[this] var _memoryBytesSpilled = 0L
    +
    +  // Number of spills
    +  private[this] var _spillCount = 0
    +
    +  /**
    +   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    +   * memory before spilling.
    +   *
    +   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    +   */
    +  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    +    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    +        currentMemory >= myMemoryThreshold) {
    +      // Claim up to double our current memory from the shuffle memory pool
    +      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    +      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    +      myMemoryThreshold += granted
    +      if (myMemoryThreshold <= currentMemory) {
    +        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    +        // or we already had more memory than myMemoryThreshold); spill the current collection
    +        _spillCount += 1
    +        logSpillage(currentMemory)
    +
    +        spill(collection)
    +
    +        // Keep track of spills, and release memory
    +        _memoryBytesSpilled += currentMemory
    +        releaseMemoryForThisThread()
    +        return true
    +      }
    +    }
    +    false
    +  }
    +
    +  /**
    +   * Spills the current in-memory collection to disk, and releases the memory.
    +   *
    +   * @param collection collection to spill to disk
    +   * @return new, empty collection
    +   */
    +  protected def spill(collection: C): Unit
    +
    +  /**
    +   * @return number of bytes spilled in total
    +   */
    +  def memoryBytesSpilled: Long = _memoryBytesSpilled
    +
    +  /**
    +   * Release our memory back to the shuffle pool so that other threads can grab it.
    +   */
    +  private[this] def releaseMemoryForThisThread(): Unit = {
    --- End diff --
    
    Let's just use `private` instead of `private[this]` for methods. For fields, `private[this]` removes the accessor, which is good, but for methods it doesn't really mean much.
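    A minimal sketch of the distinction, using a hypothetical `Tracker` class (not Spark code): a `private` member is class-private, so one instance may read it on another instance of the same class, while a `private[this]` member is object-private and is reachable only through `this`. Assumes Scala 2.x.

    ```scala
    object AccessDemo {
      // Hypothetical class illustrating Scala access modifiers.
      class Tracker(start: Long) {
        // Class-private: visible to any Tracker instance, not just `this`.
        private var bytes: Long = start

        // Object-private: visible only through `this`; `other.spills` would not compile.
        private[this] var spills: Int = 0

        def record(n: Long): Unit = {
          bytes += n
          spills += 1
        }

        // Legal: reads the class-private field of another instance.
        def sameBytesAs(other: Tracker): Boolean = bytes == other.bytes

        // Only `this.spills` is accessible here.
        def spillCount: Int = spills
      }
    }
    ```

    For a method, the only extra restriction `private[this]` adds is forbidding calls like `other.method()`, which is rarely something worth guarding against.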




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17923361
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -271,19 +246,17 @@ private[spark] class ExternalSorter[K, V, C](
           spillToMergeableFile(collection)
         }
     
    -    if (usingMap) {
    -      map = new SizeTrackingAppendOnlyMap[(Int, K), C]
    -    } else {
    -      buffer = new SizeTrackingPairBuffer[(Int, K), C]
    -    }
    -
    -    // Release our memory back to the shuffle pool so that other threads can grab it
    -    shuffleMemoryManager.release(myMemoryThreshold)
    -    myMemoryThreshold = 0
    -
    -    _memoryBytesSpilled += memorySize
    +    newCollection[A](collection)
       }
     
    +  private def newCollection[A](t: A): A =
    +    t match {
    +      case _: AppendOnlyMap[_, _] =>
    +        new SizeTrackingAppendOnlyMap[(Int, K), C].asInstanceOf[A]
    +      case _ =>
    +        new SizeTrackingPairBuffer[(Int, K), C].asInstanceOf[A]
    +    }
    +
    --- End diff --
    
    We don't need to match here to find out what type to return; we already know this through `usingMap`. Also, if you follow my suggestions elsewhere, then we can just use the `map` and `buffer` variables directly instead of returning the collection and trying to cast it back to some type `A`.
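    A sketch of the suggested shape, under stated assumptions: `MiniSorter` is hypothetical, and plain `scala.collection.mutable` collections stand in for `SizeTrackingAppendOnlyMap` and `SizeTrackingPairBuffer`. The existing `usingMap` flag picks which field to reset, so no runtime type match or `asInstanceOf` cast is needed.

    ```scala
    import scala.collection.mutable

    object ResetDemo {
      // Hypothetical stand-in for ExternalSorter's two in-memory collections.
      class MiniSorter(usingMap: Boolean) {
        var map = mutable.HashMap.empty[Int, Long]
        var buffer = mutable.ArrayBuffer.empty[(Int, Long)]

        def insert(k: Int, v: Long): Unit = {
          if (usingMap) map.update(k, map.getOrElse(k, 0L) + v)
          else buffer += ((k, v))
        }

        def size: Int = if (usingMap) map.size else buffer.size

        // After a spill, reset whichever collection is active, chosen by the
        // existing flag rather than by pattern-matching on the runtime type.
        def resetAfterSpill(): Unit = {
          if (usingMap) map = mutable.HashMap.empty[Int, Long]
          else buffer = mutable.ArrayBuffer.empty[(Int, Long)]
        }
      }
    }
    ```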




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17924149
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -232,35 +224,18 @@ private[spark] class ExternalSorter[K, V, C](
           return
         }
     
    -    val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
    -
    -    // TODO: factor this out of both here and ExternalAppendOnlyMap
    -    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    -        collection.estimateSize() >= myMemoryThreshold)
    -    {
    -      // Claim up to double our current memory from the shuffle memory pool
    -      val currentMemory = collection.estimateSize()
    -      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    -      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    -      myMemoryThreshold += granted
    -      if (myMemoryThreshold <= currentMemory) {
    -        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    -        // or we already had more memory than myMemoryThreshold); spill the current collection
    -        spill(currentMemory, usingMap)  // Will also release memory back to ShuffleMemoryManager
    -      }
    +    if (usingMap) {
    +      map = maybeSpill(map)
    +    } else {
    +      buffer = maybeSpill(buffer)
         }
       }
     
       /**
        * Spill the current in-memory collection to disk, adding a new file to spills, and clear it.
    -   *
    -   * @param usingMap whether we're using a map or buffer as our current in-memory collection
        */
    -  private def spill(memorySize: Long, usingMap: Boolean): Unit = {
    -    val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
    +  protected[this] def spill[A <: SizeTrackingPairCollection[(Int, K), C]](collection: A): A = {
         val memorySize = collection.estimateSize()
    -
    -    spillCount += 1
         val threadId = Thread.currentThread().getId
         logInfo("Thread %d spilling in-memory batch of %d MB to disk (%d spill%s so far)"
           .format(threadId, memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
    --- End diff --
    
    Not your code, but this message is also identical across both places. Maybe move them into the parent `spill` as well?
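    One way to act on this, sketched as a hypothetical standalone helper (`spillMessage` is not a Spark API): format the message once, in a place both collections share. The later revision of `Spillable` quoted in this thread does route this through a `logSpillage(currentMemory)` call.

    ```scala
    object SpillLogDemo {
      // Hypothetical shared helper: builds the spill log line once so that
      // ExternalSorter and ExternalAppendOnlyMap would not each format it.
      def spillMessage(threadId: Long, memorySize: Long, spillCount: Int): String =
        "Thread %d spilling in-memory batch of %d MB to disk (%d spill%s so far)"
          .format(threadId, memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else "")
    }
    ```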




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56991616
  
    @andrewor14 I made most of the changes you recommended, and simplified the signature for Spillable.
    
    However, I am not sure how to remove `Spillable`'s type parameter entirely, because I think child classes should be able to specialize `#spill`. For example, `ExternalSorter` should provide `spill(collection: SizeTrackingPairCollection[(Int, K), C])`, but not `spill[C](collection: C)`. This allows it to pass `collection` along to customized spilling code (such as `spillToPartitionFiles` and `spillToMergeableFiles`.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-55801917
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-55843994
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20444/consoleFull) for   PR 2416 at commit [`7270e0d`](https://github.com/apache/spark/commit/7270e0d45e55113e8a3de0e08c93d31c4c93efba).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57000909
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20863/




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57040993
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20898/consoleFull) for   PR 2416 at commit [`f94d522`](https://github.com/apache/spark/commit/f94d522f85274c3562c25235936e74536cd3daa7).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57039489
  
    Yeah PySpark tests were failing across all PRs. Let's retest this please.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r18096986
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -66,23 +66,19 @@ class ExternalAppendOnlyMap[K, V, C](
         mergeCombiners: (C, C) => C,
         serializer: Serializer = SparkEnv.get.serializer,
         blockManager: BlockManager = SparkEnv.get.blockManager)
    -  extends Iterable[(K, C)] with Serializable with Logging {
    +  extends Iterable[(K, C)]
    +  with Serializable
    +  with Logging
    +  with Spillable[SizeTrackingAppendOnlyMap[K, C]] {
     
       private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
       private val spilledMaps = new ArrayBuffer[DiskMapIterator]
       private val sparkConf = SparkEnv.get.conf
       private val diskBlockManager = blockManager.diskBlockManager
    -  private val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
     
       // Number of pairs inserted since last spill; note that we count them even if a value is merged
       // with a previous key in case we're doing something like groupBy where the result grows
    -  private var elementsRead = 0L
    -
    -  // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
    -  private val trackMemoryThreshold = 1000
    -
    -  // How much of the shared memory pool this collection has claimed
    -  private var myMemoryThreshold = 0L
    +  protected[this] var elementsRead = 0L
    --- End diff --
    
    `protected[this]` prevents other instances of the same class from accessing the member. You are right - it's overkill.
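    A hypothetical sketch (`Counting` and `Reader` are illustrative names) of the shape discussed here: the trait declares an abstract `protected var elementsRead: Long`, and the mixing-in class implements it with a plain `protected var`. The check mirrors the `elementsRead > trackMemoryThreshold && elementsRead % 32 == 0` condition from the diff, with the threshold fixed at 1000.

    ```scala
    object AbstractVarDemo {
      // Hypothetical trait mirroring the shape of Spillable: it declares an
      // abstract counter that the mixing-in class must define.
      trait Counting {
        protected var elementsRead: Long
        def shouldCheck: Boolean = elementsRead > 1000 && elementsRead % 32 == 0
      }

      class Reader extends Counting {
        // Plain `protected` matches the trait's declaration; no override keyword
        // is needed to implement an abstract member.
        protected var elementsRead = 0L
        def read(n: Int): Unit = elementsRead += n
      }
    }
    ```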




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56545365
  
    Hey @andrewor14 - have you had time to review the pull request?




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57040996
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20898/




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57070773
  
    @rxin Thanks for your feedback. I agree with almost all of your comments and made the appropriate changes. However, I don't think it should be an abstract class. According to Programming in Scala: _To trait or not to trait_,
    
    > If it might be reused in multiple, unrelated classes, make it a trait. Only traits can be mixed into different parts of the class hierarchy.
    
    `Spillable` seems to fit that criterion.
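    The "unrelated classes" criterion can be illustrated with a hypothetical sketch (`Logging` here is a toy stand-in, not `org.apache.spark.Logging`). `ToySorter` already extends the abstract class `Sorter`, so under Scala's single-class inheritance it could not also extend `Spillable` if that were an abstract class, whereas a trait can be mixed in alongside. The self-type `this: Logging =>` mirrors the one in the `Spillable` diff and only requires that the final class also mix in `Logging`.

    ```scala
    import scala.collection.mutable.ArrayBuffer

    object TraitMixinDemo {
      // Hypothetical stand-in for a logging trait.
      trait Logging {
        val logged = ArrayBuffer.empty[String]
        def logInfo(msg: String): Unit = logged += msg
      }

      // The self-type requires any mixer to also be Logging, as in Spillable.
      trait Spillable[C] { this: Logging =>
        protected def spill(collection: C): Unit
        def spillNow(collection: C): Unit = {
          logInfo("spilling")
          spill(collection)
        }
      }

      // Hypothetical unrelated base class.
      abstract class Sorter { def name: String }

      class ToySorter extends Sorter with Logging with Spillable[Seq[Int]] {
        def name: String = "sorter"
        var lastSpilled = 0
        protected def spill(collection: Seq[Int]): Unit = lastSpilled = collection.sum
      }

      class ToyMap extends Logging with Spillable[Map[Int, Int]] {
        var lastSpilled = 0
        protected def spill(collection: Map[Int, Int]): Unit = lastSpilled = collection.size
      }
    }
    ```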




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r18121824
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + */
    +private[spark] trait Spillable[C] {
    +
    +  this: Logging =>
    +
    +  // Number of elements read from input since last spill
    +  protected var elementsRead: Long
    +
    +  // Memory manager that can be used to acquire/release memory
    +  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
    +
    +  // What threshold of elementsRead we start estimating collection size at
    +  private[this] val trackMemoryThreshold = 1000
    +
    +  // How much of the shared memory pool this collection has claimed
    +  private[this] var myMemoryThreshold = 0L
    +
    +  // Number of bytes spilled in total
    +  private[this] var _memoryBytesSpilled = 0L
    +
    +  // Number of spills
    +  private[this] var _spillCount = 0
    +
    +  /**
    +   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    +   * memory before spilling.
    +   *
    +   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    +   */
    +  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    +    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    +        currentMemory >= myMemoryThreshold) {
    +      // Claim up to double our current memory from the shuffle memory pool
    +      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    +      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    +      myMemoryThreshold += granted
    +      if (myMemoryThreshold <= currentMemory) {
    +        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    +        // or we already had more memory than myMemoryThreshold); spill the current collection
    +        _spillCount += 1
    +        logSpillage(currentMemory)
    +
    +        spill(collection)
    +
    +        // Keep track of spills, and release memory
    +        _memoryBytesSpilled += currentMemory
    +        releaseMemoryForThisThread()
    +        return true
    +      }
    +    }
    +    false
    +  }
    +
    +  /**
    +   * Spills the current in-memory collection to disk, and releases the memory.
    +   *
    +   * @param collection collection to spill to disk
    +   * @return new, empty collection
    +   */
    +  protected def spill(collection: C): Unit
    --- End diff --
    
    Is this the only method subclasses need to implement? If so, maybe move it to the beginning of the class so that it is obvious.
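    A simplified sketch of the `maybeSpill` logic from the diff, with the implementer-provided members listed first as suggested. Assumptions: `MemoryPool` is a hypothetical stand-in for the shuffle memory manager that grants at most what is still free, `spill()` takes no collection argument, and the threshold of 1000 elements and the `% 32` sampling are copied from the diff.

    ```scala
    object MaybeSpillDemo {
      // Hypothetical stand-in for the shuffle memory pool.
      class MemoryPool(var free: Long) {
        def tryToAcquire(n: Long): Long = {
          val granted = math.max(0L, math.min(n, free))
          free -= granted
          granted
        }
        def release(n: Long): Unit = free += n
      }

      trait SpillableSketch {
        // Members an implementer must provide, listed first.
        protected def spill(): Unit
        protected def pool: MemoryPool

        var elementsRead = 0L
        var spillCount = 0
        private var myMemoryThreshold = 0L

        def maybeSpill(currentMemory: Long): Boolean = {
          if (elementsRead > 1000 && elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
            // Claim up to double the current memory from the pool.
            val amountToRequest = 2 * currentMemory - myMemoryThreshold
            myMemoryThreshold += pool.tryToAcquire(amountToRequest)
            if (myMemoryThreshold <= currentMemory) {
              // Too little memory granted to grow further: spill and release it all.
              spillCount += 1
              spill()
              pool.release(myMemoryThreshold)
              myMemoryThreshold = 0L
              return true
            }
          }
          false
        }
      }

      class CountingSpiller(val pool: MemoryPool) extends SpillableSketch {
        var events = Vector.empty[String]
        protected def spill(): Unit = events :+= "spill"
      }
    }
    ```

    With a 100-byte pool, requests at 30 and then 70 bytes are granted (60, then the remaining 40), so no spill happens; at 120 bytes nothing more can be granted, the collection spills, and all 100 bytes return to the pool.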




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17922781
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    --- End diff --
    
    Is this import used?




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-55840004
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20444/consoleFull) for   PR 2416 at commit [`7270e0d`](https://github.com/apache/spark/commit/7270e0d45e55113e8a3de0e08c93d31c4c93efba).
     * This patch merges cleanly.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by jimjh <gi...@git.apache.org>.
Github user jimjh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r18097804
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    --- End diff --
    
    This import was used because of the funky type parameter. It will be removed after I refactor that bit out.
    
    ```scala
    trait Spillable[C <: { def estimateSize(): Long }] {
    ```
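    A minimal sketch (assuming Scala 2.x, as Spark used at the time; `Buf` and `SizeTracker` are hypothetical names) of why the structural bound needs the import: a call through `{ def estimateSize(): Long }` is dispatched via Java reflection, which Scala 2 gates behind `scala.language.reflectiveCalls`. A named trait as the bound gives ordinary virtual dispatch and needs no language import.

    ```scala
    import scala.language.reflectiveCalls

    object StructuralDemo {
      // Structural bound, as in the earlier revision: any type with an
      // estimateSize(): Long method is accepted, but the call is reflective.
      def sizeOfStructural[C <: { def estimateSize(): Long }](c: C): Long = c.estimateSize()

      // Named-trait alternative: a nominal bound with normal method dispatch.
      trait SizeTracker { def estimateSize(): Long }
      def sizeOfNominal[C <: SizeTracker](c: C): Long = c.estimateSize()

      // Hypothetical collection reporting 16 bytes per element.
      class Buf(n: Int) extends SizeTracker { def estimateSize(): Long = n * 16L }
    }
    ```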




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r18121813
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    --- End diff --
    
    Can you move this before `org.apache.spark`? Take a look at the import ordering section: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-55839767
  
    Let's try again. Jenkins, this is a-ok to test.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17923757
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + *
    + * @tparam C collection type that provides a size estimate
    + */
    +private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
    +
    +  // Number of elements read from input since last spill
    +  protected[this] var elementsRead: Long
    +
    +  // Memory manager that can be used to acquire/release memory
    +  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
    +
    +  // What threshold of elementsRead we start estimating collection size at
    +  private[this] val trackMemoryThreshold = 1000
    +
    +  // How much of the shared memory pool this collection has claimed
    +  private[this] var myMemoryThreshold = 0L
    +
    +  // Number of bytes spilled in total
    +  private[this] var _memoryBytesSpilled = 0L
    +
    +  // Number of spills
    +  private[this] var _spillCount = 0
    +
    +  /**
    +   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    +   * memory before spilling.
    +   *
    +   * @tparam A type of collection to be spilled
    +   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    +   */
    +  protected[this] def maybeSpill[A <: C](collection: A): A = {
    +    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    +        collection.estimateSize() >= myMemoryThreshold) {
    +      // Claim up to double our current memory from the shuffle memory pool
    +      val currentMemory = collection.estimateSize()
    +      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    +      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    +      myMemoryThreshold += granted
    +      if (myMemoryThreshold <= currentMemory) {
    +        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    +        // or we already had more memory than myMemoryThreshold); spill the current collection
    +        _spillCount += 1
    +        val empty = spill[A](collection)
    +        _memoryBytesSpilled += currentMemory
    +        release()
    +        return empty
    +      }
    +    }
    +    collection
    +  }
    +
    +  /**
    +   * Spills the current in-memory collection to disk, and releases the memory.
    +   *
    +   * @param collection collection to spill to disk
    +   * @return new, empty collection
    +   */
    +  protected[this] def spill[A <: C](collection: A): A
    +
    +  /**
    +   * @return total number of times this collection was spilled
    +   */
    +  protected[this] def spillCount: Int = _spillCount
    --- End diff --
    
    Same here, no need for `this`
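
The grow-or-spill decision in the quoted `maybeSpill` can be exercised on its own. This is a simplified, hypothetical model. `FixedPool` stands in for Spark's `ShuffleMemoryManager` and is not its real implementation. Sizes are passed in directly rather than coming from `estimateSize()`. The arithmetic is the same as in the diff. Once enough elements have been read, the model requests enough memory to reach twice the current size. It spills only if the grant still leaves the threshold at or below current usage.

```scala
// Hypothetical stand-in for a shared memory pool; not Spark's ShuffleMemoryManager.
final class FixedPool(private var free: Long) {
  // Grant as much of the request as is still free (possibly 0).
  def tryToAcquire(numBytes: Long): Long = {
    val granted = math.max(0L, math.min(numBytes, free))
    free -= granted
    granted
  }
  def release(numBytes: Long): Unit = free += numBytes
  def available: Long = free
}

// Simplified model of the maybeSpill bookkeeping quoted above.
final class SpillModel(pool: FixedPool, trackMemoryThreshold: Long = 1000) {
  var elementsRead = 0L
  var myMemoryThreshold = 0L
  var spillCount = 0
  var memoryBytesSpilled = 0L

  /** Returns true if the caller should spill its collection now. */
  def maybeSpill(currentMemory: Long): Boolean = {
    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
        currentMemory >= myMemoryThreshold) {
      // Ask for enough memory to reach twice the current usage.
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      myMemoryThreshold += pool.tryToAcquire(amountToRequest)
      if (myMemoryThreshold <= currentMemory) {
        // Not enough was granted to grow: record the spill and return everything to the pool.
        spillCount += 1
        memoryBytesSpilled += currentMemory
        pool.release(myMemoryThreshold)
        myMemoryThreshold = 0L
        return true
      }
    }
    false
  }
}
```

With a 1000-byte pool and `elementsRead = 1024`, a 400-byte collection is granted 800 bytes and keeps growing. A later 1000-byte collection is granted nothing more, so it spills and the claimed memory is released.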




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56118839
  
    @jimjh First we will need to review it. I'll get to it in the next day or two.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56556992
  
    Hey @jimjh thanks for doing this. I left a few comments. In general I think we can simplify the signature of `Spillable` and its methods by not taking in a type parameter and letting the subclasses handle instantiating new collections themselves (they already do this anyway).
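
Here is a minimal sketch of the signature described above, assuming a simplified trait. `maybeSpill` takes the collection and its estimated size and returns whether it spilled. The subclass creates the replacement collection itself. `SimpleSpillable`, `BufferingCollector`, and the 4-bytes-per-element estimate are hypothetical names for illustration. The shuffle-memory negotiation is replaced by a fixed byte limit.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified trait: no bound on C and no collection returned.
trait SimpleSpillable[C] {
  // Fixed limit standing in for the shuffle memory negotiation.
  protected def memoryLimit: Long

  private var _spillCount = 0
  def spillCount: Int = _spillCount

  /** Subclasses write `collection` out; they reset their own state afterwards. */
  protected def spill(collection: C): Unit

  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    if (currentMemory >= memoryLimit) {
      _spillCount += 1
      spill(collection)
      true
    } else {
      false
    }
  }
}

// Hypothetical collector that owns its buffer and recreates it after a spill.
final class BufferingCollector(protected val memoryLimit: Long)
    extends SimpleSpillable[ArrayBuffer[Int]] {
  private var current = new ArrayBuffer[Int]
  val spilledRuns = new ArrayBuffer[Seq[Int]] // stands in for on-disk spill files

  def insert(x: Int): Unit = {
    current += x
    // Pretend each Int costs 4 bytes.
    if (maybeSpill(current, current.size * 4L)) {
      current = new ArrayBuffer[Int]
    }
  }

  def inMemory: Seq[Int] = current.toList

  override protected def spill(collection: ArrayBuffer[Int]): Unit =
    spilledRuns += collection.toList
}
```

With a 12-byte limit, inserting 1, 2, 3, 4 spills `[1, 2, 3]` once and leaves `[4]` in memory. The trait never needs to know how to construct a `C`.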




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57072352
  
    BTW the reason why abstract classes are favored over traits is that traits (as with many advanced Scala features) with default impls complicate a lot of things, especially when it comes to Java and binary compatibility.
    
    I guess in this case it might be ok because Spillable is not a public thing.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56989906
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20861/consoleFull) for   PR 2416 at commit [`c579aab`](https://github.com/apache/spark/commit/c579aaba88031570cb7261f1f1c4de23407db3b7).
     * This patch merges cleanly.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17922720
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + *
    + * @tparam C collection type that provides a size estimate
    + */
    +private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
    --- End diff --
    
    Actually this doesn't even need to take in a type param. See my comments elsewhere.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r18121834
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + */
    +private[spark] trait Spillable[C] {
    --- End diff --
    
    do u think we can maybe make this an abstract class instead of trait since it has a lot of default implementations? 
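
For comparison, here is a sketch of the same idea written as an abstract class, as suggested above. The class name `SpillableBase`, its `threshold` constructor parameter, and the `CountingSpiller` subclass are illustrative only; this is not the merged Spark code. Shared state and the concrete `maybeSpill` live in the class, and a subclass supplies only `spill`. An abstract class can also take constructor parameters, which is used here for the threshold.

```scala
// Hypothetical abstract-class variant; not the merged Spark code.
abstract class SpillableBase[C](threshold: Long) {
  private var _spillCount = 0
  def spillCount: Int = _spillCount

  // The only member a subclass must implement.
  protected def spill(collection: C): Unit

  protected def maybeSpill(collection: C, currentMemory: Long): Boolean =
    if (currentMemory >= threshold) {
      _spillCount += 1
      spill(collection)
      true
    } else {
      false
    }
}

// Illustrative subclass that records the sizes it was asked to spill.
final class CountingSpiller extends SpillableBase[Vector[String]](threshold = 2L) {
  var spilledSizes = Vector.empty[Int]

  override protected def spill(collection: Vector[String]): Unit =
    spilledSizes :+= collection.size

  // Public entry point so callers can drive maybeSpill; "size" here is just the element count.
  def offer(items: Vector[String]): Boolean = maybeSpill(items, items.size.toLong)
}
```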




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57000902
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20863/consoleFull) for   PR 2416 at commit [`f94d522`](https://github.com/apache/spark/commit/f94d522f85274c3562c25235936e74536cd3daa7).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class IDF(val minDocFreq: Int) `
      * `  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable `
      * `class PStatsParam(AccumulatorParam):`





[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2416




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57109766
  
    Ok I'm merging this. Thanks @jimjh.




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r18121818
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + */
    +private[spark] trait Spillable[C] {
    +
    +  this: Logging =>
    +
    +  // Number of elements read from input since last spill
    +  protected var elementsRead: Long
    +
    +  // Memory manager that can be used to acquire/release memory
    +  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
    +
    +  // What threshold of elementsRead we start estimating collection size at
    +  private[this] val trackMemoryThreshold = 1000
    +
    +  // How much of the shared memory pool this collection has claimed
    +  private[this] var myMemoryThreshold = 0L
    +
    +  // Number of bytes spilled in total
    +  private[this] var _memoryBytesSpilled = 0L
    +
    +  // Number of spills
    +  private[this] var _spillCount = 0
    +
    +  /**
    +   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    +   * memory before spilling.
    +   *
    +   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    +   */
    +  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    +    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
    +        currentMemory >= myMemoryThreshold) {
    +      // Claim up to double our current memory from the shuffle memory pool
    +      val amountToRequest = 2 * currentMemory - myMemoryThreshold
    +      val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
    +      myMemoryThreshold += granted
    +      if (myMemoryThreshold <= currentMemory) {
    +        // We were granted too little memory to grow further (either tryToAcquire returned 0,
    +        // or we already had more memory than myMemoryThreshold); spill the current collection
    +        _spillCount += 1
    +        logSpillage(currentMemory)
    +
    +        spill(collection)
    +
    +        // Keep track of spills, and release memory
    +        _memoryBytesSpilled += currentMemory
    +        releaseMemoryForThisThread()
    +        return true
    +      }
    +    }
    +    false
    +  }
    +
    +  /**
    +   * Spills the current in-memory collection to disk, and releases the memory.
    +   *
    +   * @param collection collection to spill to disk
    +   * @return new, empty collection
    +   */
    +  protected def spill(collection: C): Unit
    +
    +  /**
    +   * @return number of bytes spilled in total
    +   */
    +  def memoryBytesSpilled: Long = _memoryBytesSpilled
    +
    +  /**
    +   * Release our memory back to the shuffle pool so that other threads can grab it.
    +   */
    +  private[this] def releaseMemoryForThisThread(): Unit = {
    +    shuffleMemoryManager.release(myMemoryThreshold)
    +    myMemoryThreshold = 0L
    +  }
    +
    +  /**
    +   * Prints a standard log message detailing spillage.
    +   *
    +   * @param size number of bytes spilled
    +   */
    +  @inline private[this] def logSpillage(size: Long) {
    --- End diff --
    
    here too
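
The revised trait above declares a self-type, `this: Logging =>`. This lets its body call logging methods (as `logSpillage` does) without the trait itself extending `Logging`, but any class that mixes in `Spillable` must also mix in `Logging`. Below is a minimal sketch of that pattern. `SimpleLogging` is a hypothetical stand-in for `org.apache.spark.Logging`, `SpillLogger` is a hypothetical stand-in for `Spillable`, and the log message wording is made up.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for org.apache.spark.Logging that records messages.
trait SimpleLogging {
  val messages = new ArrayBuffer[String]
  def logInfo(msg: => String): Unit = messages += msg
}

// The self-type lets this trait call logInfo without extending SimpleLogging.
trait SpillLogger {
  this: SimpleLogging =>

  protected def logSpillage(size: Long): Unit =
    logInfo(s"Spilling in-memory collection of $size bytes to disk")
}

// Compiles only because SimpleLogging is mixed in alongside SpillLogger.
final class DemoCollection extends SpillLogger with SimpleLogging {
  def spillNow(size: Long): Unit = logSpillage(size)
}
```

Dropping `with SimpleLogging` from `DemoCollection` would be a compile-time error, which is the guarantee the self-type provides.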




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-57072266
  
    Are we going to use this in multiple unrelated classes? As far as I can tell, this is only used for collections ... 




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17921484
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -66,23 +66,19 @@ class ExternalAppendOnlyMap[K, V, C](
         mergeCombiners: (C, C) => C,
         serializer: Serializer = SparkEnv.get.serializer,
         blockManager: BlockManager = SparkEnv.get.blockManager)
    -  extends Iterable[(K, C)] with Serializable with Logging {
    +  extends Iterable[(K, C)]
    +  with Serializable
    +  with Logging
    +  with Spillable[SizeTrackingAppendOnlyMap[K, C]] {
     
       private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
       private val spilledMaps = new ArrayBuffer[DiskMapIterator]
       private val sparkConf = SparkEnv.get.conf
       private val diskBlockManager = blockManager.diskBlockManager
    -  private val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
     
       // Number of pairs inserted since last spill; note that we count them even if a value is merged
       // with a previous key in case we're doing something like groupBy where the result grows
    -  private var elementsRead = 0L
    -
    -  // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
    -  private val trackMemoryThreshold = 1000
    -
    -  // How much of the shared memory pool this collection has claimed
    -  private var myMemoryThreshold = 0L
    +  protected[this] var elementsRead = 0L
    --- End diff --
    
    What does `protected[this]` mean? It seems that we can just use `protected` here to keep the signatures simple.
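
For reference on the question above: in Scala, a `protected` member can be accessed from a subclass, including through another instance whose type conforms to that subclass. A `protected[this]` member is object-protected, so it can only be accessed through `this`. A small sketch with hypothetical class names follows. The commented-out method is the kind of access that `protected[this]` rejects at compile time.

```scala
// Hypothetical classes illustrating the two access modifiers.
abstract class Counter {
  // Accessible in subclasses, including on other instances of the subclass type.
  protected var elementsRead = 0L
  // Accessible only through `this`.
  protected[this] var spillsSeen = 0
}

final class MapCounter extends Counter {
  def read(n: Long): Unit = { elementsRead += n; spillsSeen += 1 }

  // Allowed: `other` is a MapCounter and elementsRead is plain `protected`.
  def sameProgress(other: MapCounter): Boolean = elementsRead == other.elementsRead

  // Would NOT compile: spillsSeen is protected[this], so other.spillsSeen is rejected.
  // def sameSpills(other: MapCounter): Boolean = spillsSeen == other.spillsSeen

  def spills: Int = spillsSeen
}
```

When a member is only ever touched through `this`, as `elementsRead` is in these collections, the two modifiers behave the same. That is why plain `protected` keeps the signature simpler.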




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17922058
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + *
    + * @tparam C collection type that provides a size estimate
    + */
    +private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
    --- End diff --
    
    You can just make it extend `SizeTracker` here
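
One reading of the `SizeTracker` suggestion is to bound the type parameter by a nominal trait instead of the structural type `{ def estimateSize(): Long }`. Calls to `estimateSize()` then become ordinary virtual calls, and `import scala.language.reflectiveCalls` is no longer needed. In this sketch, `Sized` is a hypothetical stand-in for Spark's `SizeTracker`, and `TrackedBuffer`, `SizedSpillable`, and `BufferPolicy` are illustrative names.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's SizeTracker: a nominal type with a size estimate.
trait Sized {
  def estimateSize(): Long
}

// Illustrative collection that reports 8 bytes per element.
final class TrackedBuffer extends Sized {
  private val items = new ArrayBuffer[Long]
  def add(x: Long): Unit = items += x
  override def estimateSize(): Long = items.size * 8L
}

// Bounding C by a trait: no structural type, so no reflectiveCalls import.
trait SizedSpillable[C <: Sized] {
  protected def limit: Long
  def shouldSpill(collection: C): Boolean = collection.estimateSize() >= limit
}

// Illustrative policy with a 16-byte limit.
object BufferPolicy extends SizedSpillable[TrackedBuffer] {
  protected val limit = 16L
}
```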




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2416#discussion_r17922474
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Spillable.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import org.apache.spark.SparkEnv
    +import scala.language.reflectiveCalls
    +
    +/**
    + * Spills contents of an in-memory collection to disk when the memory threshold
    + * has been exceeded.
    + *
    + * @tparam C collection type that provides a size estimate
    + */
    +private[spark] trait Spillable[C <: { def estimateSize(): Long }] {
    +
    +  // Number of elements read from input since last spill
    +  protected[this] var elementsRead: Long
    +
    +  // Memory manager that can be used to acquire/release memory
    +  private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
    +
    +  // What threshold of elementsRead we start estimating collection size at
    +  private[this] val trackMemoryThreshold = 1000
    +
    +  // How much of the shared memory pool this collection has claimed
    +  private[this] var myMemoryThreshold = 0L
    +
    +  // Number of bytes spilled in total
    +  private[this] var _memoryBytesSpilled = 0L
    +
    +  // Number of spills
    +  private[this] var _spillCount = 0
    +
    +  /**
    +   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
    +   * memory before spilling.
    +   *
    +   * @tparam A type of collection to be spilled
    +   * @return if spilled, a new empty collection instance; otherwise, the same collection instance
    +   */
    +  protected[this] def maybeSpill[A <: C](collection: A): A = {
    --- End diff --
    
    Actually, after staring at this for a while I think we don't even need the type signature here. Can't we just pass in a `SizeTracker` as a parameter and return a boolean of whether we spilled or not?




[GitHub] spark pull request: SPARK-2761 refactor #maybeSpill into Spillable

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2416#issuecomment-56990056
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20861/consoleFull) for   PR 2416 at commit [`c579aab`](https://github.com/apache/spark/commit/c579aaba88031570cb7261f1f1c4de23407db3b7).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class IDF(val minDocFreq: Int) `
      * `  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable `
      * `class PStatsParam(AccumulatorParam):`


