Posted to issues@spark.apache.org by "Xiaoye Sun (JIRA)" <ji...@apache.org> on 2016/12/06 19:19:58 UTC

[jira] [Comment Edited] (SPARK-18731) Task size in K-means is so large

    [ https://issues.apache.org/jira/browse/SPARK-18731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15726277#comment-15726277 ] 

Xiaoye Sun edited comment on SPARK-18731 at 12/6/16 7:19 PM:
-------------------------------------------------------------

Hi Sean,

Here is part of the output I collected on the driver. I found broadcast-related entries in the log. Do you think the centroids are included in the task closure? (A small sketch of the captured-vs-broadcast difference follows the log.)

2016/12/06 11:31:28.101 DEBUG ClosureCleaner: +++ Cleaning closure <function2> (org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$31) +++
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:  + declared fields: 1
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:      public static final long org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$31.serialVersionUID
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:  + declared methods: 2
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$31.apply(java.lang.Object,java.lang.Object)
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:      public final scala.collection.Iterator org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$31.apply(scala.collection.Iterator,scala.collection.Iterator)
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:  + inner classes: 1
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:      org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$31$$anon$1
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:  + outer classes: 0
2016/12/06 11:31:28.102 DEBUG ClosureCleaner:  + outer objects: 0
2016/12/06 11:31:28.103 DEBUG ClosureCleaner:  + populating accessed fields because this is the starting closure
2016/12/06 11:31:28.104 DEBUG ClosureCleaner:  + fields accessed by starting closure: 0
2016/12/06 11:31:28.104 DEBUG ClosureCleaner:  + there are no enclosing objects!
2016/12/06 11:31:28.104 DEBUG ClosureCleaner:  +++ closure <function2> (org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$31) is now cleaned +++
2016/12/06 11:31:28.104 DEBUG ClosureCleaner: +++ Cleaning closure <function1> (org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1) +++
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:  + declared fields: 3
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:      public static final long org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1.serialVersionUID
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:      private final org.apache.spark.mllib.clustering.KMeans org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1.$outer
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:      public final org.apache.spark.broadcast.Broadcast org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1.bcNewCenters$1
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:  + declared methods: 2
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1.apply(java.lang.Object)
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:      public final double[] org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1.apply(scala.Tuple2)
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:  + inner classes: 1
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:      org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1$$anonfun$apply$3
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:  + outer classes: 1
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:      org.apache.spark.mllib.clustering.KMeans
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:  + outer objects: 1
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:      org.apache.spark.mllib.clustering.KMeans@41711125
2016/12/06 11:31:28.105 DEBUG ClosureCleaner:  + populating accessed fields because this is the starting closure
2016/12/06 11:31:28.106 DEBUG ClosureCleaner:  + fields accessed by starting closure: 1
2016/12/06 11:31:28.106 DEBUG ClosureCleaner:      (class org.apache.spark.mllib.clustering.KMeans,Set(org$apache$spark$mllib$clustering$KMeans$$runs))
2016/12/06 11:31:28.106 DEBUG ClosureCleaner:  + outermost object is not a closure, so do not clone it: (class org.apache.spark.mllib.clustering.KMeans,org.apache.spark.mllib.clustering.KMeans@41711125)
2016/12/06 11:31:28.106 DEBUG ClosureCleaner:  +++ closure <function1> (org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1) is now cleaned +++
2016/12/06 11:31:28.107 DEBUG ClosureCleaner: +++ Cleaning closure <function2> (org.apache.spark.mllib.clustering.KMeans$$anonfun$20) +++
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:  + declared fields: 2
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:      public static final long org.apache.spark.mllib.clustering.KMeans$$anonfun$20.serialVersionUID
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:      private final org.apache.spark.mllib.clustering.KMeans org.apache.spark.mllib.clustering.KMeans$$anonfun$20.$outer
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:  + declared methods: 2
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.mllib.clustering.KMeans$$anonfun$20.apply(java.lang.Object,java.lang.Object)
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:      public final double[] org.apache.spark.mllib.clustering.KMeans$$anonfun$20.apply(double[],double[])
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:  + inner classes: 0
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:  + outer classes: 1
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:      org.apache.spark.mllib.clustering.KMeans
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:  + outer objects: 1
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:      org.apache.spark.mllib.clustering.KMeans@41711125
2016/12/06 11:31:28.107 DEBUG ClosureCleaner:  + populating accessed fields because this is the starting closure
2016/12/06 11:31:28.108 DEBUG ClosureCleaner:  + fields accessed by starting closure: 1
2016/12/06 11:31:28.108 DEBUG ClosureCleaner:      (class org.apache.spark.mllib.clustering.KMeans,Set(org$apache$spark$mllib$clustering$KMeans$$runs))
2016/12/06 11:31:28.108 DEBUG ClosureCleaner:  + outermost object is not a closure, so do not clone it: (class org.apache.spark.mllib.clustering.KMeans,org.apache.spark.mllib.clustering.KMeans@41711125)
2016/12/06 11:31:28.108 DEBUG ClosureCleaner:  +++ closure <function2> (org.apache.spark.mllib.clustering.KMeans$$anonfun$20) is now cleaned +++
2016/12/06 11:31:28.108 DEBUG ClosureCleaner: +++ Cleaning closure <function2> (org.apache.spark.mllib.clustering.KMeans$$anonfun$21) +++
2016/12/06 11:31:28.108 DEBUG ClosureCleaner:  + declared fields: 2
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:      public static final long org.apache.spark.mllib.clustering.KMeans$$anonfun$21.serialVersionUID
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:      private final org.apache.spark.mllib.clustering.KMeans org.apache.spark.mllib.clustering.KMeans$$anonfun$21.$outer
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:  + declared methods: 2
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.mllib.clustering.KMeans$$anonfun$21.apply(java.lang.Object,java.lang.Object)
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:      public final double[] org.apache.spark.mllib.clustering.KMeans$$anonfun$21.apply(double[],double[])
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:  + inner classes: 0
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:  + outer classes: 1
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:      org.apache.spark.mllib.clustering.KMeans
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:  + outer objects: 1
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:      org.apache.spark.mllib.clustering.KMeans@41711125
2016/12/06 11:31:28.109 DEBUG ClosureCleaner:  + populating accessed fields because this is the starting closure
2016/12/06 11:31:28.110 DEBUG ClosureCleaner:  + fields accessed by starting closure: 1
2016/12/06 11:31:28.110 DEBUG ClosureCleaner:      (class org.apache.spark.mllib.clustering.KMeans,Set(org$apache$spark$mllib$clustering$KMeans$$runs))
2016/12/06 11:31:28.110 DEBUG ClosureCleaner:  + outermost object is not a closure, so do not clone it: (class org.apache.spark.mllib.clustering.KMeans,org.apache.spark.mllib.clustering.KMeans@41711125)
2016/12/06 11:31:28.110 DEBUG ClosureCleaner:  +++ closure <function2> (org.apache.spark.mllib.clustering.KMeans$$anonfun$21) is now cleaned +++
2016/12/06 11:31:28.111 DEBUG ClosureCleaner: +++ Cleaning closure <function2> (org.apache.spark.SparkContext$$anonfun$36) +++
2016/12/06 11:31:28.111 DEBUG ClosureCleaner:  + declared fields: 2
2016/12/06 11:31:28.111 DEBUG ClosureCleaner:      public static final long org.apache.spark.SparkContext$$anonfun$36.serialVersionUID
2016/12/06 11:31:28.111 DEBUG ClosureCleaner:      private final scala.Function1 org.apache.spark.SparkContext$$anonfun$36.processPartition$1
2016/12/06 11:31:28.111 DEBUG ClosureCleaner:  + declared methods: 2
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.SparkContext$$anonfun$36.apply(java.lang.Object,java.lang.Object)
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.SparkContext$$anonfun$36.apply(org.apache.spark.TaskContext,scala.collection.Iterator)
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:  + inner classes: 0
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:  + outer classes: 0
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:  + outer objects: 0
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:  + populating accessed fields because this is the starting closure
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:  + fields accessed by starting closure: 0
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:  + there are no enclosing objects!
2016/12/06 11:31:28.112 DEBUG ClosureCleaner:  +++ closure <function2> (org.apache.spark.SparkContext$$anonfun$36) is now cleaned +++
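
For reference, here is a minimal sketch of that difference (not the MLlib source; the sizes, the cost function, and all names here are made up for illustration). Capturing a large centers array in the closure serializes it into every task; broadcasting it keeps the tasks small:

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.util.Random

    object TaskSizeSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("task-size-sketch").setMaster("local[*]"))

        val dim = 10000
        // ~8 MB of centers: 100 centers x 10k doubles
        val centers = Array.fill(100)(Array.fill(dim)(Random.nextDouble()))
        val data = sc.parallelize(Seq.fill(200)(Array.fill(dim)(Random.nextDouble())), 8)

        // Squared L2 distance to the nearest center (toy cost function).
        def cost(cs: Array[Array[Double]], p: Array[Double]): Double =
          cs.map(c => c.indices.map(i => { val d = c(i) - p(i); d * d }).sum).min

        // (1) `centers` captured in the closure: the ~8 MB array is serialized
        //     into every task, which is what the TaskSetManager warning flags.
        val captured = data.map(p => cost(centers, p)).sum()

        // (2) `centers` broadcast: each task carries only a small Broadcast
        //     handle; executors fetch the array once from the block manager.
        val bcCenters = sc.broadcast(centers)
        val broadcasted = data.map(p => cost(bcCenters.value, p)).sum()

        println(s"costs: $captured vs $broadcasted")
        sc.stop()
      }
    }

The `bcNewCenters$1` Broadcast field in the initKMeansParallel closure above suggests the new centers are handled like case (2), but anything else reachable from the closure still gets serialized per task.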


> Task size in K-means is so large
> --------------------------------
>
>                 Key: SPARK-18731
>                 URL: https://issues.apache.org/jira/browse/SPARK-18731
>             Project: Spark
>          Issue Type: Improvement
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: Xiaoye Sun
>            Priority: Minor
>   Original Estimate: 5h
>  Remaining Estimate: 5h
>
> When running the KMeans algorithm with a large model (e.g., 100k features and 100 centers), a warning is shown for many of the stages saying that the task size is very large. Here is an example warning:
> WARN TaskSetManager: Stage 23 contains a task of very large size (56256 KB). The maximum recommended task size is 100 KB.
> This could happen at (sum at KMeansModel.scala:88), (takeSample at KMeans.scala:378), (aggregate at KMeans.scala:404), and (collect at KMeans.scala:436).
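
For anyone trying to reproduce this, a sketch along these lines should trigger the warning on 1.6.x (sizes and names are illustrative, not taken from the Spark test suite):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors
    import scala.util.Random

    object KMeansTaskSizeRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("kmeans-task-size").setMaster("local[*]"))

        val numFeatures = 100000 // 100k features, as in the report
        // Generate random points on the executors to keep the driver light.
        val data = sc.parallelize(0 until 200, 8)
          .map(_ => Vectors.dense(Array.fill(numFeatures)(Random.nextDouble())))
          .cache()

        // 100 centers x 100k doubles ~= 80 MB of centroids. While this trains,
        // the driver log should show "contains a task of very large size".
        val model = KMeans.train(data, 100, 5)
        println(s"trained ${model.clusterCenters.length} centers")
        sc.stop()
      }
    }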



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
