Posted to issues@spark.apache.org by "Ilya Ganelin (JIRA)" <ji...@apache.org> on 2015/01/02 19:57:34 UTC

[jira] [Comment Edited] (SPARK-4927) Spark does not clean up properly during long jobs.

    [ https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14262313#comment-14262313 ] 

Ilya Ganelin edited comment on SPARK-4927 at 1/2/15 6:56 PM:
-------------------------------------------------------------

The code below reproduces the problem. The fragment is somewhat contrived, since it is distilled from more complete code and is intended mainly to demonstrate the issue.

The memory consumption only becomes an issue when I add the "lookup" on the second, larger RDD.

{code}
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD operations (partitionBy, lookup) on Spark 1.x
import org.apache.spark.rdd.RDD

def showMemoryUsage(sc: SparkContext): Unit = {
  val usersPerStep = 2500
  val count = 1000000
  val numSteps = count / usersPerStep

  // Index the user ids so each step can select its own slice.
  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  // The second, larger RDD whose per-user lookup() calls trigger the memory growth.
  val userFeatures: RDD[(Int, Int)] =
    sc.parallelize(1 to count).map(s => (s, 2)).partitionBy(new HashPartitioner(200)).cache()
  val productFeatures: RDD[(Int, Int)] =
    sc.parallelize(1 to 1000000).map(s => (s, 4)).repartition(1).cache() // unused in this fragment

  for (i <- 1 to numSteps) {
    // Pull the current step's users down to the driver.
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()

    // One lookup() job per user; driver memory shrinks step over step with this pattern.
    val results = usersFiltered.map(user => {
      val userScore = userFeatures.lookup(user).head
      Array(1, 2, userScore)
    })

    val mappedResults: Array[Int] = results.flatten
    println("State: Computed " + mappedResults.length + " predictions for stage " + i)

    sc.parallelize(mappedResults)
    // Write to disk (left out since the problem is evident even without it)
  }
}
{code}
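
To illustrate the point about "lookup" above, here is a rough contrast of my own (not part of the reproducer): fetching the same per-step scores with a single join per step instead of one lookup() job per user. It reuses zippedUsers, userFeatures, usersPerStep, and the step index i from the fragment above, and on Spark 1.x needs import org.apache.spark.SparkContext._ for the pair-RDD join.
{code}
// Select this step's users as a pair RDD keyed by user id.
val stepUsers = zippedUsers
  .filter { case (_, idx) => (i - 1) * usersPerStep <= idx && idx < i * usersPerStep }
  .map { case (user, _) => (user, ()) }

// Join once per step; each record is (user, ((), score)) and we keep only the score.
val stepScores: Array[Array[Int]] = stepUsers
  .join(userFeatures)
  .map { case (_, (_, score)) => Array(1, 2, score) }
  .collect()
{code}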

Example of a broadcast variable being added:
14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0piece0 in memory on CLIENT_NODE:54640 (size: 794.0 B, free: 441.9 MB)
And then, if I parse the entire log looking for "free: XXX.X MB" (a sketch of this parsing is included after the numbers below), memory is cleared properly within a single step:
Free 441.1 MB
Free 439.8 MB
Free 439.8 MB
Free 441.1 MB
Free 441.1 MB
Free 439.8 MB

But between steps the amount of available memory decreases (i.e. the range the values oscillate within shrinks), and over the course of many hours it eventually drops to zero.
Free 440.7 MB
Free 438.7 MB
Free 438.7 MB
Free 440.7 MB
Free 435.4 MB
Free 425.0 MB
Free 425.0 MB
Free 435.4 MB
Free 425.0 MB
Free 425.0 MB
Free 435.4 MB
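
For reference, a minimal sketch of the log parsing described above (my own addition; "driver.log" is a placeholder path, not something from the original report):
{code}
import scala.io.Source

// Pull every "free: XXX.X MB" value that BlockManagerInfo logs, in order,
// so the shrinking range between steps is easy to spot.
val freePattern = """free: (\d+\.\d+) MB""".r
val freeMb: List[Double] =
  Source.fromFile("driver.log").getLines()
    .flatMap(line => freePattern.findFirstMatchIn(line).map(_.group(1).toDouble))
    .toList
freeMb.foreach(mb => println(f"Free $mb%.1f MB"))
{code}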



> Spark does not clean up properly during long jobs. 
> ---------------------------------------------------
>
>                 Key: SPARK-4927
>                 URL: https://issues.apache.org/jira/browse/SPARK-4927
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0
>            Reporter: Ilya Ganelin
>
> On a long-running Spark job, Spark will eventually run out of memory on the driver node due to metadata overhead from the shuffle operation. Spark will continue to operate, but with drastically decreased performance (since swapping now occurs with every operation).
> The spark.cleaner.ttl parameter allows a user to configure when cleanup happens, but the issue is that this cleanup is not done safely: if it clears a cached RDD or active task in the middle of a stage, the next stage that references the cleared RDD or task fails with a KeyNotFoundException.
> There should be a sustainable mechanism for cleaning up stale metadata that allows the program to continue running. 
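
For context, a minimal sketch of how the spark.cleaner.ttl parameter mentioned above is set on a Spark 1.x driver (my own addition, not part of the report; the application name and the 3600-second value are arbitrary examples):
{code}
import org.apache.spark.{SparkConf, SparkContext}

// Drop shuffle/RDD metadata older than one hour (value is in seconds).
// As the description above notes, this can also clear cached RDDs or tasks
// that a later stage still needs, so it is not a safe general fix.
val conf = new SparkConf()
  .setAppName("long-running-job")
  .set("spark.cleaner.ttl", "3600")
val sc = new SparkContext(conf)
{code}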


