You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by pthai <th...@gmail.com> on 2014/11/21 18:44:42 UTC

JVM Memory Woes

Hello!

I am debugging a reduceByKey job in Spark v0.9.1
The main issue is that an executor dies after the reduceByKey job succeeds.
This is detected by Spark, which then starts re-running all the tasks for
that stage.

I've been working at this for over a day now, so any help would be
fantastic!

*Let me provide some details about my job & cluster settings:*
8 workers each with 32 cores & 60GB ram
SPARK_MEM=48g
SPARK_WORKER_MEM=48g
-Dspark.shuffle.memoryFraction=0.2
-Dspark.storage.memoryFraction=0.2
-Dspark.default.parallelism=1024

*I can't figure out why the executor dies. Here is the stdout from that
executor that died:
*
OpenJDK 64-Bit Server VM warning: INFO:
os::commit_memory(0x00007fbb39280000, 42991616, 0) failed; error='Cannot
allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 42991616 bytes for
committing reserved memory.
# An error report file with more information is saved as:
# /tmp/spark/work/app-20141121172318-0005/1/hs_err_pid25755.log


*Here're the significant cuts of the output from the driver when the
executor dies after reduceByKey finishes:
*
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Finished TID 4482 in 21681
ms on ip-10-144-235-194.ec2.internal (progress: 4608/4608)
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3,
4331)
14/11/21 06:47:22 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0,
whose tasks have all completed, from pool 
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at
routines.scala:278) finished in 185.384 s
14/11/21 06:47:22 INFO scheduler.DAGScheduler: looking for newly runnable
stages
14/11/21 06:47:22 INFO scheduler.DAGScheduler: running: Set()
14/11/21 06:47:22 INFO scheduler.DAGScheduler: waiting: Set(Stage 2)
14/11/21 06:47:22 INFO scheduler.DAGScheduler: failed: Set()
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Missing parents for Stage 2:
List()
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Submitting Stage 2
(FilteredRDD[18] at filter at routines.scala:279), which is now runnable
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Submitting 1024 missing tasks
from Stage 2 (FilteredRDD[18] at filter at routines.scala:279)
14/11/21 06:47:22 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with
1024 tasks
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID
4756 on executor 2: ip-10-113-180-17.ec2.internal (PROCESS_LOCAL)
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as
2194 bytes in 0 ms
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Starting task 2.0:1 as TID
4757 on executor 5: ip-10-5-150-185.ec2.internal (PROCESS_LOCAL)
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Serialized task 2.0:1 as
2194 bytes in 0 ms
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Starting task 2.0:2 as TID
4758 on executor 1: ip-10-37-170-43.ec2.internal (PROCESS_LOCAL)
[ truncate ]
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Serialized task 2.0:222 as
2194 bytes in 0 ms
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Starting task 2.0:223 as
TID 4979 on executor 3: ip-10-101-204-143.ec2.internal (PROCESS_LOCAL)
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Serialized task 2.0:223 as
2194 bytes in 0 ms
14/11/21 06:47:22 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-7-149-203.ec2.internal:54264
14/11/21 06:47:23 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 0 is 1585800 bytes
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-101-204-143.ec2.internal:36102
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-5-150-185.ec2.internal:50972
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-37-170-43.ec2.internal:36199
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-11-163-49.ec2.internal:51003
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-144-235-194.ec2.internal:40101
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-113-180-17.ec2.internal:35881
14/11/21 06:47:27 INFO cluster.SparkDeploySchedulerBackend: Executor 5
disconnected, so removing it
14/11/21 06:47:27 INFO client.AppClient$ClientActor: Executor updated:
app-20141121064325-0001/5 is now FAILED (Command exited with code 1)
14/11/21 06:47:27 ERROR scheduler.TaskSchedulerImpl: Lost executor 5 on
ip-10-5-150-185.ec2.internal: remote Akka client disassociated
14/11/21 06:47:27 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20141121064325-0001/5 removed: Command exited with code 1
14/11/21 06:47:27 INFO scheduler.TaskSetManager: Re-queueing tasks for 5
from TaskSet 2.0
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4946 (task
2.0:190)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4883 (task
2.0:127)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4967 (task
2.0:211)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4841 (task 2.0:85)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4904 (task
2.0:148)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4799 (task 2.0:43)
[ truncate ]
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4764 (task 2.0:8)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4925 (task
2.0:169)
14/11/21 06:47:27 INFO client.AppClient$ClientActor: Executor added:
app-20141121064325-0001/7 on
worker-20141121063736-ip-10-5-150-185.ec2.internal-7078
(ip-10-5-150-185.ec2.internal:7078) with 32 cores
14/11/21 06:47:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20141121064325-0001/7 on hostPort ip-10-5-150-185.ec2.internal:7078
with 32 cores, 30.0 GB RAM
14/11/21 06:47:27 INFO client.AppClient$ClientActor: Executor updated:
app-20141121064325-0001/7 is now RUNNING
14/11/21 06:47:27 INFO scheduler.DAGScheduler: Executor lost: 5 (epoch 1)
14/11/21 06:47:27 INFO storage.BlockManagerMasterActor: Trying to remove
executor 5 from BlockManagerMaster.
14/11/21 06:47:27 INFO storage.BlockManagerMaster: Removed 5 successfully in
removeExecutor
14/11/21 06:47:27 INFO scheduler.Stage: Stage 3 is now unavailable on
executor 5 (3927/4608, false)
14/11/21 06:47:27 INFO cluster.SparkDeploySchedulerBackend: Executor 1
disconnected, so removing it
14/11/21 06:47:27 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on
ip-10-37-170-43.ec2.internal: remote Akka client disassociated
14/11/21 06:47:27 INFO scheduler.TaskSetManager: Re-queueing tasks for 1
from TaskSet 2.0
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4919 (task
2.0:163)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4940 (task
2.0:184)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4814 (task 2.0:58)
[ truncate ]
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4779 (task 2.0:23)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4961 (task
2.0:205)
14/11/21 06:47:27 INFO scheduler.DAGScheduler: Executor lost: 1 (epoch 2)
14/11/21 06:47:27 INFO storage.BlockManagerMasterActor: Trying to remove
executor 1 from BlockManagerMaster.
14/11/21 06:47:27 INFO storage.BlockManagerMaster: Removed 1 successfully in
removeExecutor
14/11/21 06:47:27 INFO client.AppClient$ClientActor: Executor updated:
app-20141121064325-0001/1 is now FAILED (Command exited with code 1)
14/11/21 06:47:27 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20141121064325-0001/1 removed: Command exited with code 1
14/11/21 06:47:27 INFO client.AppClient$ClientActor: Executor added:
app-20141121064325-0001/8 on
worker-20141121063752-ip-10-37-170-43.ec2.internal-7078
(ip-10-37-170-43.ec2.internal:7078) with 32 cores
14/11/21 06:47:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20141121064325-0001/8 on hostPort ip-10-37-170-43.ec2.internal:7078
with 32 cores, 30.0 GB RAM
14/11/21 06:47:27 INFO client.AppClient$ClientActor: Executor updated:
app-20141121064325-0001/8 is now RUNNING
14/11/21 06:47:27 INFO scheduler.Stage: Stage 3 is now unavailable on
executor 1 (3278/4608, false)
14/11/21 06:47:27 INFO scheduler.TaskSetManager: Starting task 2.0:205 as
TID 4980 on executor 6: ip-10-7-149-203.ec2.internal (PROCESS_LOCAL)
14/11/21 06:47:27 INFO scheduler.TaskSetManager: Serialized task 2.0:205 as
2194 bytes in 0 ms
14/11/21 06:47:27 INFO scheduler.TaskSetManager: Starting task 2.0:23 as TID
4981 on executor 6: ip-10-7-149-203.ec2.internal (PROCESS_LOCAL)
14/11/21 06:47:27 INFO scheduler.TaskSetManager: Serialized task 2.0:23 as
2194 bytes in 0 ms
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Lost TID 4837 (task 2.0:81)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Loss was due to fetch
failure from BlockManagerId(5, ip-10-5-150-185.ec2.internal, 46714, 0)
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Loss was due to fetch
failure from BlockManagerId(5, ip-10-5-150-185.ec2.internal, 46714, 0)
14/11/21 06:47:27 INFO scheduler.DAGScheduler: Marking Stage 2 (foreach at
routines.scala:294) for resubmision due to a fetch failure
14/11/21 06:47:27 INFO scheduler.DAGScheduler: The failed fetch was from
Stage 3 (reduceByKey at routines.scala:278); marking it for resubmission
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Loss was due to fetch
failure from BlockManagerId(5, ip-10-5-150-185.ec2.internal, 46714, 0)
14/11/21 06:47:27 INFO scheduler.DAGScheduler: Marking Stage 2 (foreach at
routines.scala:294) for resubmision due to a fetch failure
14/11/21 06:47:27 INFO scheduler.DAGScheduler: The failed fetch was from
Stage 3 (reduceByKey at routines.scala:278); marking it for resubmission
14/11/21 06:47:27 INFO scheduler.DAGScheduler: Marking Stage 2 (foreach at
routines.scala:294) for resubmision due to a fetch failure
14/11/21 06:47:27 INFO scheduler.DAGScheduler: The failed fetch was from
Stage 3 (reduceByKey at routines.scala:278); marking it for resubmission
14/11/21 06:47:27 WARN scheduler.TaskSetManager: Loss was due to fetch
failure from BlockManagerId(5, ip-10-5-150-185.ec2.internal, 46714, 0)



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JVM-Memory-Woes-tp19496.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: JVM Memory Woes

