You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/11/13 16:47:00 UTC

[jira] [Resolved] (SPARK-29872) Improper cache strategy in examples

     [ https://issues.apache.org/jira/browse/SPARK-29872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-29872.
----------------------------------
    Resolution: Won't Fix

> Improper cache strategy in examples
> -----------------------------------
>
>                 Key: SPARK-29872
>                 URL: https://issues.apache.org/jira/browse/SPARK-29872
>             Project: Spark
>          Issue Type: Improvement
>          Components: Examples
>    Affects Versions: 3.0.0
>            Reporter: Dong Wang
>            Priority: Minor
>
> 1. Improper cache in examples.SparkTC
> The RDD edges should be cached because it is used multiple times in while loop. And it should be unpersisted before the last action tc.count(), because tc has been persisted.
> On the other hand, many tc objects is cached in while loop but never uncached, which will waste memory.
> {code:scala}
>     val edges = tc.map(x => (x._2, x._1)) // Edges should be cached
>     // This join is iterated until a fixed point is reached.
>     var oldCount = 0L
>     var nextCount = tc.count()
>     do { 
>       oldCount = nextCount
>       // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
>       // then project the result to obtain the new (x, z) paths.
>       tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
>       nextCount = tc.count()
>     } while (nextCount != oldCount)
>     println(s"TC has ${tc.count()} edges.")
> {code}
> 2. Cache needed in examples.ml.LogisticRegressionSummary
> The DataFrame fMeasure should be cached.
> {code:scala}
>     // Set the model threshold to maximize F-Measure
>     val fMeasure = trainingSummary.fMeasureByThreshold // fMeasures should be cached
>     val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
>     val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
>       .select("threshold").head().getDouble(0)
>     lrModel.setThreshold(bestThreshold)
> {code}
> 3. Cache needed in examples.sql.SparkSQLExample
> {code:scala}
>     val peopleDF = spark.sparkContext
>       .textFile("examples/src/main/resources/people.txt")
>       .map(_.split(","))
>       .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) // This RDD should be cahced
>       .toDF()
>     // Register the DataFrame as a temporary view
>     peopleDF.createOrReplaceTempView("people")
>     val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
>     teenagersDF.map(teenager => "Name: " + teenager(0)).show()
>     teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
>     implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
>     teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
> {code}
> This issue is reported by our tool CacheCheck, which is used to dynamically detecting persist()/unpersist() api misuses.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org