Posted to issues@spark.apache.org by "Dong Wang (Jira)" <ji...@apache.org> on 2019/11/10 13:35:00 UTC

[jira] [Created] (SPARK-29827) Wrong persist strategy in mllib.clustering.BisectingKMeans.run

Dong Wang created SPARK-29827:
---------------------------------

             Summary: Wrong persist strategy in mllib.clustering.BisectingKMeans.run
                 Key: SPARK-29827
                 URL: https://issues.apache.org/jira/browse/SPARK-29827
             Project: Spark
          Issue Type: Bug
          Components: MLlib
    Affects Versions: 2.4.3
            Reporter: Dong Wang


There are three persist misuses in mllib.clustering.BisectingKMeans.run.
First, the input RDD should be persisted: it is consumed by the first() action and then again by the actions in the following code, so without caching its parent RDDs may be recomputed.
Second, the assignments RDD should be persisted. It is passed to summarize() more than once, and summarize() runs an action over it each time.
Third, persisting the norms RDD is unnecessary, because it is only an intermediate RDD. Once its child RDD assignments is persisted, persisting norms brings no benefit.
The relevant excerpt is shown below, followed by a sketch of how these changes might look.

{code:scala}
  private[spark] def run(
      input: RDD[Vector],
      instr: Option[Instrumentation]): BisectingKMeansModel = {
    if (input.getStorageLevel == StorageLevel.NONE) {
      logWarning(s"The input RDD ${input.id} is not directly cached, which may hurt performance if"
        + " its parent RDDs are also not cached.")
    }
    // Needs to persist input
    val d = input.map(_.size).first() 
    logInfo(s"Feature dimension: $d.")
    val dMeasure: DistanceMeasure = DistanceMeasure.decodeFromString(this.distanceMeasure)
    // Compute and cache vector norms for fast distance computation.
    val norms = input.map(v => Vectors.norm(v, 2.0)).persist(StorageLevel.MEMORY_AND_DISK)  // Unnecessary persist
    val vectors = input.zip(norms).map { case (x, norm) => new VectorWithNorm(x, norm) }
    var assignments = vectors.map(v => (ROOT_INDEX, v))  // Needs to persist
    var activeClusters = summarize(d, assignments, dMeasure)
{code}
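
A rough sketch of run() with the three changes applied is given below. This is only an illustration based on the excerpt above, not the actual Spark code: the handlePersistence flag is a hypothetical name, and the matching unpersist() calls at the end of run() are not shown.

{code:scala}
  private[spark] def run(
      input: RDD[Vector],
      instr: Option[Instrumentation]): BisectingKMeansModel = {
    // Sketch only: persist the input before the first action (first()),
    // so that the later zip() and map() do not recompute its parent RDDs.
    val handlePersistence = input.getStorageLevel == StorageLevel.NONE  // hypothetical flag
    if (handlePersistence) {
      input.persist(StorageLevel.MEMORY_AND_DISK)
    }
    val d = input.map(_.size).first()
    logInfo(s"Feature dimension: $d.")
    val dMeasure: DistanceMeasure = DistanceMeasure.decodeFromString(this.distanceMeasure)
    // Compute vector norms for fast distance computation, but do not persist them:
    // norms is only an intermediate RDD feeding assignments.
    val norms = input.map(v => Vectors.norm(v, 2.0))
    val vectors = input.zip(norms).map { case (x, norm) => new VectorWithNorm(x, norm) }
    // Persist assignments instead, since summarize() runs an action over it more than once.
    var assignments = vectors.map(v => (ROOT_INDEX, v)).persist(StorageLevel.MEMORY_AND_DISK)
    var activeClusters = summarize(d, assignments, dMeasure)
    // ... remainder of run() unchanged; the RDDs persisted above would need matching
    // unpersist() calls before the method returns (not shown in this sketch).
  }
{code}

Whether input should be persisted unconditionally or only when it is not already cached (as the handlePersistence flag suggests) is left to the maintainers.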

This issue is reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org