You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "Muttineni, Vinay" <vm...@ebay.com> on 2014/06/19 00:06:45 UTC

java.lang.OutOfMemoryError with saveAsTextFile

Hi,
I have a 5 million record, 300 column data set.
I am running a spark job in yarn-cluster mode, with the following args
--driver-memory 11G --executor-memory 11G --executor-cores 16  --num-executors 500
The spark job replaces all categorical variables with some integers.
I am getting the below error when I try to save the transformed data set.

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
java.util.Arrays.copyOfRange(Arrays.java:3209)
java.lang.String.<init>(String.java:215)
java.lang.StringBuilder.toString(StringBuilder.java:430)
java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3023)
java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2819)
java.io.ObjectInputStream.readString(ObjectInputStream.java:1598)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
scala.collection.AbstractIterator.toList(Iterator.scala:1157)
scala.collection.immutable.List.$plus$plus(List.scala:193)
DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)
DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)

The code is as follows:
  val transformedData = splitFileWithHeader.flatMap(rowArray => {
      try {
        if (rowArray.sameElements(header.value)) {
          None
        } else {
          val transformedArray: Array[String] = new Array[String](rowArray.length)
          for (i <- 0 until rowArray.length) {
            //          Check 1 to see if the value should be replaced, Check 2 to see if its a null value in which case, we do not update the value
            if (broadcastReplacements.value(i) != null && rowArray(i).trim.toString != "") {
              transformedArray.update(i, broadcastReplacements.value(i)(rowArray(i).trim.toString).toString)
            } else {
              transformedArray.update(i, rowArray(i).trim.toString)
            }
          }
          Array(transformedArray.deep.mkString(","))
        }
      }
      catch
        {

          case _: Throwable => {
            println("Failure in transforming the file, 1 line, Around Line 131")
            None
          }

        }

    }).coalesce(1, true).mapPartitions( it => (Seq(headerLine.value) ++ it).iterator,true).coalesce(500)

    //    Save the Transformed Data File
    transformedData.saveAsTextFile(outputFileLocation)


Any idea how I can resolve this error?
Previous stages have completed successfully.
Thank You!
Vinay



Prior Stages

   val dataFile = sc.textFile(args(1),500)
    //Get the first line which is the header, which would also contain the column type
    val columnDefinition = dataFile.first
    val headerLine = sc.broadcast(columnDefinition)
    val header = sc.broadcast(columnDefinition.split(",",-1))
    //      Remove the Header
    val modifiedDataFile = dataFile.filter(line => line != headerLine.value)
    val onlySplitFile = modifiedDataFile.flatMap(line =>
      {
        try {
          //                println(line.split(' ').length)
          //                println(line.split(' '))
          if (line.split(',').length < 1 || line.split(',').sameElements(Array(""))) {
            None
          } else {
            Array(line.split(",",-1))
          }

        } catch {
          case _: Throwable => None
        }
      })
    modifiedDataFile.unpersist(true)

val currentColumn = sc.broadcast(i)
val distinctValues = onlySplitFile.flatMap(rowArray =>
          {
            try {
              Array(rowArray(currentColumn.value).toString.trim.toString)
            }
            catch
              {
                case _: Throwable => {
                  println("Failure in Finding the Map: Around Line 72")
                  None
                }
              }

          }).distinct(500).collect