Posted to user@spark.apache.org by "Muttineni, Vinay" <vm...@ebay.com> on 2014/06/19 00:06:45 UTC
java.lang.OutOfMemoryError with saveAsTextFile
Hi,
I have a 5-million-record, 300-column data set.
I am running a Spark job in yarn-cluster mode with the following arguments:
--driver-memory 11G --executor-memory 11G --executor-cores 16 --num-executors 500
The job replaces every categorical value with an integer code.
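For reference, the full launch command looks roughly like this (the jar and path names are placeholders, not the real ones; the class name is taken from the stack trace below):

spark-submit --master yarn-cluster \
  --class DataProcessor.DataTypeConverter2 \
  --driver-memory 11G --executor-memory 11G \
  --executor-cores 16 --num-executors 500 \
  data-processor.jar <input path> <output path>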
I get the error below when I try to save the transformed data set:
java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
java.util.Arrays.copyOfRange(Arrays.java:3209)
java.lang.String.<init>(String.java:215)
java.lang.StringBuilder.toString(StringBuilder.java:430)
java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3023)
java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2819)
java.io.ObjectInputStream.readString(ObjectInputStream.java:1598)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
scala.collection.AbstractIterator.toList(Iterator.scala:1157)
scala.collection.immutable.List.$plus$plus(List.scala:193)
DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)
DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)
The code is as follows:
val transformedData = splitFileWithHeader.flatMap { rowArray =>
  try {
    if (rowArray.sameElements(header.value)) {
      // Skip the header row
      None
    } else {
      val transformedArray = new Array[String](rowArray.length)
      for (i <- 0 until rowArray.length) {
        // Check 1: does this column have a replacement map?
        // Check 2: is the value non-empty? Empty values are kept as-is.
        if (broadcastReplacements.value(i) != null && rowArray(i).trim != "") {
          transformedArray(i) = broadcastReplacements.value(i)(rowArray(i).trim).toString
        } else {
          transformedArray(i) = rowArray(i).trim
        }
      }
      Some(transformedArray.mkString(","))
    }
  } catch {
    case _: Throwable =>
      println("Failure in transforming the file, 1 line, around line 131")
      None
  }
}
  // coalesce(1, true) shuffles the whole data set into a single partition
  // so the header can be prepended; coalesce(500) then fans back out.
  .coalesce(1, true)
  .mapPartitions(it => (Seq(headerLine.value) ++ it).iterator, true)
  .coalesce(500)
// Save the Transformed Data File
transformedData.saveAsTextFile(outputFileLocation)
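For what it's worth, the coalesce(1, true) step funnels the entire 5-million-row data set through a single shuffle partition and a single task just to prepend the header, which is exactly where GC pressure like this tends to show up. A rough, untested sketch of an alternative against the same RDD API (rows here stands for the flatMap output above, taken before the coalesce calls):

// Sketch, not from the original post: prepend the header to the first
// partition only, so no single-partition shuffle is needed. Note the
// header then appears only in part-00000 of the saved output.
val withHeader = rows.mapPartitionsWithIndex(
  (idx, it) => if (idx == 0) Iterator(headerLine.value) ++ it else it,
  preservesPartitioning = true)
withHeader.saveAsTextFile(outputFileLocation)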
Any idea how I can resolve this error?
Previous stages have completed successfully.
Thank You!
Vinay
Prior Stages
val dataFile = sc.textFile(args(1), 500)
// The first line is the header; it also encodes the column types
val columnDefinition = dataFile.first
val headerLine = sc.broadcast(columnDefinition)
val header = sc.broadcast(columnDefinition.split(",", -1))
// Remove the header row
val modifiedDataFile = dataFile.filter(line => line != headerLine.value)
val onlySplitFile = modifiedDataFile.flatMap { line =>
  try {
    val fields = line.split(",", -1)
    // split always yields at least one element, so a "length < 1" check
    // can never fire; a blank line splits to Array("")
    if (fields.sameElements(Array(""))) None else Some(fields)
  } catch {
    case _: Throwable => None
  }
}
modifiedDataFile.unpersist(true) // no-op unless the RDD was cached earlier
// i is the index of the column currently being processed;
// the surrounding loop over columns is not shown in this excerpt
val currentColumn = sc.broadcast(i)
val distinctValues = onlySplitFile.flatMap { rowArray =>
  try {
    Some(rowArray(currentColumn.value).trim)
  } catch {
    case _: Throwable =>
      println("Failure in finding the map: around line 72")
      None
  }
}.distinct(500).collect()
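The construction of broadcastReplacements is not shown in the message. A plausible reconstruction, reusing i and distinctValues from above (hypothetical, for illustration only):

// Hypothetical reconstruction: give each distinct value of column i an
// integer code and keep the lookup map in a per-column array. Columns
// without a map stay null, matching the null check in the transform code.
val replacements = new Array[Map[String, Int]](header.value.length)
replacements(i) = distinctValues.zipWithIndex.toMap
val broadcastReplacements = sc.broadcast(replacements)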