Posted to user@spark.apache.org by Balazs Meszaros <me...@zhaw.ch> on 2015/03/19 09:09:19 UTC

OutOfMemoryError during reduce tasks

Hi,

I am trying to evaluate the performance of Spark with respect to 
various memory settings. What makes it more difficult is that I'm new to 
Python, but the problem at hand doesn't seem to originate from that.

I'm running a wordcount script [1] with different amounts of input data. 
With a 1 GB input there is always an OutOfMemoryError at the end of the 
reduce tasks [2], while 100 MB of data causes no problem. Spark is v1.2.1 
(I see the same problem with v1.3) and it runs on a VM with Ubuntu 14.04, 
8 GB of RAM and 4 vCPUs. (If anything else is of interest, please ask.)

The tuning guide at 
http://spark.apache.org/docs/1.2.1/tuning.html#memory-usage-of-reduce-tasks 
suggests increasing the parallelism, which I've tried (even with over 
4000 tasks), but nothing changed. Other attempts with 
spark.executor.memory, spark.python.worker.memory and extraJavaOptions 
with -Xmx4g (see code below) didn't solve the problem either.
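
A note on the settings above, offered as an assumption rather than a confirmed diagnosis: with setMaster("local") the tasks run inside the driver JVM, and a JVM's maximum heap is fixed when it starts, so memory values set through SparkConf inside the script may arrive too late to have any effect. A minimal sketch of passing the heap size at launch instead is shown here. The helper name build_submit_command, the script name wordcount.py and the input path are hypothetical; --master, --driver-memory and --conf are standard spark-submit options.

```python
def build_submit_command(script, input_path, driver_memory="4g", parallelism=120):
    """Build a spark-submit argument list that sets the driver heap at launch.

    Assumption: the job runs in local mode, so the driver JVM is the only JVM
    and its heap must be sized before it starts (not from inside the script).
    """
    return [
        "spark-submit",
        "--master", "local",
        "--driver-memory", driver_memory,
        "--conf", "spark.default.parallelism=%d" % parallelism,
        script,        # hypothetical script name
        input_path,    # hypothetical input file
    ]


if __name__ == "__main__":
    # Print the command so it can be pasted into a shell.
    print(" ".join(build_submit_command("wordcount.py", "input_1g.txt")))
```

Whether this resolves the error is not verified here; it only moves the heap setting to a point where the JVM can still honour it.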

What do you suggest to keep the Java heap from filling up completely?

Thanks
Balazs


[1] Wordcount script

import sys
import time
from operator import add
from pyspark import SparkContext, SparkConf
from signal import signal, SIGPIPE, SIG_DFL

def encode(text):
     """
     For printing unicode characters to the console.
     """
     return text.encode('utf-8')

if __name__ == "__main__":
     start_time = time.time()

     if len(sys.argv) != 2:
         print >> sys.stderr, "Usage: wordcount <file>"
         sys.exit(-1)
     conf = (SparkConf()
              .setMaster("local")
              .setAppName("PythonWordCount")
              .set("spark.executor.memory", "6g")
              .set("spark.python.worker.memory","6g")
              .set("spark.default.parallelism",120)
              .set("spark.driver.extraJavaOptions","-Xmx4g"))
     sc = SparkContext(conf = conf)
     lines = sc.textFile(sys.argv[1], 1)
     counts = lines.flatMap(lambda x: x.split(' ')) \
                   .map(lambda x: (x, 1)) \
                   .reduceByKey(add)
     output = counts.collect()
# output would take too long and the important thing is the processing time
#    for (word, count) in output:
#        print encode("%s: %i" % (word, count))
#    print("%f seconds" % (time.time() - start_time))

     sc.stop()

     print("%f seconds" % (time.time() - start_time))
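
For readers less familiar with the RDD calls in the script, the following is a plain-Python sketch of what the flatMap / map / reduceByKey chain computes, run on a small in-memory sample instead of an RDD. The function name word_count and the sample lines are made up for illustration; this is not how Spark executes the job internally.

```python
from collections import defaultdict
from operator import add


def word_count(lines):
    """Count words the way the script's RDD pipeline does, but in one process."""
    # flatMap(lambda x: x.split(' ')): one flat sequence of words
    words = [word for line in lines for word in line.split(' ')]
    # map(lambda x: (x, 1)): pair every word with a count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey(add): combine the counts of identical words
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] = add(counts[word], n)
    return dict(counts)


if __name__ == "__main__":
    sample = ["a b a", "b c"]
    print(word_count(sample))
```

The result holds one entry per distinct word, and counts.collect() in the script brings all of those entries back to the driver at once, so the size of the collected result grows with the vocabulary of the input.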

[2] OutOfMemoryError at reduce tasks
...
15/03/19 07:58:52 INFO ShuffleBlockFetcherIterator: Getting 30 non-empty blocks out of 30 blocks
15/03/19 07:58:52 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/03/19 07:58:52 INFO TaskSetManager: Finished task 99.0 in stage 1.0 (TID 129) in 1096 ms on localhost (100/120)
15/03/19 07:58:52 INFO PythonRDD: Times: total = 351, boot = -530, init = 534, finish = 347
15/03/19 07:58:52 ERROR Executor: Exception in task 100.0 in stage 1.0 (TID 130)
java.lang.OutOfMemoryError: Java heap space
     at java.util.Arrays.copyOf(Arrays.java:2271)
     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
     at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852)
     at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
     at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:164)
     at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply$mcV$sp(TaskResult.scala:48)
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
     at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
     at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)
15/03/19 07:58:52 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
...
