Posted to issues@spark.apache.org by "zhengruifeng (Jira)" <ji...@apache.org> on 2020/08/21 03:31:00 UTC
[jira] [Updated] (SPARK-32676) Fix double caching in KMeans/BiKMeans
[ https://issues.apache.org/jira/browse/SPARK-32676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhengruifeng updated SPARK-32676:
---------------------------------
Description:
On the .mllib side, the storageLevel of the input {{data}} is always ignored, so the data ends up cached twice:
{code:java}
@Since("0.8.0")
def run(data: RDD[Vector]): KMeansModel = {
  val instances = data.map(point => (point, 1.0))
  runWithWeight(instances, None)
}
{code}
{code:java}
private[spark] def runWithWeight(
    data: RDD[(Vector, Double)],
    instr: Option[Instrumentation]): KMeansModel = {
  // Compute squared norms and cache them.
  val norms = data.map { case (v, _) =>
    Vectors.norm(v, 2.0)
  }
  val zippedData = data.zip(norms).map { case ((v, w), norm) =>
    new VectorWithNorm(v, norm, w)
  }
  if (data.getStorageLevel == StorageLevel.NONE) {
    zippedData.persist(StorageLevel.MEMORY_AND_DISK)
  }
  val model = runAlgorithmWithWeight(zippedData, instr)
  zippedData.unpersist()
  model
}
{code}
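The root of the issue: {{run}} derives a new RDD ({{instances}}) via {{map}}, and a derived RDD always starts with storage level {{NONE}} regardless of whether the caller already persisted {{data}}. The guard in {{runWithWeight}} therefore fires even for pre-cached input, persisting a second copy. A minimal self-contained sketch of this behavior, using hypothetical toy stand-ins ({{FakeRDD}}, a mock {{StorageLevel}}) rather than Spark's real classes:

```scala
// Toy stand-ins, purely illustrative -- NOT Spark's actual RDD/StorageLevel.
object StorageLevel extends Enumeration {
  val NONE, MEMORY_AND_DISK = Value
}

class FakeRDD[T](val data: Seq[T],
                 var storageLevel: StorageLevel.Value = StorageLevel.NONE) {
  // A derived RDD always starts uncached, mirroring Spark's behavior.
  def map[U](f: T => U): FakeRDD[U] = new FakeRDD(data.map(f))
  def persist(level: StorageLevel.Value): this.type = {
    storageLevel = level
    this
  }
}

object DoubleCacheDemo {
  def main(args: Array[String]): Unit = {
    // Caller has already cached the input.
    val input = new FakeRDD(Seq(1.0, 2.0)).persist(StorageLevel.MEMORY_AND_DISK)
    val instances = input.map(x => (x, 1.0))
    // The derived RDD reports NONE, so a guard like the one in
    // runWithWeight persists it even though `input` is already cached:
    if (instances.storageLevel == StorageLevel.NONE) {
      instances.persist(StorageLevel.MEMORY_AND_DISK)
    }
    println(input.storageLevel)      // MEMORY_AND_DISK
    println(instances.storageLevel)  // MEMORY_AND_DISK -> two cached copies
  }
}
```

In other words, checking the derived RDD's storage level cannot detect that the upstream input is cached; a fix would need to consult the original input's storage level (or let the caller decide) before persisting.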
was:
On the .mllib side, the storageLevel of the input {{data}} is always ignored, so the data ends up cached twice:
{code:java}
@Since("0.8.0")
def run(data: RDD[Vector]): KMeansModel = {
  val instances = data.map(point => (point, 1.0))
  runWithWeight(instances, None)
}
{code}
> Fix double caching in KMeans/BiKMeans
> -------------------------------------
>
> Key: SPARK-32676
> URL: https://issues.apache.org/jira/browse/SPARK-32676
> Project: Spark
> Issue Type: Improvement
> Components: ML
> Affects Versions: 3.0.0, 3.1.0
> Reporter: zhengruifeng
> Priority: Major
>
> On the .mllib side, the storageLevel of the input {{data}} is always ignored, so the data ends up cached twice:
> {code:java}
> @Since("0.8.0")
> def run(data: RDD[Vector]): KMeansModel = {
>   val instances = data.map(point => (point, 1.0))
>   runWithWeight(instances, None)
> }
> {code}
> {code:java}
> private[spark] def runWithWeight(
>     data: RDD[(Vector, Double)],
>     instr: Option[Instrumentation]): KMeansModel = {
>   // Compute squared norms and cache them.
>   val norms = data.map { case (v, _) =>
>     Vectors.norm(v, 2.0)
>   }
>   val zippedData = data.zip(norms).map { case ((v, w), norm) =>
>     new VectorWithNorm(v, norm, w)
>   }
>   if (data.getStorageLevel == StorageLevel.NONE) {
>     zippedData.persist(StorageLevel.MEMORY_AND_DISK)
>   }
>   val model = runAlgorithmWithWeight(zippedData, instr)
>   zippedData.unpersist()
>   model
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org