Posted by Peter Thai <th...@gmail.com>.
Quick update: 
It is a filter job that creates the error above, not the reduceByKey

Why would a filter cause an out of memory? 

Here is my code

val inputgsup
="hdfs://"+sparkmasterip+"/user/sense/datasets/gsup/binary/30/2014/11/0[1-9]/part*";
val gsupfile =
sc.newAPIHadoopFile[BytesWritable,BytesWritable,SequenceFileAsBinaryInputFormat](inputgsup)
val gsup = gsupfile.map(x => (GsupHandler.DeserializeKey( x._1.getBytes
),GsupHandler.DeserializeValue( x._2.getBytes ))).map(x =>
(x._1._1,x._1._2,x._2._1, x._2._2))
val gsup_results_geod = gsup.flatMap(x=> doQueryGSUP(has_expo_criteria,
has_fence_criteria, timerange_start_expo, timerange_end_expo,
timerange_start_fence, timerange_end_fence, expo_pois, fence_pois,x))
val gsup_results_reduced =
gsup_results_geod.reduceByKey((a,b)=>((a._1.toShort | b._1.toShort).toByte,
a._2+b._2))

*val gsup_results = gsup_results_reduced.filter(x=>(criteria_filter.value
contains x._2._1.toInt))*



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JVM-Memory-Woes-tp19496p19510.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org