Posted to issues@spark.apache.org by "Dong Wang (Jira)" <ji...@apache.org> on 2019/11/10 22:12:00 UTC

[jira] [Updated] (SPARK-29827) Wrong persist strategy in mllib.clustering.BisectingKMeans.run

     [ https://issues.apache.org/jira/browse/SPARK-29827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Wang updated SPARK-29827:
------------------------------
    Description: 
There are three persist misuses in mllib.clustering.BisectingKMeans.run.

First, the RDD {{input}} should be persisted, because it is used by the action first() and by further actions in the code that follows.
Second, the RDD assignments should be persisted. It is used more than once in summarize(), which runs an action on assignments.
Third, persisting the RDD norms is unnecessary, because it is only an intermediate RDD. Once its child RDD assignments is persisted, norms does not need to be persisted as well.

{code:scala}
  private[spark] def run(
      input: RDD[Vector],
      instr: Option[Instrumentation]): BisectingKMeansModel = {
    if (input.getStorageLevel == StorageLevel.NONE) {
      logWarning(s"The input RDD ${input.id} is not directly cached, which may hurt performance if"
        + " its parent RDDs are also not cached.")
    }
    // Needs to persist input
    val d = input.map(_.size).first() 
    logInfo(s"Feature dimension: $d.")
    val dMeasure: DistanceMeasure = DistanceMeasure.decodeFromString(this.distanceMeasure)
    // Compute and cache vector norms for fast distance computation.
    val norms = input.map(v => Vectors.norm(v, 2.0)).persist(StorageLevel.MEMORY_AND_DISK)  // Unnecessary persist
    val vectors = input.zip(norms).map { case (x, norm) => new VectorWithNorm(x, norm) }
    var assignments = vectors.map(v => (ROOT_INDEX, v))  // Needs to persist
    var activeClusters = summarize(d, assignments, dMeasure)
{code}

This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
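
The second and third points can be illustrated with a minimal sketch that counts how often the norm is computed when two actions run on assignments. It assumes a local Spark 2.4 session with no task retries. PersistSketch, normEvaluations, rootIndex, and the accumulator are hypothetical names that are not part of BisectingKMeans, and a (Vector, Double) tuple stands in for VectorWithNorm.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object; not part of Spark.
object PersistSketch {
  // Stand-in for ROOT_INDEX in BisectingKMeans.
  private val rootIndex = 1L

  // Returns how many times the norm is computed across two actions on assignments.
  def normEvaluations(sc: SparkContext, persistAssignments: Boolean): Long = {
    val evaluations = sc.longAccumulator("normEvaluations")
    val input: RDD[Vector] = sc.parallelize(
      (1 to 100).map(i => Vectors.dense(i.toDouble, 1.0)), numSlices = 2)
    // norms is deliberately left unpersisted: it is only an intermediate RDD.
    val norms = input.map { v => evaluations.add(1L); Vectors.norm(v, 2.0) }
    val assignments = input.zip(norms).map { case (x, norm) => (rootIndex, (x, norm)) }
    if (persistAssignments) assignments.persist(StorageLevel.MEMORY_AND_DISK)
    // Two actions on the same RDD, as a stand-in for repeated use in summarize().
    assignments.count()
    assignments.count()
    if (persistAssignments) assignments.unpersist()
    evaluations.sum
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("persist-sketch")
    val sc = new SparkContext(conf)
    try {
      // Without persist, every action recomputes the norms (200 evaluations for 100 vectors).
      println(s"assignments not persisted: ${normEvaluations(sc, persistAssignments = false)}")
      // With assignments persisted, the norms are computed once (100 evaluations).
      println(s"assignments persisted:     ${normEvaluations(sc, persistAssignments = true)}")
    } finally {
      sc.stop()
    }
  }
}
{code}

In this sketch, persisting only assignments is enough to avoid recomputing norms, which is why the separate persist on norms looks redundant.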


> Wrong persist strategy in mllib.clustering.BisectingKMeans.run
> --------------------------------------------------------------
>
>                 Key: SPARK-29827
>                 URL: https://issues.apache.org/jira/browse/SPARK-29827
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 2.4.3
>            Reporter: Dong Wang
>            Priority: Major
>
> There are three persist misuses in mllib.clustering.BisectingKMeans.run.
> First, the RDD {{input}} should be persisted, because it is used by the action first() and by further actions in the code that follows.
> Second, the RDD assignments should be persisted. It is used more than once in summarize(), which runs an action on assignments.
> Third, persisting the RDD norms is unnecessary, because it is only an intermediate RDD. Once its child RDD assignments is persisted, norms does not need to be persisted as well.
> {code:scala}
>   private[spark] def run(
>       input: RDD[Vector],
>       instr: Option[Instrumentation]): BisectingKMeansModel = {
>     if (input.getStorageLevel == StorageLevel.NONE) {
>       logWarning(s"The input RDD ${input.id} is not directly cached, which may hurt performance if"
>         + " its parent RDDs are also not cached.")
>     }
>     // Needs to persist input
>     val d = input.map(_.size).first() 
>     logInfo(s"Feature dimension: $d.")
>     val dMeasure: DistanceMeasure = DistanceMeasure.decodeFromString(this.distanceMeasure)
>     // Compute and cache vector norms for fast distance computation.
>     val norms = input.map(v => Vectors.norm(v, 2.0)).persist(StorageLevel.MEMORY_AND_DISK)  // Unnecessary persist
>     val vectors = input.zip(norms).map { case (x, norm) => new VectorWithNorm(x, norm) }
>     var assignments = vectors.map(v => (ROOT_INDEX, v))  // Needs to persist
>     var activeClusters = summarize(d, assignments, dMeasure)
> {code}
> This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.


