Posted to issues@spark.apache.org by "Weichen Xu (JIRA)" <ji...@apache.org> on 2017/09/21 12:51:00 UTC

[jira] [Commented] (SPARK-19141) VectorAssembler metadata causing memory issues

    [ https://issues.apache.org/jira/browse/SPARK-19141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174685#comment-16174685 ] 

Weichen Xu commented on SPARK-19141:
------------------------------------

Maybe we need to design a sparse format of AttributeGroup for ML vector columns. We don't need to create an Attribute for every vector dimension; it would be better to create them only when they are actually needed. But `VectorAssembler` currently creates an attribute for each dimension in every case. The current design is wasteful.
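
A minimal sketch of that direction, using the size-only AttributeGroup constructor that already exists (the dimension below is only illustrative); the missing piece would be for VectorAssembler to prefer this form when its inputs carry no per-slot attributes:

{code:title=Size-only AttributeGroup sketch|borderStyle=solid}
import org.apache.spark.ml.attribute.AttributeGroup

// One Attribute object per slot is what blows up today. A size-only group
// records just the column name and the dimension, so the serialized metadata
// stays tiny no matter how wide the vector is.
val sizeOnly = new AttributeGroup("Features", 1000000)
val meta = sizeOnly.toMetadata()  // no per-slot entries
{code}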


> VectorAssembler metadata causing memory issues
> ----------------------------------------------
>
>                 Key: SPARK-19141
>                 URL: https://issues.apache.org/jira/browse/SPARK-19141
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, MLlib
>    Affects Versions: 1.6.0, 2.0.0, 2.1.0
>         Environment: Windows 10, Ubuntu 16.04.1, Scala 2.11.8, Spark 1.6.0, 2.0.0, 2.1.0
>            Reporter: Antonia Oprescu
>
> VectorAssembler produces unnecessary metadata that overflows the Java heap in the case of sparse vectors. In the example below, the logical length of the vector is 10^6, but the number of non-zero values is only 2.
> The problem arises when the vector assembler creates metadata (ML attributes) for each of the 10^6 slots, even if this metadata didn't exist upstream (i.e. HashingTF doesn't produce metadata per slot). Here is a chunk of metadata it produces:
> {noformat}
> {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"HashedFeat_0"},{"idx":1,"name":"HashedFeat_1"},{"idx":2,"name":"HashedFeat_2"},{"idx":3,"name":"HashedFeat_3"},{"idx":4,"name":"HashedFeat_4"},{"idx":5,"name":"HashedFeat_5"},{"idx":6,"name":"HashedFeat_6"},{"idx":7,"name":"HashedFeat_7"},{"idx":8,"name":"HashedFeat_8"},{"idx":9,"name":"HashedFeat_9"},...,{"idx":1000000,"name":"Feat01"}]},"num_attrs":1000001}}
> {noformat}
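> A quick way to confirm what got attached is to read the attributes back from the assembled column's schema (a small sketch using the existing attribute API; processedData refers to the DataFrame built in the repro further down):
> {code:title=Inspecting the attached ML attributes|borderStyle=solid}
> import org.apache.spark.ml.attribute.AttributeGroup
>
> // Rebuild the AttributeGroup from the metadata stored on the vector column.
> val group = AttributeGroup.fromStructField(processedData.schema("Features"))
> println(group.size)                      // logical vector length (hash size + 1)
> println(group.attributes.map(_.length))  // one Attribute per slot when populated
> {code}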
> In this lightweight example, the feature size limit seems to be 1,000,000 when run locally, but this scales poorly with more complicated pipelines. With a larger dataset and a learner (say, LogisticRegression), the heap maxes out at a hash size anywhere between 10k and 100k, even on a decent-sized cluster.
> I did some digging, and it seems that the only metadata downstream learners actually need is the part that marks categorical columns. With that in mind, I thought of the following possible solutions:
> 1. A compact representation of the ML attributes metadata (but this seems to be a bigger change)
> 2. Removal of non-categorical tags from the metadata created by the VectorAssembler
> 3. An option on the existing VectorAssembler to skip unnecessary ML attributes, or another transformer altogether
> I would be happy to take a stab at any of these solutions, but I need some direction from the Spark community. (A possible interim workaround is sketched after the stack traces below.)
> {code:title=VABug.scala |borderStyle=solid}
> import org.apache.spark.SparkConf
> import org.apache.spark.ml.feature.{HashingTF, VectorAssembler}
> import org.apache.spark.sql.SparkSession
> object VARepro {
>   case class Record(Label: Double, Feat01: Double, Feat02: Array[String])
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf()
>       .setAppName("Vector assembler bug")
>       .setMaster("local[*]")
>     val spark = SparkSession.builder.config(conf).getOrCreate()
>     import spark.implicits._
>     // Two rows only; the feature vectors are extremely sparse.
>     val df = Seq(Record(1.0, 2.0, Array("4daf")), Record(0.0, 3.0, Array("a9ee"))).toDS()
>     // Hash into a very wide (but sparse) feature vector.
>     val numFeatures = 10000000
>     val hashingScheme = new HashingTF().setInputCol("Feat02").setOutputCol("HashedFeat").setNumFeatures(numFeatures)
>     val hashedData = hashingScheme.transform(df)
>     // VectorAssembler builds one ML attribute per slot here, which exhausts the heap.
>     val vectorAssembler = new VectorAssembler().setInputCols(Array("HashedFeat", "Feat01")).setOutputCol("Features")
>     val processedData = vectorAssembler.transform(hashedData).select("Label", "Features")
>     processedData.show()
>   }
> }
> {code}
> *Stacktrace from the example above:*
> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
> 	at org.apache.spark.ml.attribute.NumericAttribute.copy(attributes.scala:272)
> 	at org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:215)
> 	at org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:195)
> 	at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:71)
> 	at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:70)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at scala.collection.IterableLike$class.copyToArray(IterableLike.scala:254)
> 	at scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
> 	at scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
> 	at scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
> 	at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
> 	at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
> 	at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
> 	at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
> 	at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:95)
> 	at VARepro$.main(VARepro.scala:36)
> *Exception when run in conjunction with a learner on a bigger dataset (~10 GB) on a cluster.*
> : java.lang.OutOfMemoryError: Java heap space
> 	at java.util.Arrays.copyOf(Arrays.java:3236)
> 	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> 	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> 	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> 	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> 	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> 	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
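> A possible interim workaround (only a sketch, and it does not avoid the cost of building the attributes inside VectorAssembler.transform itself): replace the assembled column with a copy that carries empty metadata, so the per-slot attributes are not carried into downstream stages. Note this also discards any categorical attribute information, so it only fits purely numeric features.
> {code:title=Workaround sketch: drop the attached metadata|borderStyle=solid}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.types.Metadata
>
> // processedData is the DataFrame from the repro above; the vector values are
> // untouched, only the column-level metadata (the ML attributes) is dropped.
> val stripped = processedData.withColumn(
>   "Features", col("Features").as("Features", Metadata.empty))
> {code}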



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org