Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/11/13 16:47:00 UTC
[jira] [Resolved] (SPARK-29872) Improper cache strategy in examples
[ https://issues.apache.org/jira/browse/SPARK-29872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean R. Owen resolved SPARK-29872.
----------------------------------
Resolution: Won't Fix
> Improper cache strategy in examples
> -----------------------------------
>
> Key: SPARK-29872
> URL: https://issues.apache.org/jira/browse/SPARK-29872
> Project: Spark
> Issue Type: Improvement
> Components: Examples
> Affects Versions: 3.0.0
> Reporter: Dong Wang
> Priority: Minor
>
> 1. Improper cache in examples.SparkTC
> The RDD edges should be cached because it is used multiple times in the while loop. It should also be unpersisted before the final action tc.count(): tc is already persisted, so that count no longer needs edges.
> In addition, a new tc RDD is cached on every iteration of the while loop, but the superseded ones are never unpersisted, which wastes memory.
> {code:scala}
> val edges = tc.map(x => (x._2, x._1)) // Edges should be cached
> // This join is iterated until a fixed point is reached.
> var oldCount = 0L
> var nextCount = tc.count()
> do {
> oldCount = nextCount
> // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
> // then project the result to obtain the new (x, z) paths.
> tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
> nextCount = tc.count()
> } while (nextCount != oldCount)
> println(s"TC has ${tc.count()} edges.")
> {code}
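>
> A minimal sketch of the caching proposed above. It is not the code in the Spark repository. It assumes Spark 3.0 (spark-core and spark-sql) on the classpath and a local master, and the object SparkTCCached and the method transitiveClosure are hypothetical names. edges is cached once, each superseded tc is unpersisted only after its successor has been materialized, and edges is unpersisted before the final tc.count().
>
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Hypothetical object: the loop from examples.SparkTC with explicit cache management.
> object SparkTCCached {
>   // Returns the number of edges in the transitive closure of `graph`.
>   def transitiveClosure(spark: SparkSession, graph: Seq[(Int, Int)], slices: Int = 2): Long = {
>     var tc = spark.sparkContext.parallelize(graph.distinct, slices).cache()
>     // edges is joined on every iteration, so cache it once up front.
>     val edges = tc.map(x => (x._2, x._1)).cache()
>     var oldCount = 0L
>     var nextCount = tc.count()
>     do {
>       oldCount = nextCount
>       val previous = tc
>       tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
>       // Materialize the new tc before dropping the RDD it was derived from.
>       nextCount = tc.count()
>       previous.unpersist()
>     } while (nextCount != oldCount)
>     // edges is no longer needed; the final count is served from tc's cache.
>     edges.unpersist()
>     val total = tc.count()
>     tc.unpersist()
>     total
>   }
>
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder.appName("SparkTCCached").master("local[*]").getOrCreate()
>     try {
>       // Path 1 -> 2 -> 3 -> 4; its transitive closure has 6 edges.
>       println(s"TC has ${transitiveClosure(spark, Seq(1 -> 2, 2 -> 3, 3 -> 4))} edges.")
>     } finally {
>       spark.stop()
>     }
>   }
> }
> {code}
>
> The order inside the loop matters: unpersisting previous before nextCount = tc.count() would force the new tc to recompute its whole lineage instead of reading the previous iteration's cached blocks.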
> 2. Cache needed in examples.ml.LogisticRegressionSummary
> The DataFrame fMeasure should be cached, because it is read by two separate actions (the two head() calls).
> {code:scala}
> // Set the model threshold to maximize F-Measure
> val fMeasure = trainingSummary.fMeasureByThreshold // fMeasure should be cached
> val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
> val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
> .select("threshold").head().getDouble(0)
> lrModel.setThreshold(bestThreshold)
> {code}
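>
> A minimal sketch of the fix, assuming Spark 3.0 with spark-mllib on the classpath. The object FMeasureCached, the helpers toyTraining and tunedModel, and the four-row training set (standing in for the example's libsvm input file) are hypothetical. fMeasure is cached because both head() calls are separate actions over it, and it is unpersisted once the threshold has been chosen.
>
> {code:scala}
> import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
> import org.apache.spark.ml.linalg.Vectors
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.functions.{col, max}
>
> object FMeasureCached {
>   // Tiny binary training set (hypothetical data).
>   def toyTraining(spark: SparkSession): DataFrame =
>     spark.createDataFrame(Seq(
>       (0.0, Vectors.dense(0.0, 1.1)),
>       (0.0, Vectors.dense(0.5, 1.0)),
>       (1.0, Vectors.dense(2.0, 0.1)),
>       (1.0, Vectors.dense(2.5, -0.5))
>     )).toDF("label", "features")
>
>   // Fits a model and sets its threshold to the one that maximizes F-Measure.
>   def tunedModel(training: DataFrame): LogisticRegressionModel = {
>     val lrModel = new LogisticRegression().setMaxIter(10).fit(training)
>     // Two actions read fMeasure below, so compute it once and keep it in memory.
>     val fMeasure = lrModel.binarySummary.fMeasureByThreshold.cache()
>     try {
>       val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
>       val bestThreshold = fMeasure.where(col("F-Measure") === maxFMeasure)
>         .select("threshold").head().getDouble(0)
>       lrModel.setThreshold(bestThreshold)
>     } finally {
>       // Release the cached summary once the threshold is chosen.
>       fMeasure.unpersist()
>     }
>   }
> }
> {code}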
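> 3. Cache needed in examples.sql.SparkSQLExample
> The data behind peopleDF should be cached, because teenagersDF, which is derived from it, is read by three separate actions.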
> {code:scala}
> val peopleDF = spark.sparkContext
> .textFile("examples/src/main/resources/people.txt")
> .map(_.split(","))
> .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) // This RDD should be cached
> .toDF()
> // Register the DataFrame as a temporary view
> peopleDF.createOrReplaceTempView("people")
> val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
> teenagersDF.map(teenager => "Name: " + teenager(0)).show()
> teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
> teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
> {code}
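>
> A minimal sketch of the fix, assuming Spark 3.0 with spark-sql on the classpath. The object PeopleCached and the method teenagerNames are hypothetical, and the inline lines passed in stand in for examples/src/main/resources/people.txt. peopleDF is cached before the view is registered, so the actions on teenagersDF read the in-memory data instead of re-running the split/map pipeline each time.
>
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Top-level case class so Spark can derive an encoder for it.
> case class Person(name: String, age: Long)
>
> object PeopleCached {
>   def teenagerNames(spark: SparkSession, lines: Seq[String]): Array[String] = {
>     import spark.implicits._
>     val peopleDF = spark.sparkContext
>       .parallelize(lines)
>       .map(_.split(","))
>       .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
>       .toDF()
>       .cache() // reused by every query against the "people" view below
>     try {
>       peopleDF.createOrReplaceTempView("people")
>       val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
>       teenagersDF.map(teenager => "Name: " + teenager(0)).show()
>       teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).collect()
>     } finally {
>       // Drop the cached rows once all actions have run.
>       peopleDF.unpersist()
>     }
>   }
> }
> {code}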
> This issue was reported by our tool CacheCheck, which dynamically detects misuse of the persist()/unpersist() APIs.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)