You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by jimfcarroll <gi...@git.apache.org> on 2015/04/23 21:21:11 UTC

[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

GitHub user jimfcarroll opened a pull request:

    https://github.com/apache/spark/pull/5669

    [SPARK-7100][MLLib] Fix persisted RDD leak in GradientBoostTrees

    This fixes a leak of a persisted RDD where GradientBoostTrees can call persist but never unpersists.
    
    Jira: https://issues.apache.org/jira/browse/SPARK-7100
    
    Discussion: http://apache-spark-developers-list.1001551.n3.nabble.com/GradientBoostTrees-leaks-a-persisted-RDD-td11750.html


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jimfcarroll/spark gb-unpersist-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5669
    
----
commit e5be57c925f371edb96edbe7385cf62ebd9d575f
Author: Jim Carroll <ji...@dontcallme.com>
Date:   2015-04-23T19:18:33Z

    [SPARK-7100][MLLib] Fix persisted RDD leak in GradientBoostTrees

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jimfcarroll <gi...@git.apache.org>.
Github user jimfcarroll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r28995720
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    --- End diff --
    
    I override and intercept most RDD calls in my own code to optimize them (which is how I found this in the first place) and we use try-with-resource to persist and unpersist. It's too bad you cant do that simply in scala.
    
    If you want the simpler solution let me know. I can change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r29004162
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    +      timer.stop("init")
     
    -    logDebug("##########")
    -    logDebug("Building tree 0")
    -    logDebug("##########")
    -    var data = input
    +      logDebug("##########")
    +      logDebug("Building tree 0")
    +      logDebug("##########")
    +      var data = input
     
    -    // Initialize tree
    -    timer.start("building tree 0")
    -    val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    -    val firstTreeWeight = 1.0
    -    baseLearners(0) = firstTreeModel
    -    baseLearnerWeights(0) = firstTreeWeight
    +      // Initialize tree
    +      timer.start("building tree 0")
    +      val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    +      val firstTreeWeight = 1.0
    +      baseLearners(0) = firstTreeModel
    +      baseLearnerWeights(0) = firstTreeWeight
     
    -    var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    -    logDebug("error of gbt = " + predError.values.mean())
    +      var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    +      logDebug("error of gbt = " + predError.values.mean())
     
    -    // Note: A model of type regression is used since we require raw prediction
    -    timer.stop("building tree 0")
    +      // Note: A model of type regression is used since we require raw prediction
    +      timer.stop("building tree 0")
     
    -    var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    -    var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    -    var bestM = 1
    +      var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    +      var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    +      var bestM = 1
     
    -    // pseudo-residual for second iteration
    -    data = predError.zip(input).map { case ((pred, _), point) =>
    -      LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -    }
    +      // pseudo-residual for second iteration
    +      data = predError.zip(input).map { case ((pred, _), point) =>
    +        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +      }
     
    -    var m = 1
    -    while (m < numIterations) {
    -      timer.start(s"building tree $m")
    -      logDebug("###################################################")
    -      logDebug("Gradient boosting tree iteration " + m)
    -      logDebug("###################################################")
    -      val model = new DecisionTree(treeStrategy).run(data)
    -      timer.stop(s"building tree $m")
    -      // Create partial model
    -      baseLearners(m) = model
    -      // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    -      //       Technically, the weight should be optimized for the particular loss.
    -      //       However, the behavior should be reasonable, though not optimal.
    -      baseLearnerWeights(m) = learningRate
    -      // Note: A model of type regression is used since we require raw prediction
    -      val partialModel = new GradientBoostedTreesModel(
    -        Regression, baseLearners.slice(0, m + 1),
    -        baseLearnerWeights.slice(0, m + 1))
    +      var m = 1
    +      while (m < numIterations) {
    +        timer.start(s"building tree $m")
    +        logDebug("###################################################")
    +        logDebug("Gradient boosting tree iteration " + m)
    +        logDebug("###################################################")
    +        val model = new DecisionTree(treeStrategy).run(data)
    +        timer.stop(s"building tree $m")
    +        // Create partial model
    +        baseLearners(m) = model
    +        // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    +        //       Technically, the weight should be optimized for the particular loss.
    +        //       However, the behavior should be reasonable, though not optimal.
    +        baseLearnerWeights(m) = learningRate
    +        // Note: A model of type regression is used since we require raw prediction
    +        val partialModel = new GradientBoostedTreesModel(
    +          Regression, baseLearners.slice(0, m + 1),
    +          baseLearnerWeights.slice(0, m + 1))
     
    -      predError = GradientBoostedTreesModel.updatePredictionError(
    -        input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    -      logDebug("error of gbt = " + predError.values.mean())
    +        predError = GradientBoostedTreesModel.updatePredictionError(
    +          input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    +        logDebug("error of gbt = " + predError.values.mean())
     
    -      if (validate) {
    -        // Stop training early if
    -        // 1. Reduction in error is less than the validationTol or
    -        // 2. If the error increases, that is if the model is overfit.
    -        // We want the model returned corresponding to the best validation error.
    +        if (validate) {
    +          // Stop training early if
    +          // 1. Reduction in error is less than the validationTol or
    +          // 2. If the error increases, that is if the model is overfit.
    +          // We want the model returned corresponding to the best validation error.
     
    -        validatePredError = GradientBoostedTreesModel.updatePredictionError(
    -          validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    -        val currentValidateError = validatePredError.values.mean()
    -        if (bestValidateError - currentValidateError < validationTol) {
    -          return new GradientBoostedTreesModel(
    -            boostingStrategy.treeStrategy.algo,
    -            baseLearners.slice(0, bestM),
    -            baseLearnerWeights.slice(0, bestM))
    -        } else if (currentValidateError < bestValidateError) {
    -            bestValidateError = currentValidateError
    -            bestM = m + 1
    +          validatePredError = GradientBoostedTreesModel.updatePredictionError(
    +            validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    +          val currentValidateError = validatePredError.values.mean()
    +          if (bestValidateError - currentValidateError < validationTol) {
    +            return new GradientBoostedTreesModel(
    +              boostingStrategy.treeStrategy.algo,
    +              baseLearners.slice(0, bestM),
    +              baseLearnerWeights.slice(0, bestM))
    +          } else if (currentValidateError < bestValidateError) {
    +              bestValidateError = currentValidateError
    +              bestM = m + 1
    +          }
             }
    +        // Update data with pseudo-residuals
    +        data = predError.zip(input).map { case ((pred, _), point) =>
    +          LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +        }
    +        m += 1
           }
    -      // Update data with pseudo-residuals
    -      data = predError.zip(input).map { case ((pred, _), point) =>
    -        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -      }
    -      m += 1
    -    }
     
    -    timer.stop("total")
    +      timer.stop("total")
    +
    +      logInfo("Internal timing for DecisionTree:")
    +      logInfo(s"$timer")
    +      if (validate) {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo,
    +          baseLearners.slice(0, bestM),
    +          baseLearnerWeights.slice(0, bestM))
    +      } else {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +      }
     
    -    logInfo("Internal timing for DecisionTree:")
    -    logInfo(s"$timer")
    -    if (validate) {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo,
    -        baseLearners.slice(0, bestM),
    -        baseLearnerWeights.slice(0, bestM))
    -    } else {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +    } finally {
    +       if (persistedInput) input.unpersist()
    --- End diff --
    
    I agree it's cleaner this way, if a little more complex.  Concerning consistency, let me ping @mengxr since, if we do this here, we might start talking about doing it everywhere.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96850642
  
      [Test build #723 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/723/consoleFull) for   PR 5669 at commit [`45f4b03`](https://github.com/apache/spark/commit/45f4b03c4bc8f5e2fa06c84cb83ae5db97e6abec).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r29004054
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    --- End diff --
    
    I too think it makes some sense but have not seen it done elsewhere.  I think I asked about it when I first started working on Spark and was told something like @srowen said.  Even if it's "leaked," other RDDs can still push it out of memory or disk if needed, right?  Have you encountered cases where that does not happen?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96286212
  
      [Test build #704 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/704/consoleFull) for   PR 5669 at commit [`e5be57c`](https://github.com/apache/spark/commit/e5be57c925f371edb96edbe7385cf62ebd9d575f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96286195
  
    ok to test


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96816164
  
    After discussing with @mengxr I think we should not bother with the try-finally wrapper.  As mentioned above, the method should generally not fail, so the data will be unpersisted as needed.  When an exception is thrown, then the data will be unpersisted whenever another RDD pushes "input" out of memory/disk, without undue harm to other jobs.
    
    @jimfcarroll  Could you please update the PR to remove the try-finally wrapper, but keep the unpersist at the end?
    
    Thanks for going through this discussion!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jimfcarroll <gi...@git.apache.org>.
Github user jimfcarroll commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96843411
  
    Okay. I force pushed a different commit. I removed the try-finally.
    
    Hope this works. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jimfcarroll <gi...@git.apache.org>.
Github user jimfcarroll commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96816661
  
    Your project. I'll downgrade it if you want. :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jimfcarroll <gi...@git.apache.org>.
Github user jimfcarroll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r28995832
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    +      timer.stop("init")
     
    -    logDebug("##########")
    -    logDebug("Building tree 0")
    -    logDebug("##########")
    -    var data = input
    +      logDebug("##########")
    +      logDebug("Building tree 0")
    +      logDebug("##########")
    +      var data = input
     
    -    // Initialize tree
    -    timer.start("building tree 0")
    -    val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    -    val firstTreeWeight = 1.0
    -    baseLearners(0) = firstTreeModel
    -    baseLearnerWeights(0) = firstTreeWeight
    +      // Initialize tree
    +      timer.start("building tree 0")
    +      val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    +      val firstTreeWeight = 1.0
    +      baseLearners(0) = firstTreeModel
    +      baseLearnerWeights(0) = firstTreeWeight
     
    -    var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    -    logDebug("error of gbt = " + predError.values.mean())
    +      var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    +      logDebug("error of gbt = " + predError.values.mean())
     
    -    // Note: A model of type regression is used since we require raw prediction
    -    timer.stop("building tree 0")
    +      // Note: A model of type regression is used since we require raw prediction
    +      timer.stop("building tree 0")
     
    -    var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    -    var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    -    var bestM = 1
    +      var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    +      var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    +      var bestM = 1
     
    -    // pseudo-residual for second iteration
    -    data = predError.zip(input).map { case ((pred, _), point) =>
    -      LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -    }
    +      // pseudo-residual for second iteration
    +      data = predError.zip(input).map { case ((pred, _), point) =>
    +        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +      }
     
    -    var m = 1
    -    while (m < numIterations) {
    -      timer.start(s"building tree $m")
    -      logDebug("###################################################")
    -      logDebug("Gradient boosting tree iteration " + m)
    -      logDebug("###################################################")
    -      val model = new DecisionTree(treeStrategy).run(data)
    -      timer.stop(s"building tree $m")
    -      // Create partial model
    -      baseLearners(m) = model
    -      // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    -      //       Technically, the weight should be optimized for the particular loss.
    -      //       However, the behavior should be reasonable, though not optimal.
    -      baseLearnerWeights(m) = learningRate
    -      // Note: A model of type regression is used since we require raw prediction
    -      val partialModel = new GradientBoostedTreesModel(
    -        Regression, baseLearners.slice(0, m + 1),
    -        baseLearnerWeights.slice(0, m + 1))
    +      var m = 1
    +      while (m < numIterations) {
    +        timer.start(s"building tree $m")
    +        logDebug("###################################################")
    +        logDebug("Gradient boosting tree iteration " + m)
    +        logDebug("###################################################")
    +        val model = new DecisionTree(treeStrategy).run(data)
    +        timer.stop(s"building tree $m")
    +        // Create partial model
    +        baseLearners(m) = model
    +        // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    +        //       Technically, the weight should be optimized for the particular loss.
    +        //       However, the behavior should be reasonable, though not optimal.
    +        baseLearnerWeights(m) = learningRate
    +        // Note: A model of type regression is used since we require raw prediction
    +        val partialModel = new GradientBoostedTreesModel(
    +          Regression, baseLearners.slice(0, m + 1),
    +          baseLearnerWeights.slice(0, m + 1))
     
    -      predError = GradientBoostedTreesModel.updatePredictionError(
    -        input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    -      logDebug("error of gbt = " + predError.values.mean())
    +        predError = GradientBoostedTreesModel.updatePredictionError(
    +          input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    +        logDebug("error of gbt = " + predError.values.mean())
     
    -      if (validate) {
    -        // Stop training early if
    -        // 1. Reduction in error is less than the validationTol or
    -        // 2. If the error increases, that is if the model is overfit.
    -        // We want the model returned corresponding to the best validation error.
    +        if (validate) {
    +          // Stop training early if
    +          // 1. Reduction in error is less than the validationTol or
    +          // 2. If the error increases, that is if the model is overfit.
    +          // We want the model returned corresponding to the best validation error.
     
    -        validatePredError = GradientBoostedTreesModel.updatePredictionError(
    -          validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    -        val currentValidateError = validatePredError.values.mean()
    -        if (bestValidateError - currentValidateError < validationTol) {
    -          return new GradientBoostedTreesModel(
    -            boostingStrategy.treeStrategy.algo,
    -            baseLearners.slice(0, bestM),
    -            baseLearnerWeights.slice(0, bestM))
    -        } else if (currentValidateError < bestValidateError) {
    -            bestValidateError = currentValidateError
    -            bestM = m + 1
    +          validatePredError = GradientBoostedTreesModel.updatePredictionError(
    +            validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    +          val currentValidateError = validatePredError.values.mean()
    +          if (bestValidateError - currentValidateError < validationTol) {
    +            return new GradientBoostedTreesModel(
    +              boostingStrategy.treeStrategy.algo,
    +              baseLearners.slice(0, bestM),
    +              baseLearnerWeights.slice(0, bestM))
    +          } else if (currentValidateError < bestValidateError) {
    +              bestValidateError = currentValidateError
    +              bestM = m + 1
    +          }
             }
    +        // Update data with pseudo-residuals
    +        data = predError.zip(input).map { case ((pred, _), point) =>
    +          LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +        }
    +        m += 1
           }
    -      // Update data with pseudo-residuals
    -      data = predError.zip(input).map { case ((pred, _), point) =>
    -        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -      }
    -      m += 1
    -    }
     
    -    timer.stop("total")
    +      timer.stop("total")
    +
    +      logInfo("Internal timing for DecisionTree:")
    +      logInfo(s"$timer")
    +      if (validate) {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo,
    +          baseLearners.slice(0, bestM),
    +          baseLearnerWeights.slice(0, bestM))
    +      } else {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +      }
     
    -    logInfo("Internal timing for DecisionTree:")
    -    logInfo(s"$timer")
    -    if (validate) {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo,
    -        baseLearners.slice(0, bestM),
    -        baseLearnerWeights.slice(0, bestM))
    -    } else {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +    } finally {
    +       if (persistedInput) input.unpersist()
    --- End diff --
    
    If GradientBoostedTrees didn't persist it but it unpersists it that could create unexpected results for the person that persisted it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r29105356
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    +      timer.stop("init")
     
    -    logDebug("##########")
    -    logDebug("Building tree 0")
    -    logDebug("##########")
    -    var data = input
    +      logDebug("##########")
    +      logDebug("Building tree 0")
    +      logDebug("##########")
    +      var data = input
     
    -    // Initialize tree
    -    timer.start("building tree 0")
    -    val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    -    val firstTreeWeight = 1.0
    -    baseLearners(0) = firstTreeModel
    -    baseLearnerWeights(0) = firstTreeWeight
    +      // Initialize tree
    +      timer.start("building tree 0")
    +      val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    +      val firstTreeWeight = 1.0
    +      baseLearners(0) = firstTreeModel
    +      baseLearnerWeights(0) = firstTreeWeight
     
    -    var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    -    logDebug("error of gbt = " + predError.values.mean())
    +      var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    +      logDebug("error of gbt = " + predError.values.mean())
     
    -    // Note: A model of type regression is used since we require raw prediction
    -    timer.stop("building tree 0")
    +      // Note: A model of type regression is used since we require raw prediction
    +      timer.stop("building tree 0")
     
    -    var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    -    var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    -    var bestM = 1
    +      var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    +      var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    +      var bestM = 1
     
    -    // pseudo-residual for second iteration
    -    data = predError.zip(input).map { case ((pred, _), point) =>
    -      LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -    }
    +      // pseudo-residual for second iteration
    +      data = predError.zip(input).map { case ((pred, _), point) =>
    +        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +      }
     
    -    var m = 1
    -    while (m < numIterations) {
    -      timer.start(s"building tree $m")
    -      logDebug("###################################################")
    -      logDebug("Gradient boosting tree iteration " + m)
    -      logDebug("###################################################")
    -      val model = new DecisionTree(treeStrategy).run(data)
    -      timer.stop(s"building tree $m")
    -      // Create partial model
    -      baseLearners(m) = model
    -      // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    -      //       Technically, the weight should be optimized for the particular loss.
    -      //       However, the behavior should be reasonable, though not optimal.
    -      baseLearnerWeights(m) = learningRate
    -      // Note: A model of type regression is used since we require raw prediction
    -      val partialModel = new GradientBoostedTreesModel(
    -        Regression, baseLearners.slice(0, m + 1),
    -        baseLearnerWeights.slice(0, m + 1))
    +      var m = 1
    +      while (m < numIterations) {
    +        timer.start(s"building tree $m")
    +        logDebug("###################################################")
    +        logDebug("Gradient boosting tree iteration " + m)
    +        logDebug("###################################################")
    +        val model = new DecisionTree(treeStrategy).run(data)
    +        timer.stop(s"building tree $m")
    +        // Create partial model
    +        baseLearners(m) = model
    +        // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    +        //       Technically, the weight should be optimized for the particular loss.
    +        //       However, the behavior should be reasonable, though not optimal.
    +        baseLearnerWeights(m) = learningRate
    +        // Note: A model of type regression is used since we require raw prediction
    +        val partialModel = new GradientBoostedTreesModel(
    +          Regression, baseLearners.slice(0, m + 1),
    +          baseLearnerWeights.slice(0, m + 1))
     
    -      predError = GradientBoostedTreesModel.updatePredictionError(
    -        input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    -      logDebug("error of gbt = " + predError.values.mean())
    +        predError = GradientBoostedTreesModel.updatePredictionError(
    +          input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    +        logDebug("error of gbt = " + predError.values.mean())
     
    -      if (validate) {
    -        // Stop training early if
    -        // 1. Reduction in error is less than the validationTol or
    -        // 2. If the error increases, that is if the model is overfit.
    -        // We want the model returned corresponding to the best validation error.
    +        if (validate) {
    +          // Stop training early if
    +          // 1. Reduction in error is less than the validationTol or
    +          // 2. If the error increases, that is if the model is overfit.
    +          // We want the model returned corresponding to the best validation error.
     
    -        validatePredError = GradientBoostedTreesModel.updatePredictionError(
    -          validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    -        val currentValidateError = validatePredError.values.mean()
    -        if (bestValidateError - currentValidateError < validationTol) {
    -          return new GradientBoostedTreesModel(
    -            boostingStrategy.treeStrategy.algo,
    -            baseLearners.slice(0, bestM),
    -            baseLearnerWeights.slice(0, bestM))
    -        } else if (currentValidateError < bestValidateError) {
    -            bestValidateError = currentValidateError
    -            bestM = m + 1
    +          validatePredError = GradientBoostedTreesModel.updatePredictionError(
    +            validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    +          val currentValidateError = validatePredError.values.mean()
    +          if (bestValidateError - currentValidateError < validationTol) {
    +            return new GradientBoostedTreesModel(
    +              boostingStrategy.treeStrategy.algo,
    +              baseLearners.slice(0, bestM),
    +              baseLearnerWeights.slice(0, bestM))
    +          } else if (currentValidateError < bestValidateError) {
    +              bestValidateError = currentValidateError
    +              bestM = m + 1
    +          }
             }
    +        // Update data with pseudo-residuals
    +        data = predError.zip(input).map { case ((pred, _), point) =>
    +          LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +        }
    +        m += 1
           }
    -      // Update data with pseudo-residuals
    -      data = predError.zip(input).map { case ((pred, _), point) =>
    -        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -      }
    -      m += 1
    -    }
     
    -    timer.stop("total")
    +      timer.stop("total")
    +
    +      logInfo("Internal timing for DecisionTree:")
    +      logInfo(s"$timer")
    +      if (validate) {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo,
    +          baseLearners.slice(0, bestM),
    +          baseLearnerWeights.slice(0, bestM))
    +      } else {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +      }
     
    -    logInfo("Internal timing for DecisionTree:")
    -    logInfo(s"$timer")
    -    if (validate) {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo,
    -        baseLearners.slice(0, bestM),
    -        baseLearnerWeights.slice(0, bestM))
    -    } else {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +    } finally {
    +       if (persistedInput) input.unpersist()
    --- End diff --
    
    @mengxr what do you think about the `try-catch` here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jimfcarroll <gi...@git.apache.org>.
Github user jimfcarroll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r28996632
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    +      timer.stop("init")
     
    -    logDebug("##########")
    -    logDebug("Building tree 0")
    -    logDebug("##########")
    -    var data = input
    +      logDebug("##########")
    +      logDebug("Building tree 0")
    +      logDebug("##########")
    +      var data = input
     
    -    // Initialize tree
    -    timer.start("building tree 0")
    -    val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    -    val firstTreeWeight = 1.0
    -    baseLearners(0) = firstTreeModel
    -    baseLearnerWeights(0) = firstTreeWeight
    +      // Initialize tree
    +      timer.start("building tree 0")
    +      val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    +      val firstTreeWeight = 1.0
    +      baseLearners(0) = firstTreeModel
    +      baseLearnerWeights(0) = firstTreeWeight
     
    -    var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    -    logDebug("error of gbt = " + predError.values.mean())
    +      var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    +      logDebug("error of gbt = " + predError.values.mean())
     
    -    // Note: A model of type regression is used since we require raw prediction
    -    timer.stop("building tree 0")
    +      // Note: A model of type regression is used since we require raw prediction
    +      timer.stop("building tree 0")
     
    -    var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    -    var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    -    var bestM = 1
    +      var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    +      var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    +      var bestM = 1
     
    -    // pseudo-residual for second iteration
    -    data = predError.zip(input).map { case ((pred, _), point) =>
    -      LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -    }
    +      // pseudo-residual for second iteration
    +      data = predError.zip(input).map { case ((pred, _), point) =>
    +        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +      }
     
    -    var m = 1
    -    while (m < numIterations) {
    -      timer.start(s"building tree $m")
    -      logDebug("###################################################")
    -      logDebug("Gradient boosting tree iteration " + m)
    -      logDebug("###################################################")
    -      val model = new DecisionTree(treeStrategy).run(data)
    -      timer.stop(s"building tree $m")
    -      // Create partial model
    -      baseLearners(m) = model
    -      // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    -      //       Technically, the weight should be optimized for the particular loss.
    -      //       However, the behavior should be reasonable, though not optimal.
    -      baseLearnerWeights(m) = learningRate
    -      // Note: A model of type regression is used since we require raw prediction
    -      val partialModel = new GradientBoostedTreesModel(
    -        Regression, baseLearners.slice(0, m + 1),
    -        baseLearnerWeights.slice(0, m + 1))
    +      var m = 1
    +      while (m < numIterations) {
    +        timer.start(s"building tree $m")
    +        logDebug("###################################################")
    +        logDebug("Gradient boosting tree iteration " + m)
    +        logDebug("###################################################")
    +        val model = new DecisionTree(treeStrategy).run(data)
    +        timer.stop(s"building tree $m")
    +        // Create partial model
    +        baseLearners(m) = model
    +        // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    +        //       Technically, the weight should be optimized for the particular loss.
    +        //       However, the behavior should be reasonable, though not optimal.
    +        baseLearnerWeights(m) = learningRate
    +        // Note: A model of type regression is used since we require raw prediction
    +        val partialModel = new GradientBoostedTreesModel(
    +          Regression, baseLearners.slice(0, m + 1),
    +          baseLearnerWeights.slice(0, m + 1))
     
    -      predError = GradientBoostedTreesModel.updatePredictionError(
    -        input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    -      logDebug("error of gbt = " + predError.values.mean())
    +        predError = GradientBoostedTreesModel.updatePredictionError(
    +          input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    +        logDebug("error of gbt = " + predError.values.mean())
     
    -      if (validate) {
    -        // Stop training early if
    -        // 1. Reduction in error is less than the validationTol or
    -        // 2. If the error increases, that is if the model is overfit.
    -        // We want the model returned corresponding to the best validation error.
    +        if (validate) {
    +          // Stop training early if
    +          // 1. Reduction in error is less than the validationTol or
    +          // 2. If the error increases, that is if the model is overfit.
    +          // We want the model returned corresponding to the best validation error.
     
    -        validatePredError = GradientBoostedTreesModel.updatePredictionError(
    -          validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    -        val currentValidateError = validatePredError.values.mean()
    -        if (bestValidateError - currentValidateError < validationTol) {
    -          return new GradientBoostedTreesModel(
    -            boostingStrategy.treeStrategy.algo,
    -            baseLearners.slice(0, bestM),
    -            baseLearnerWeights.slice(0, bestM))
    -        } else if (currentValidateError < bestValidateError) {
    -            bestValidateError = currentValidateError
    -            bestM = m + 1
    +          validatePredError = GradientBoostedTreesModel.updatePredictionError(
    +            validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    +          val currentValidateError = validatePredError.values.mean()
    +          if (bestValidateError - currentValidateError < validationTol) {
    +            return new GradientBoostedTreesModel(
    +              boostingStrategy.treeStrategy.algo,
    +              baseLearners.slice(0, bestM),
    +              baseLearnerWeights.slice(0, bestM))
    +          } else if (currentValidateError < bestValidateError) {
    +              bestValidateError = currentValidateError
    +              bestM = m + 1
    +          }
             }
    +        // Update data with pseudo-residuals
    +        data = predError.zip(input).map { case ((pred, _), point) =>
    +          LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +        }
    +        m += 1
           }
    -      // Update data with pseudo-residuals
    -      data = predError.zip(input).map { case ((pred, _), point) =>
    -        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -      }
    -      m += 1
    -    }
     
    -    timer.stop("total")
    +      timer.stop("total")
    +
    +      logInfo("Internal timing for DecisionTree:")
    +      logInfo(s"$timer")
    +      if (validate) {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo,
    +          baseLearners.slice(0, bestM),
    +          baseLearnerWeights.slice(0, bestM))
    +      } else {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +      }
     
    -    logInfo("Internal timing for DecisionTree:")
    -    logInfo(s"$timer")
    -    if (validate) {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo,
    -        baseLearners.slice(0, bestM),
    -        baseLearnerWeights.slice(0, bestM))
    -    } else {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +    } finally {
    +       if (persistedInput) input.unpersist()
    --- End diff --
    
    IOW, it's guaranteed to be persisted when it gets here (given the 'if' above the try block). Its just possible someone else did it before calling boost. In that case I don't think we want to unpersist it on them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r28996818
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    +      timer.stop("init")
     
    -    logDebug("##########")
    -    logDebug("Building tree 0")
    -    logDebug("##########")
    -    var data = input
    +      logDebug("##########")
    +      logDebug("Building tree 0")
    +      logDebug("##########")
    +      var data = input
     
    -    // Initialize tree
    -    timer.start("building tree 0")
    -    val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    -    val firstTreeWeight = 1.0
    -    baseLearners(0) = firstTreeModel
    -    baseLearnerWeights(0) = firstTreeWeight
    +      // Initialize tree
    +      timer.start("building tree 0")
    +      val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    +      val firstTreeWeight = 1.0
    +      baseLearners(0) = firstTreeModel
    +      baseLearnerWeights(0) = firstTreeWeight
     
    -    var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    -    logDebug("error of gbt = " + predError.values.mean())
    +      var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    +      logDebug("error of gbt = " + predError.values.mean())
     
    -    // Note: A model of type regression is used since we require raw prediction
    -    timer.stop("building tree 0")
    +      // Note: A model of type regression is used since we require raw prediction
    +      timer.stop("building tree 0")
     
    -    var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    -    var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    -    var bestM = 1
    +      var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    +      var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    +      var bestM = 1
     
    -    // pseudo-residual for second iteration
    -    data = predError.zip(input).map { case ((pred, _), point) =>
    -      LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -    }
    +      // pseudo-residual for second iteration
    +      data = predError.zip(input).map { case ((pred, _), point) =>
    +        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +      }
     
    -    var m = 1
    -    while (m < numIterations) {
    -      timer.start(s"building tree $m")
    -      logDebug("###################################################")
    -      logDebug("Gradient boosting tree iteration " + m)
    -      logDebug("###################################################")
    -      val model = new DecisionTree(treeStrategy).run(data)
    -      timer.stop(s"building tree $m")
    -      // Create partial model
    -      baseLearners(m) = model
    -      // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    -      //       Technically, the weight should be optimized for the particular loss.
    -      //       However, the behavior should be reasonable, though not optimal.
    -      baseLearnerWeights(m) = learningRate
    -      // Note: A model of type regression is used since we require raw prediction
    -      val partialModel = new GradientBoostedTreesModel(
    -        Regression, baseLearners.slice(0, m + 1),
    -        baseLearnerWeights.slice(0, m + 1))
    +      var m = 1
    +      while (m < numIterations) {
    +        timer.start(s"building tree $m")
    +        logDebug("###################################################")
    +        logDebug("Gradient boosting tree iteration " + m)
    +        logDebug("###################################################")
    +        val model = new DecisionTree(treeStrategy).run(data)
    +        timer.stop(s"building tree $m")
    +        // Create partial model
    +        baseLearners(m) = model
    +        // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    +        //       Technically, the weight should be optimized for the particular loss.
    +        //       However, the behavior should be reasonable, though not optimal.
    +        baseLearnerWeights(m) = learningRate
    +        // Note: A model of type regression is used since we require raw prediction
    +        val partialModel = new GradientBoostedTreesModel(
    +          Regression, baseLearners.slice(0, m + 1),
    +          baseLearnerWeights.slice(0, m + 1))
     
    -      predError = GradientBoostedTreesModel.updatePredictionError(
    -        input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    -      logDebug("error of gbt = " + predError.values.mean())
    +        predError = GradientBoostedTreesModel.updatePredictionError(
    +          input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    +        logDebug("error of gbt = " + predError.values.mean())
     
    -      if (validate) {
    -        // Stop training early if
    -        // 1. Reduction in error is less than the validationTol or
    -        // 2. If the error increases, that is if the model is overfit.
    -        // We want the model returned corresponding to the best validation error.
    +        if (validate) {
    +          // Stop training early if
    +          // 1. Reduction in error is less than the validationTol or
    +          // 2. If the error increases, that is if the model is overfit.
    +          // We want the model returned corresponding to the best validation error.
     
    -        validatePredError = GradientBoostedTreesModel.updatePredictionError(
    -          validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    -        val currentValidateError = validatePredError.values.mean()
    -        if (bestValidateError - currentValidateError < validationTol) {
    -          return new GradientBoostedTreesModel(
    -            boostingStrategy.treeStrategy.algo,
    -            baseLearners.slice(0, bestM),
    -            baseLearnerWeights.slice(0, bestM))
    -        } else if (currentValidateError < bestValidateError) {
    -            bestValidateError = currentValidateError
    -            bestM = m + 1
    +          validatePredError = GradientBoostedTreesModel.updatePredictionError(
    +            validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    +          val currentValidateError = validatePredError.values.mean()
    +          if (bestValidateError - currentValidateError < validationTol) {
    +            return new GradientBoostedTreesModel(
    +              boostingStrategy.treeStrategy.algo,
    +              baseLearners.slice(0, bestM),
    +              baseLearnerWeights.slice(0, bestM))
    +          } else if (currentValidateError < bestValidateError) {
    +              bestValidateError = currentValidateError
    +              bestM = m + 1
    +          }
             }
    +        // Update data with pseudo-residuals
    +        data = predError.zip(input).map { case ((pred, _), point) =>
    +          LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +        }
    +        m += 1
           }
    -      // Update data with pseudo-residuals
    -      data = predError.zip(input).map { case ((pred, _), point) =>
    -        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -      }
    -      m += 1
    -    }
     
    -    timer.stop("total")
    +      timer.stop("total")
    +
    +      logInfo("Internal timing for DecisionTree:")
    +      logInfo(s"$timer")
    +      if (validate) {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo,
    +          baseLearners.slice(0, bestM),
    +          baseLearnerWeights.slice(0, bestM))
    +      } else {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +      }
     
    -    logInfo("Internal timing for DecisionTree:")
    -    logInfo(s"$timer")
    -    if (validate) {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo,
    -        baseLearners.slice(0, bestM),
    -        baseLearnerWeights.slice(0, bestM))
    -    } else {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +    } finally {
    +       if (persistedInput) input.unpersist()
    --- End diff --
    
    Good point. The approach is robust, which is better than in most of the code. I'm not afraid to merge it but just seeking opinions on whether we need to be concerned with this enough compared to the small inconsistency it creates with the rest of the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96842749
  
    OK thank you.  Sometimes, a slight improvement can be outweighed by the extra complexity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-97033634
  
    This test actually passed, but Jenkins failed to post. LGTM.
    
    data: {"body": "  [Test build #723 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/723/consoleFull) for   PR 5669 at commit [`45f4b03`](https://github.com/apache/spark/commit/45f4b03c4bc8f5e2fa06c84cb83ae5db97e6abec).\n * This patch **passes all tests**.\n * This patch merges cleanly.\n * This patch adds no public classes.\nYour branch is ahead of 'origin/master' by 295 commits.
     * This patch does not change any dependencies."}


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96295778
  
      [Test build #704 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/704/consoleFull) for   PR 5669 at commit [`e5be57c`](https://github.com/apache/spark/commit/e5be57c925f371edb96edbe7385cf62ebd9d575f).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r28995552
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    +      timer.stop("init")
     
    -    logDebug("##########")
    -    logDebug("Building tree 0")
    -    logDebug("##########")
    -    var data = input
    +      logDebug("##########")
    +      logDebug("Building tree 0")
    +      logDebug("##########")
    +      var data = input
     
    -    // Initialize tree
    -    timer.start("building tree 0")
    -    val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    -    val firstTreeWeight = 1.0
    -    baseLearners(0) = firstTreeModel
    -    baseLearnerWeights(0) = firstTreeWeight
    +      // Initialize tree
    +      timer.start("building tree 0")
    +      val firstTreeModel = new DecisionTree(treeStrategy).run(data)
    +      val firstTreeWeight = 1.0
    +      baseLearners(0) = firstTreeModel
    +      baseLearnerWeights(0) = firstTreeWeight
     
    -    var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    -    logDebug("error of gbt = " + predError.values.mean())
    +      var predError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
    +      logDebug("error of gbt = " + predError.values.mean())
     
    -    // Note: A model of type regression is used since we require raw prediction
    -    timer.stop("building tree 0")
    +      // Note: A model of type regression is used since we require raw prediction
    +      timer.stop("building tree 0")
     
    -    var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    -      computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    -    var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    -    var bestM = 1
    +      var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel.
    +        computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
    +      var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
    +      var bestM = 1
     
    -    // pseudo-residual for second iteration
    -    data = predError.zip(input).map { case ((pred, _), point) =>
    -      LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -    }
    +      // pseudo-residual for second iteration
    +      data = predError.zip(input).map { case ((pred, _), point) =>
    +        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +      }
     
    -    var m = 1
    -    while (m < numIterations) {
    -      timer.start(s"building tree $m")
    -      logDebug("###################################################")
    -      logDebug("Gradient boosting tree iteration " + m)
    -      logDebug("###################################################")
    -      val model = new DecisionTree(treeStrategy).run(data)
    -      timer.stop(s"building tree $m")
    -      // Create partial model
    -      baseLearners(m) = model
    -      // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    -      //       Technically, the weight should be optimized for the particular loss.
    -      //       However, the behavior should be reasonable, though not optimal.
    -      baseLearnerWeights(m) = learningRate
    -      // Note: A model of type regression is used since we require raw prediction
    -      val partialModel = new GradientBoostedTreesModel(
    -        Regression, baseLearners.slice(0, m + 1),
    -        baseLearnerWeights.slice(0, m + 1))
    +      var m = 1
    +      while (m < numIterations) {
    +        timer.start(s"building tree $m")
    +        logDebug("###################################################")
    +        logDebug("Gradient boosting tree iteration " + m)
    +        logDebug("###################################################")
    +        val model = new DecisionTree(treeStrategy).run(data)
    +        timer.stop(s"building tree $m")
    +        // Create partial model
    +        baseLearners(m) = model
    +        // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
    +        //       Technically, the weight should be optimized for the particular loss.
    +        //       However, the behavior should be reasonable, though not optimal.
    +        baseLearnerWeights(m) = learningRate
    +        // Note: A model of type regression is used since we require raw prediction
    +        val partialModel = new GradientBoostedTreesModel(
    +          Regression, baseLearners.slice(0, m + 1),
    +          baseLearnerWeights.slice(0, m + 1))
     
    -      predError = GradientBoostedTreesModel.updatePredictionError(
    -        input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    -      logDebug("error of gbt = " + predError.values.mean())
    +        predError = GradientBoostedTreesModel.updatePredictionError(
    +          input, predError, baseLearnerWeights(m), baseLearners(m), loss)
    +        logDebug("error of gbt = " + predError.values.mean())
     
    -      if (validate) {
    -        // Stop training early if
    -        // 1. Reduction in error is less than the validationTol or
    -        // 2. If the error increases, that is if the model is overfit.
    -        // We want the model returned corresponding to the best validation error.
    +        if (validate) {
    +          // Stop training early if
    +          // 1. Reduction in error is less than the validationTol or
    +          // 2. If the error increases, that is if the model is overfit.
    +          // We want the model returned corresponding to the best validation error.
     
    -        validatePredError = GradientBoostedTreesModel.updatePredictionError(
    -          validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    -        val currentValidateError = validatePredError.values.mean()
    -        if (bestValidateError - currentValidateError < validationTol) {
    -          return new GradientBoostedTreesModel(
    -            boostingStrategy.treeStrategy.algo,
    -            baseLearners.slice(0, bestM),
    -            baseLearnerWeights.slice(0, bestM))
    -        } else if (currentValidateError < bestValidateError) {
    -            bestValidateError = currentValidateError
    -            bestM = m + 1
    +          validatePredError = GradientBoostedTreesModel.updatePredictionError(
    +            validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
    +          val currentValidateError = validatePredError.values.mean()
    +          if (bestValidateError - currentValidateError < validationTol) {
    +            return new GradientBoostedTreesModel(
    +              boostingStrategy.treeStrategy.algo,
    +              baseLearners.slice(0, bestM),
    +              baseLearnerWeights.slice(0, bestM))
    +          } else if (currentValidateError < bestValidateError) {
    +              bestValidateError = currentValidateError
    +              bestM = m + 1
    +          }
             }
    +        // Update data with pseudo-residuals
    +        data = predError.zip(input).map { case ((pred, _), point) =>
    +          LabeledPoint(-loss.gradient(pred, point.label), point.features)
    +        }
    +        m += 1
           }
    -      // Update data with pseudo-residuals
    -      data = predError.zip(input).map { case ((pred, _), point) =>
    -        LabeledPoint(-loss.gradient(pred, point.label), point.features)
    -      }
    -      m += 1
    -    }
     
    -    timer.stop("total")
    +      timer.stop("total")
    +
    +      logInfo("Internal timing for DecisionTree:")
    +      logInfo(s"$timer")
    +      if (validate) {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo,
    +          baseLearners.slice(0, bestM),
    +          baseLearnerWeights.slice(0, bestM))
    +      } else {
    +        new GradientBoostedTreesModel(
    +          boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +      }
     
    -    logInfo("Internal timing for DecisionTree:")
    -    logInfo(s"$timer")
    -    if (validate) {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo,
    -        baseLearners.slice(0, bestM),
    -        baseLearnerWeights.slice(0, bestM))
    -    } else {
    -      new GradientBoostedTreesModel(
    -        boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights)
    +    } finally {
    +       if (persistedInput) input.unpersist()
    --- End diff --
    
    I don't think it hurts to unpersist an RDD that wasn't persisted, so this doesn't have to be tracked explicitly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-95691172
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5669#issuecomment-96766695
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/5669


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-7100][MLLib] Fix persisted RDD leak in ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5669#discussion_r28995411
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala ---
    @@ -177,102 +177,108 @@ object GradientBoostedTrees extends Logging {
         treeStrategy.assertValid()
     
         // Cache input
    -    if (input.getStorageLevel == StorageLevel.NONE) {
    +    val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
           input.persist(StorageLevel.MEMORY_AND_DISK)
    -    }
    +      true
    +    } else false
     
    -    timer.stop("init")
    +    try {
    --- End diff --
    
    @Jkbradley it's an interesting question -- it makes sense to ensure the RDD is unpersisted with `try-finally` but I don't know if any other code does it. I think the assumption has been that the app will soon die anyway if something unexpected is going wrong like an exception, so RDD cleanup isn't that important. Should we stick to that imperfect reasoning here and keep the code simpler?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org