Posted to reviews@spark.apache.org by AnthonyTruchet <gi...@git.apache.org> on 2016/07/21 09:19:44 UTC

[GitHub] spark pull request #14299: Ensure broadcasted variables are destroyed even i...

GitHub user AnthonyTruchet opened a pull request:

    https://github.com/apache/spark/pull/14299

    Ensure broadcasted variables are destroyed even in case of exception

    ## What changes were proposed in this pull request?
    
    Ensure broadcasted variables are destroyed even in case of exception
    
    ## How was this patch tested?
    
    Word2VecSuite was run locally
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/criteo-forks/spark SPARK-16440

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14299.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14299
    
----
commit 4ad38360290d59fdf25a009bc65823553cea9b10
Author: Anthony Truchet <a....@criteo.com>
Date:   2016-07-08T12:54:24Z

    [SPARK-16440][MLlib] Destroy broadcasted variables even on driver
    
    This contribution is on done on behalf of Criteo, according to the
    terms of the Apache license.

commit 53911e04f82f58ee936edededea1d8e72bcb4ea8
Author: Anthony Truchet <a....@criteo.com>
Date:   2016-07-19T17:42:26Z

    [SPARK-16440][MLlib] Destroy broadcasted variables in a try finally
    
    This contribution is on done on behalf of Criteo, according to the
    terms of the Apache license.

commit 568e4915f6b1c3cd30c1b9796764f543e27f91fc
Author: Anthony Truchet <a....@criteo.com>
Date:   2016-07-21T08:45:44Z

    Merge remote-tracking branch 'apache/master' into SPARK-16440

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    I'm fixing this (issues compiling Spark locally are delaying me). The whole point is that they are *not* destroyed within the method in case of an exception.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Oh, these broadcasts are already destroyed actually, inside the method. This isn't needed then. I kinda thought we had taken care of most or all of these already.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    ok to test




[GitHub] spark pull request #14299: [SPARK-16440][MLlib] Ensure broadcasted variables...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14299#discussion_r103420891
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---
    @@ -314,6 +315,20 @@ class Word2Vec extends Serializable with Logging {
         val expTable = sc.broadcast(createExpTable())
         val bcVocab = sc.broadcast(vocab)
         val bcVocabHash = sc.broadcast(vocabHash)
    +      do_fit(dataset, sc, expTable, bcVocab, bcVocabHash)
    --- End diff --
    
    Are we missing a try? The formatting should be
    
    ```
    try {
      ...
    } finally {
      ...
    }
    ```
    
    Also use camelCase rather than underscore_naming
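
To make the requested shape concrete, here is a minimal sketch of the `try { ... } finally { ... }` layout with a camelCase helper name. `FakeBroadcast`, `TryFinallySketch`, `doFit` and `fit` are hypothetical stand-ins invented for illustration; they are not Spark's `Broadcast` API and not the code in this pull request.

```scala
// Hypothetical stand-in for a broadcast handle (NOT the real Spark Broadcast API).
final class FakeBroadcast[T](val value: T) {
  private var destroyed = false
  def destroy(): Unit = { destroyed = true }
  def isDestroyed: Boolean = destroyed
}

object TryFinallySketch {
  // camelCase helper name (doFit), rather than do_fit.
  // Throws when the vocabulary is empty, to simulate a failing training run.
  def doFit(bcVocab: FakeBroadcast[Seq[String]]): Int = {
    if (bcVocab.value.isEmpty) {
      throw new RuntimeException("empty vocabulary")
    }
    bcVocab.value.length
  }

  // The cleanup sits in the finally block, so it runs on both the
  // normal path and the exception path.
  def fit(bcVocab: FakeBroadcast[Seq[String]]): Int = {
    try {
      doFit(bcVocab)
    } finally {
      bcVocab.destroy()
    }
  }
}
```

With this layout the exception still propagates to the caller; only the cleanup is guaranteed.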




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Yes, but they are still cleaned up (eventually). It can only happen in one place. Unless the exception path is common, I don't know if it's worth changing this, because for consistency it would really have to apply everywhere, and we declined to do that before.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Merged to master




[GitHub] spark pull request #14299: Ensure broadcasted variables are destroyed even i...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14299#discussion_r71723751
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---
    @@ -313,133 +313,139 @@ class Word2Vec extends Serializable with Logging {
         val expTable = sc.broadcast(createExpTable())
         val bcVocab = sc.broadcast(vocab)
         val bcVocabHash = sc.broadcast(vocabHash)
    -    // each partition is a collection of sentences,
    -    // will be translated into arrays of Index integer
    -    val sentences: RDD[Array[Int]] = dataset.mapPartitions { sentenceIter =>
    -      // Each sentence will map to 0 or more Array[Int]
    -      sentenceIter.flatMap { sentence =>
    -        // Sentence of words, some of which map to a word index
    -        val wordIndexes = sentence.flatMap(bcVocabHash.value.get)
    -        // break wordIndexes into trunks of maxSentenceLength when has more
    -        wordIndexes.grouped(maxSentenceLength).map(_.toArray)
    +
    +    try {
    +      // each partition is a collection of sentences,
    +      // will be translated into arrays of Index integer
    +      val sentences: RDD[Array[Int]] = dataset.mapPartitions { sentenceIter =>
    +        // Each sentence will map to 0 or more Array[Int]
    +        sentenceIter.flatMap { sentence =>
    +          // Sentence of words, some of which map to a word index
    +          val wordIndexes = sentence.flatMap(bcVocabHash.value.get)
    +          // break wordIndexes into trunks of maxSentenceLength when has more
    +          wordIndexes.grouped(maxSentenceLength).map(_.toArray)
    +        }
           }
    -    }
     
    -    val newSentences = sentences.repartition(numPartitions).cache()
    -    val initRandom = new XORShiftRandom(seed)
    +      val newSentences = sentences.repartition(numPartitions).cache()
    +      val initRandom = new XORShiftRandom(seed)
     
    -    if (vocabSize.toLong * vectorSize >= Int.MaxValue) {
    -      throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" +
    -        " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " +
    -        "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue`.")
    -    }
    +      if (vocabSize.toLong * vectorSize >= Int.MaxValue) {
    +        throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" +
    +          " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " +
    +          "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue`.")
    +      }
     
    -    val syn0Global =
    -      Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
    -    val syn1Global = new Array[Float](vocabSize * vectorSize)
    -    var alpha = learningRate
    -
    -    for (k <- 1 to numIterations) {
    -      val bcSyn0Global = sc.broadcast(syn0Global)
    -      val bcSyn1Global = sc.broadcast(syn1Global)
    -      val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
    -        val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
    -        val syn0Modify = new Array[Int](vocabSize)
    -        val syn1Modify = new Array[Int](vocabSize)
    -        val model = iter.foldLeft((bcSyn0Global.value, bcSyn1Global.value, 0L, 0L)) {
    -          case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
    -            var lwc = lastWordCount
    -            var wc = wordCount
    -            if (wordCount - lastWordCount > 10000) {
    -              lwc = wordCount
    -              // TODO: discount by iteration?
    -              alpha =
    -                learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1))
    -              if (alpha < learningRate * 0.0001) alpha = learningRate * 0.0001
    -              logInfo("wordCount = " + wordCount + ", alpha = " + alpha)
    -            }
    -            wc += sentence.length
    -            var pos = 0
    -            while (pos < sentence.length) {
    -              val word = sentence(pos)
    -              val b = random.nextInt(window)
    -              // Train Skip-gram
    -              var a = b
    -              while (a < window * 2 + 1 - b) {
    -                if (a != window) {
    -                  val c = pos - window + a
    -                  if (c >= 0 && c < sentence.length) {
    -                    val lastWord = sentence(c)
    -                    val l1 = lastWord * vectorSize
    -                    val neu1e = new Array[Float](vectorSize)
    -                    // Hierarchical softmax
    -                    var d = 0
    -                    while (d < bcVocab.value(word).codeLen) {
    -                      val inner = bcVocab.value(word).point(d)
    -                      val l2 = inner * vectorSize
    -                      // Propagate hidden -> output
    -                      var f = blas.sdot(vectorSize, syn0, l1, 1, syn1, l2, 1)
    -                      if (f > -MAX_EXP && f < MAX_EXP) {
    -                        val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt
    -                        f = expTable.value(ind)
    -                        val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
    -                        blas.saxpy(vectorSize, g, syn1, l2, 1, neu1e, 0, 1)
    -                        blas.saxpy(vectorSize, g, syn0, l1, 1, syn1, l2, 1)
    -                        syn1Modify(inner) += 1
    +      val syn0Global =
    +        Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
    +      val syn1Global = new Array[Float](vocabSize * vectorSize)
    +      var alpha = learningRate
    +
    +      for (k <- 1 to numIterations) {
    +        val bcSyn0Global = sc.broadcast(syn0Global)
    +        val bcSyn1Global = sc.broadcast(syn1Global)
    +        val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
    +          val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
    +          val syn0Modify = new Array[Int](vocabSize)
    +          val syn1Modify = new Array[Int](vocabSize)
    +          val model = iter.foldLeft((bcSyn0Global.value, bcSyn1Global.value, 0L, 0L)) {
    +            case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
    +              var lwc = lastWordCount
    +              var wc = wordCount
    +              if (wordCount - lastWordCount > 10000) {
    +                lwc = wordCount
    +                // TODO: discount by iteration?
    +                alpha =
    +                  learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1))
    +                if (alpha < learningRate * 0.0001) alpha = learningRate * 0.0001
    +                logInfo("wordCount = " + wordCount + ", alpha = " + alpha)
    +              }
    +              wc += sentence.length
    +              var pos = 0
    +              while (pos < sentence.length) {
    +                val word = sentence(pos)
    +                val b = random.nextInt(window)
    +                // Train Skip-gram
    +                var a = b
    +                while (a < window * 2 + 1 - b) {
    +                  if (a != window) {
    +                    val c = pos - window + a
    +                    if (c >= 0 && c < sentence.length) {
    +                      val lastWord = sentence(c)
    +                      val l1 = lastWord * vectorSize
    +                      val neu1e = new Array[Float](vectorSize)
    +                      // Hierarchical softmax
    +                      var d = 0
    +                      while (d < bcVocab.value(word).codeLen) {
    +                        val inner = bcVocab.value(word).point(d)
    +                        val l2 = inner * vectorSize
    +                        // Propagate hidden -> output
    +                        var f = blas.sdot(vectorSize, syn0, l1, 1, syn1, l2, 1)
    +                        if (f > -MAX_EXP && f < MAX_EXP) {
    +                          val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt
    +                          f = expTable.value(ind)
    +                          val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
    +                          blas.saxpy(vectorSize, g, syn1, l2, 1, neu1e, 0, 1)
    +                          blas.saxpy(vectorSize, g, syn0, l1, 1, syn1, l2, 1)
    +                          syn1Modify(inner) += 1
    +                        }
    +                        d += 1
                           }
    -                      d += 1
    +                      blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, l1, 1)
    +                      syn0Modify(lastWord) += 1
                         }
    -                    blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, l1, 1)
    -                    syn0Modify(lastWord) += 1
                       }
    +                  a += 1
                     }
    -                a += 1
    +                pos += 1
                   }
    -              pos += 1
    +              (syn0, syn1, lwc, wc)
    +          }
    +          val syn0Local = model._1
    +          val syn1Local = model._2
    +          // Only output modified vectors.
    +          Iterator.tabulate(vocabSize) { index =>
    +            if (syn0Modify(index) > 0) {
    +              Some((index, syn0Local.slice(index * vectorSize, (index + 1) * vectorSize)))
    +            } else {
    +              None
    +            }
    +          }.flatten ++ Iterator.tabulate(vocabSize) { index =>
    +            if (syn1Modify(index) > 0) {
    +              Some((index + vocabSize, syn1Local.slice(index * vectorSize, (index + 1) * vectorSize)))
    +            } else {
    +              None
                 }
    -            (syn0, syn1, lwc, wc)
    +          }.flatten
             }
    -        val syn0Local = model._1
    -        val syn1Local = model._2
    -        // Only output modified vectors.
    -        Iterator.tabulate(vocabSize) { index =>
    -          if (syn0Modify(index) > 0) {
    -            Some((index, syn0Local.slice(index * vectorSize, (index + 1) * vectorSize)))
    -          } else {
    -            None
    -          }
    -        }.flatten ++ Iterator.tabulate(vocabSize) { index =>
    -          if (syn1Modify(index) > 0) {
    -            Some((index + vocabSize, syn1Local.slice(index * vectorSize, (index + 1) * vectorSize)))
    -          } else {
    -            None
    -          }
    -        }.flatten
    -      }
    -      val synAgg = partial.reduceByKey { case (v1, v2) =>
    +        val synAgg = partial.reduceByKey { case (v1, v2) =>
               blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1)
               v1
    -      }.collect()
    -      var i = 0
    -      while (i < synAgg.length) {
    -        val index = synAgg(i)._1
    -        if (index < vocabSize) {
    -          Array.copy(synAgg(i)._2, 0, syn0Global, index * vectorSize, vectorSize)
    -        } else {
    -          Array.copy(synAgg(i)._2, 0, syn1Global, (index - vocabSize) * vectorSize, vectorSize)
    +        }.collect()
    +        var i = 0
    +        while (i < synAgg.length) {
    +          val index = synAgg(i)._1
    +          if (index < vocabSize) {
    +            Array.copy(synAgg(i)._2, 0, syn0Global, index * vectorSize, vectorSize)
    +          } else {
    +            Array.copy(synAgg(i)._2, 0, syn1Global, (index - vocabSize) * vectorSize, vectorSize)
    +          }
    +          i += 1
             }
    -        i += 1
    +        bcSyn0Global.unpersist(false)
    +        bcSyn1Global.unpersist(false)
           }
    -      bcSyn0Global.unpersist(false)
    -      bcSyn1Global.unpersist(false)
    -    }
    -    newSentences.unpersist()
    -    expTable.destroy()
    -    bcVocab.destroy()
    -    bcVocabHash.destroy()
    +      newSentences.unpersist()
     
    -    val wordArray = vocab.map(_.word)
    -    new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)
    +      val wordArray = vocab.map(_.word)
    +      new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)
    +    }
    +    finally
    --- End diff --
    
    By the way, are you aware of http://jsuereth.com/scala-arm/, which might prove very useful to ease resource management without cluttering the syntax (much)? It is akin to using clauses in C# or context managers in Python.
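
For readers unfamiliar with the idea, the resource-management style referred to here can be approximated in plain Scala with a small "loan pattern" helper. The sketch below is an illustration only: `LoanSketch`, `Destroyable` and `withDestroyable` are hypothetical names, and this is not scala-arm's API.

```scala
object LoanSketch {
  // Hypothetical minimal resource type: anything that can be destroyed.
  trait Destroyable {
    def destroy(): Unit
  }

  // Lends the resource to `body` and destroys it afterwards,
  // whether `body` returns normally or throws.
  def withDestroyable[R <: Destroyable, A](resource: R)(body: R => A): A = {
    try {
      body(resource)
    } finally {
      resource.destroy()
    }
  }
}
```

A call site then reads `LoanSketch.withDestroyable(resource) { r => ... }`, which keeps the cleanup out of the main body in much the same way a C# `using` clause or a Python context manager does.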




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #73699 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73699/testReport)** for PR 14299 at commit [`6b8ae85`](https://github.com/apache/spark/commit/6b8ae85dc362ebef0f8d416a8e35970f57130a9f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    As for the "normal failure case", the one we encounter is that the embedding learning raises an exception when the vocabulary is too small. And it is pretty legitimate in our use cases to have some "too small vocabularies".




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Ok, thanks. Do you see any missing task for it to be merged, then?




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    I am still not getting why the failure of this method is a normal execution path - when would it fail repeatedly?
    
    Yes the destroy should be non-blocking for consistency.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #73600 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73600/testReport)** for PR 14299 at commit [`e716700`](https://github.com/apache/spark/commit/e716700a5b684d6a985860ea164996471a7341ef).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73699/
    Test FAILed.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    @vanzin The ticket was filed a long time ago; I updated the PR to make it clearer. Is any manual linking in some other forge needed?
    
    @thunterdb I have updated the review following your suggestion, without using ARM: ready for review.
    
    I'll check with the team working on W2V how we could contribute to make this piece of code "follow better engineering practice" :-)




[GitHub] spark issue #14299: Ensure broadcasted variables are destroyed even in case ...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    @thunterdb Copy that, working on it, and sorry for the delay in acknowledging.




[GitHub] spark pull request #14299: [SPARK-16440][MLlib] Ensure broadcasted variables...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/14299




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #73699 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73699/testReport)** for PR 14299 at commit [`6b8ae85`](https://github.com/apache/spark/commit/6b8ae85dc362ebef0f8d416a8e35970f57130a9f).




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73770/




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    I wonder how that happens and why that fixes this though. It would be the same before and after except for the blocking flag. That should be false like all the other calls ... Unless that is somehow part of the issue?




[GitHub] spark issue #14299: Ensure broadcasted variables are destroyed even in case ...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    @pzz2011 Nothing _that_ bad if your application trains once and then saves or uses the embeddings. But if you run hundreds or thousands of training runs, you'll eventually crash because of uncontrolled memory usage... and that's bad!
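
A toy model can make the growth described above concrete. This is not Spark code: `BlockStore`, `LeakDemo`, `run`, and `liveAfter` are hypothetical names invented for illustration, and the sketch only models the failure path discussed in this PR, where a run throws before its cleanup line is reached.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the storage that holds broadcast data.
final class BlockStore {
  private val live = mutable.Set.empty[Long]
  private var nextId = 0L

  // Registers a new block and returns its id.
  def register(): Long = {
    val id = nextId
    nextId += 1
    live += id
    id
  }

  def release(id: Long): Unit = { live -= id }

  def liveCount: Int = live.size
}

object LeakDemo {
  // One "training run" whose cleanup is NOT protected by try / finally.
  def run(store: BlockStore, fail: Boolean): Unit = {
    val id = store.register()
    if (fail) throw new IllegalStateException("training failed")
    store.release(id) // never reached when the line above throws
  }

  // Executes `runs` training runs and reports how many blocks are still held.
  def liveAfter(runs: Int, fail: Boolean): Int = {
    val store = new BlockStore
    (1 to runs).foreach { _ =>
      try run(store, fail) catch { case _: IllegalStateException => () }
    }
    store.liveCount
  }
}
```

In this model every failed run leaves one block behind, so the number of held blocks grows linearly with the number of failed runs, while successful runs leave nothing behind.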




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #3588 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3588/testReport)** for PR 14299 at commit [`6b8ae85`](https://github.com/apache/spark/commit/6b8ae85dc362ebef0f8d416a8e35970f57130a9f).




[GitHub] spark pull request #14299: [SPARK-16440][MLlib] Ensure broadcasted variables...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14299#discussion_r103657660
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---
    @@ -314,6 +315,21 @@ class Word2Vec extends Serializable with Logging {
         val expTable = sc.broadcast(createExpTable())
         val bcVocab = sc.broadcast(vocab)
         val bcVocabHash = sc.broadcast(vocabHash)
    +    try {
    +      doFit(dataset, sc, expTable, bcVocab, bcVocabHash)
    +    }
    +    finally
    --- End diff --
    
    Formatting and scala style check fixed.  




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    The blocking flag is not the issue.
    The issue is that there was no try / finally: the destruction was only done if the fitting succeeded and was skipped otherwise.
    
    In this PR I move the destruction into a try / finally construct, and extract the actual fitting into a dedicated method to keep nesting under control.
    
    Whether the destruction should be blocking or not is debatable and I do not have a strong opinion on the question. I chose to make the calls blocking "just to be sure" the destruction is actually performed. If you'd rather keep the current non-blocking, best-effort behaviour, I'll make the change.
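
The shape of the change described above can be sketched without Spark. In this minimal illustration `FakeBroadcast` is a hypothetical stand-in for a broadcast handle (it only records whether `destroy()` was called), and `TryFinallySketch`, its `doFit`, and the `fail` flag are assumptions made for the example, not the code in the PR.

```scala
// Hypothetical stand-in for a broadcast handle; it only tracks destruction.
final class FakeBroadcast[T](val value: T) {
  private var destroyed = false
  def destroy(): Unit = { destroyed = true }
  def isDestroyed: Boolean = destroyed
}

object TryFinallySketch {
  // Extracted body, playing the role of the dedicated fitting method.
  private def doFit(
      expTable: FakeBroadcast[Array[Float]],
      vocab: FakeBroadcast[Seq[String]],
      fail: Boolean): Int = {
    if (fail) throw new RuntimeException("fitting failed")
    expTable.value.length + vocab.value.size
  }

  // The caller owns the cleanup: `finally` runs on success and on failure,
  // and the exception (if any) still propagates to the caller.
  def fit(
      expTable: FakeBroadcast[Array[Float]],
      vocab: FakeBroadcast[Seq[String]],
      fail: Boolean): Int = {
    try {
      doFit(expTable, vocab, fail)
    } finally {
      expTable.destroy()
      vocab.destroy()
    }
  }
}
```

Keeping the fitting logic in its own method means the `try` block contains a single call, so the long training loop does not gain an extra level of indentation.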




[GitHub] spark issue #14299: Ensure broadcasted variables are destroyed even in case ...

Posted by pzz2011 <gi...@git.apache.org>.
Github user pzz2011 commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    @AnthonyTruchet If we do not ensure the broadcast variables are destroyed, what will happen? (This is my own confusion :)




[GitHub] spark issue #14299: Ensure broadcasted variables are destroyed even in case ...

Posted by thunterdb <gi...@git.apache.org>.
Github user thunterdb commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    @AnthonyTruchet thank you for the PR. This is definitely worth fixing for large deployments. Now, as you noticed, this portion of code does not quite abide by the best engineering practices... Instead of adding an extra layer of nesting, would you mind making the following changes?
    
    ```scala
      def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel = {
        ...
        val expTable = sc.broadcast(createExpTable())
        val bcVocab = sc.broadcast(vocab)
        val bcVocabHash = sc.broadcast(vocabHash)
        try { fit0(expTable, ...) } finally {
          ...
        }
      }
      private final def fit0(...): Word2VecModel = {
        // Put all the content here.
        // Note that the inner code also includes some broadcasts, you may want to fix these as well if you can
      }
    ```
    
    I personally agree about resource management and scala-arm. Unfortunately, we try to keep Scala dependencies to a minimum, because they can be very tedious to move from one Scala version to another.
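
For readers wondering what dependency-free resource management could look like, here is one possible sketch using only the Scala standard library. The `Cleanup` object and `withCleanup` method are hypothetical names, not part of Spark or scala-arm. Swallowing non-fatal cleanup failures is a design choice made for this example so that a failing cleanup neither masks the original exception nor blocks the remaining cleanups.

```scala
import scala.util.control.NonFatal

object Cleanup {
  // Evaluates `body`, then runs every cleanup action in order,
  // whether `body` returned normally or threw.
  def withCleanup[A](cleanups: (() => Unit)*)(body: => A): A = {
    try {
      body
    } finally {
      cleanups.foreach { cleanup =>
        // A non-fatal failure in one cleanup is ignored so the others still run.
        try cleanup() catch { case NonFatal(_) => () }
      }
    }
  }
}
```

With real broadcasts, a call might look like `Cleanup.withCleanup(() => expTable.destroy(), () => bcVocab.destroy()) { ... }`, assuming those handles are in scope.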




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #73600 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73600/testReport)** for PR 14299 at commit [`e716700`](https://github.com/apache/spark/commit/e716700a5b684d6a985860ea164996471a7341ef).




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73685/




[GitHub] spark pull request #14299: [SPARK-16440][MLlib] Ensure broadcasted variables...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14299#discussion_r103526867
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---
    @@ -314,6 +315,21 @@ class Word2Vec extends Serializable with Logging {
         val expTable = sc.broadcast(createExpTable())
         val bcVocab = sc.broadcast(vocab)
         val bcVocabHash = sc.broadcast(vocabHash)
    +    try {
    +      doFit(dataset, sc, expTable, bcVocab, bcVocabHash)
    +    }
    +    finally
    --- End diff --
    
    Put this on one line but otherwise seems reasonable. The broadcast is unused after fitting. Are there more instances like this?




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #73770 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73770/testReport)** for PR 14299 at commit [`65a238d`](https://github.com/apache/spark/commit/65a238d1cd09c96b41337d3d8f5162fca9faf2f4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #73685 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73685/testReport)** for PR 14299 at commit [`9a0675f`](https://github.com/apache/spark/commit/9a0675f4fbf34d2095710628fb5ca3adf98f7f5b).




[GitHub] spark issue #14299: Ensure broadcasted variables are destroyed even in case ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Can one of the admins verify this patch?




[GitHub] spark issue #14299: Ensure broadcasted variables are destroyed even in case ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    You also should file a bug and reference it from the PR title.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #3588 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3588/testReport)** for PR 14299 at commit [`6b8ae85`](https://github.com/apache/spark/commit/6b8ae85dc362ebef0f8d416a8e35970f57130a9f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #14299: Ensure broadcasted variables are destroyed even i...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14299#discussion_r71703375
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---
    @@ -313,133 +313,139 @@ class Word2Vec extends Serializable with Logging {
         val expTable = sc.broadcast(createExpTable())
         val bcVocab = sc.broadcast(vocab)
         val bcVocabHash = sc.broadcast(vocabHash)
    -    // each partition is a collection of sentences,
    -    // will be translated into arrays of Index integer
    -    val sentences: RDD[Array[Int]] = dataset.mapPartitions { sentenceIter =>
    -      // Each sentence will map to 0 or more Array[Int]
    -      sentenceIter.flatMap { sentence =>
    -        // Sentence of words, some of which map to a word index
    -        val wordIndexes = sentence.flatMap(bcVocabHash.value.get)
    -        // break wordIndexes into trunks of maxSentenceLength when has more
    -        wordIndexes.grouped(maxSentenceLength).map(_.toArray)
    +
    +    try {
    +      // each partition is a collection of sentences,
    +      // will be translated into arrays of Index integer
    +      val sentences: RDD[Array[Int]] = dataset.mapPartitions { sentenceIter =>
    +        // Each sentence will map to 0 or more Array[Int]
    +        sentenceIter.flatMap { sentence =>
    +          // Sentence of words, some of which map to a word index
    +          val wordIndexes = sentence.flatMap(bcVocabHash.value.get)
    +          // break wordIndexes into trunks of maxSentenceLength when has more
    +          wordIndexes.grouped(maxSentenceLength).map(_.toArray)
    +        }
           }
    -    }
     
    -    val newSentences = sentences.repartition(numPartitions).cache()
    -    val initRandom = new XORShiftRandom(seed)
    +      val newSentences = sentences.repartition(numPartitions).cache()
    +      val initRandom = new XORShiftRandom(seed)
     
    -    if (vocabSize.toLong * vectorSize >= Int.MaxValue) {
    -      throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" +
    -        " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " +
    -        "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue`.")
    -    }
    +      if (vocabSize.toLong * vectorSize >= Int.MaxValue) {
    +        throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" +
    +          " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " +
    +          "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue`.")
    +      }
     
    -    val syn0Global =
    -      Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
    -    val syn1Global = new Array[Float](vocabSize * vectorSize)
    -    var alpha = learningRate
    -
    -    for (k <- 1 to numIterations) {
    -      val bcSyn0Global = sc.broadcast(syn0Global)
    -      val bcSyn1Global = sc.broadcast(syn1Global)
    -      val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
    -        val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
    -        val syn0Modify = new Array[Int](vocabSize)
    -        val syn1Modify = new Array[Int](vocabSize)
    -        val model = iter.foldLeft((bcSyn0Global.value, bcSyn1Global.value, 0L, 0L)) {
    -          case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
    -            var lwc = lastWordCount
    -            var wc = wordCount
    -            if (wordCount - lastWordCount > 10000) {
    -              lwc = wordCount
    -              // TODO: discount by iteration?
    -              alpha =
    -                learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1))
    -              if (alpha < learningRate * 0.0001) alpha = learningRate * 0.0001
    -              logInfo("wordCount = " + wordCount + ", alpha = " + alpha)
    -            }
    -            wc += sentence.length
    -            var pos = 0
    -            while (pos < sentence.length) {
    -              val word = sentence(pos)
    -              val b = random.nextInt(window)
    -              // Train Skip-gram
    -              var a = b
    -              while (a < window * 2 + 1 - b) {
    -                if (a != window) {
    -                  val c = pos - window + a
    -                  if (c >= 0 && c < sentence.length) {
    -                    val lastWord = sentence(c)
    -                    val l1 = lastWord * vectorSize
    -                    val neu1e = new Array[Float](vectorSize)
    -                    // Hierarchical softmax
    -                    var d = 0
    -                    while (d < bcVocab.value(word).codeLen) {
    -                      val inner = bcVocab.value(word).point(d)
    -                      val l2 = inner * vectorSize
    -                      // Propagate hidden -> output
    -                      var f = blas.sdot(vectorSize, syn0, l1, 1, syn1, l2, 1)
    -                      if (f > -MAX_EXP && f < MAX_EXP) {
    -                        val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt
    -                        f = expTable.value(ind)
    -                        val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
    -                        blas.saxpy(vectorSize, g, syn1, l2, 1, neu1e, 0, 1)
    -                        blas.saxpy(vectorSize, g, syn0, l1, 1, syn1, l2, 1)
    -                        syn1Modify(inner) += 1
    +      val syn0Global =
    +        Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
    +      val syn1Global = new Array[Float](vocabSize * vectorSize)
    +      var alpha = learningRate
    +
    +      for (k <- 1 to numIterations) {
    +        val bcSyn0Global = sc.broadcast(syn0Global)
    +        val bcSyn1Global = sc.broadcast(syn1Global)
    +        val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
    +          val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
    +          val syn0Modify = new Array[Int](vocabSize)
    +          val syn1Modify = new Array[Int](vocabSize)
    +          val model = iter.foldLeft((bcSyn0Global.value, bcSyn1Global.value, 0L, 0L)) {
    +            case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
    +              var lwc = lastWordCount
    +              var wc = wordCount
    +              if (wordCount - lastWordCount > 10000) {
    +                lwc = wordCount
    +                // TODO: discount by iteration?
    +                alpha =
    +                  learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1))
    +                if (alpha < learningRate * 0.0001) alpha = learningRate * 0.0001
    +                logInfo("wordCount = " + wordCount + ", alpha = " + alpha)
    +              }
    +              wc += sentence.length
    +              var pos = 0
    +              while (pos < sentence.length) {
    +                val word = sentence(pos)
    +                val b = random.nextInt(window)
    +                // Train Skip-gram
    +                var a = b
    +                while (a < window * 2 + 1 - b) {
    +                  if (a != window) {
    +                    val c = pos - window + a
    +                    if (c >= 0 && c < sentence.length) {
    +                      val lastWord = sentence(c)
    +                      val l1 = lastWord * vectorSize
    +                      val neu1e = new Array[Float](vectorSize)
    +                      // Hierarchical softmax
    +                      var d = 0
    +                      while (d < bcVocab.value(word).codeLen) {
    +                        val inner = bcVocab.value(word).point(d)
    +                        val l2 = inner * vectorSize
    +                        // Propagate hidden -> output
    +                        var f = blas.sdot(vectorSize, syn0, l1, 1, syn1, l2, 1)
    +                        if (f > -MAX_EXP && f < MAX_EXP) {
    +                          val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt
    +                          f = expTable.value(ind)
    +                          val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
    +                          blas.saxpy(vectorSize, g, syn1, l2, 1, neu1e, 0, 1)
    +                          blas.saxpy(vectorSize, g, syn0, l1, 1, syn1, l2, 1)
    +                          syn1Modify(inner) += 1
    +                        }
    +                        d += 1
                           }
    -                      d += 1
    +                      blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, l1, 1)
    +                      syn0Modify(lastWord) += 1
                         }
    -                    blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, l1, 1)
    -                    syn0Modify(lastWord) += 1
                       }
    +                  a += 1
                     }
    -                a += 1
    +                pos += 1
                   }
    -              pos += 1
    +              (syn0, syn1, lwc, wc)
    +          }
    +          val syn0Local = model._1
    +          val syn1Local = model._2
    +          // Only output modified vectors.
    +          Iterator.tabulate(vocabSize) { index =>
    +            if (syn0Modify(index) > 0) {
    +              Some((index, syn0Local.slice(index * vectorSize, (index + 1) * vectorSize)))
    +            } else {
    +              None
    +            }
    +          }.flatten ++ Iterator.tabulate(vocabSize) { index =>
    +            if (syn1Modify(index) > 0) {
    +              Some((index + vocabSize, syn1Local.slice(index * vectorSize, (index + 1) * vectorSize)))
    +            } else {
    +              None
                 }
    -            (syn0, syn1, lwc, wc)
    +          }.flatten
             }
    -        val syn0Local = model._1
    -        val syn1Local = model._2
    -        // Only output modified vectors.
    -        Iterator.tabulate(vocabSize) { index =>
    -          if (syn0Modify(index) > 0) {
    -            Some((index, syn0Local.slice(index * vectorSize, (index + 1) * vectorSize)))
    -          } else {
    -            None
    -          }
    -        }.flatten ++ Iterator.tabulate(vocabSize) { index =>
    -          if (syn1Modify(index) > 0) {
    -            Some((index + vocabSize, syn1Local.slice(index * vectorSize, (index + 1) * vectorSize)))
    -          } else {
    -            None
    -          }
    -        }.flatten
    -      }
    -      val synAgg = partial.reduceByKey { case (v1, v2) =>
    +        val synAgg = partial.reduceByKey { case (v1, v2) =>
               blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1)
               v1
    -      }.collect()
    -      var i = 0
    -      while (i < synAgg.length) {
    -        val index = synAgg(i)._1
    -        if (index < vocabSize) {
    -          Array.copy(synAgg(i)._2, 0, syn0Global, index * vectorSize, vectorSize)
    -        } else {
    -          Array.copy(synAgg(i)._2, 0, syn1Global, (index - vocabSize) * vectorSize, vectorSize)
    +        }.collect()
    +        var i = 0
    +        while (i < synAgg.length) {
    +          val index = synAgg(i)._1
    +          if (index < vocabSize) {
    +            Array.copy(synAgg(i)._2, 0, syn0Global, index * vectorSize, vectorSize)
    +          } else {
    +            Array.copy(synAgg(i)._2, 0, syn1Global, (index - vocabSize) * vectorSize, vectorSize)
    +          }
    +          i += 1
             }
    -        i += 1
    +        bcSyn0Global.unpersist(false)
    +        bcSyn1Global.unpersist(false)
           }
    -      bcSyn0Global.unpersist(false)
    -      bcSyn1Global.unpersist(false)
    -    }
    -    newSentences.unpersist()
    -    expTable.destroy()
    -    bcVocab.destroy()
    -    bcVocabHash.destroy()
    +      newSentences.unpersist()
     
    -    val wordArray = vocab.map(_.word)
    -    new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)
    +      val wordArray = vocab.map(_.word)
    +      new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)
    +    }
    +    finally
    --- End diff --
    
    Not sure this will pass the style checker. My only hesitation is that really if we do this one place we should do it in 100 places, and the exception path here is not a usual one.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #73685 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73685/testReport)** for PR 14299 at commit [`9a0675f`](https://github.com/apache/spark/commit/9a0675f4fbf34d2095710628fb5ca3adf98f7f5b).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73600/




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    OK, if this happens often enough that you accumulate a lot of broadcasts that never go out of scope and keep hogging resources, I can see special-casing this with try-finally. I'm still surprised it becomes a problem so quickly, but I am OK with it; in the worst case the loss of consistency is too small to worry about.
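
    For illustration only, the try-finally special case discussed here can be sketched without Spark. `Handle`, `train` and `fit` below are hypothetical stand-ins (they are not the PR's code); `Handle` mimics only the `destroy()` call that the diff makes on broadcast variables.

```scala
object TryFinallySketch {
  // Hypothetical stand-in for a broadcast handle; only destroy() is modelled.
  final class Handle(val name: String) {
    var destroyed = false
    def destroy(): Unit = { destroyed = true }
  }

  // Hypothetical training step that may throw.
  def train(fail: Boolean): String = {
    if (fail) throw new IllegalStateException("training failed")
    "model"
  }

  // The finally clause runs on both the normal and the exceptional path,
  // so every handle is destroyed even when train() throws.
  def fit(handles: Seq[Handle], fail: Boolean): String =
    try train(fail)
    finally handles.foreach(_.destroy())
}
```

    The exception still propagates to the caller; the only difference from the unguarded version is that the cleanup is no longer skipped.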




[GitHub] spark pull request #14299: [SPARK-16440][MLlib] Ensure broadcasted variables...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14299#discussion_r103656554
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---
    @@ -314,6 +315,21 @@ class Word2Vec extends Serializable with Logging {
         val expTable = sc.broadcast(createExpTable())
         val bcVocab = sc.broadcast(vocab)
         val bcVocabHash = sc.broadcast(vocabHash)
    +    try {
    +      doFit(dataset, sc, expTable, bcVocab, bcVocabHash)
    +    }
    +    finally
    --- End diff --
    
    I already spotted and corrected a few instances of forgotten broadcast variables. I have reported and patched all the ones I was aware of.
    
    I was suggesting introducing an ARM pattern and making broadcast variables ARM-aware (see comment above). This is in no way a general solution, but it might help reduce the issue when the use of a broadcast variable is compatible with a syntactic scope. @thunterdb legitimately raised concerns about introducing new dependencies (scala-arm specifically). What is your view on this?
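
    As a rough sketch of what a scoped (ARM-like) approach could look like without adding a dependency, a small loan-pattern helper is enough. Everything named here (`Destroyable`, `using`, `Table`) is hypothetical and not part of Spark or scala-arm; in Spark the resource would presumably be a broadcast handle, whose `destroy()` call appears in the diff.

```scala
object BroadcastLoan {
  // Hypothetical minimal interface: anything that can be destroyed.
  trait Destroyable { def destroy(): Unit }

  // Loan pattern: lend the resource to `body`, then always destroy it,
  // whether `body` returns normally or throws.
  def using[A <: Destroyable, R](resource: A)(body: A => R): R =
    try body(resource)
    finally resource.destroy()

  // Hypothetical resource used only to exercise the helper.
  final class Table(val values: Array[Double]) extends Destroyable {
    var destroyed = false
    def destroy(): Unit = { destroyed = true }
  }
}
```

    A call such as `BroadcastLoan.using(table)(_.values.sum)` ties the lifetime of the resource to the block, which only works when the use of the variable fits a syntactic scope, as noted above.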




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    **[Test build #73770 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73770/testReport)** for PR 14299 at commit [`65a238d`](https://github.com/apache/spark/commit/65a238d1cd09c96b41337d3d8f5162fca9faf2f4).




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Actually, we *did* observe a memory leak (severe enough to cause application failure) due to this when running thousands of Word2Vec training jobs in one Spark application, a few of which failed.




[GitHub] spark issue #14299: [SPARK-16440][MLlib] Ensure broadcasted variables are de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14299
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #14299: [SPARK-16440][MLlib] Ensure broadcasted variables...

Posted by AnthonyTruchet <gi...@git.apache.org>.
Github user AnthonyTruchet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14299#discussion_r103421239
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---
    @@ -314,6 +315,20 @@ class Word2Vec extends Serializable with Logging {
         val expTable = sc.broadcast(createExpTable())
         val bcVocab = sc.broadcast(vocab)
         val bcVocabHash = sc.broadcast(vocabHash)
    +      do_fit(dataset, sc, expTable, bcVocab, bcVocabHash)
    --- End diff --
    
    My bad, local rebase issue.

