Posted to issues@spark.apache.org by "Laurens (Jira)" <ji...@apache.org> on 2021/03/10 13:56:00 UTC

[jira] [Comment Edited] (SPARK-34680) Spark hangs when out of diskspace

    [ https://issues.apache.org/jira/browse/SPARK-34680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298849#comment-17298849 ] 

Laurens edited comment on SPARK-34680 at 3/10/21, 1:55 PM:
-----------------------------------------------------------

Setup code (master_node is determined by Slurm, e.g. 'node008'; num_machines is 11 (one driver, 10 workers); spark_version is '3.1.1'):
{code:python}
import findspark

# findspark must be initialised before pyspark can be imported.
findspark.init(f'/var/scratch/big-data-frameworks/spark-{spark_version}')

from pyspark.sql import SparkSession
spark = SparkSession.builder \
	.master("spark://" + master_node + ":7077") \
	.appName("Analysis") \
	.config("spark.executor.memory", "60G") \
	.config("spark.executor.cores", "16") \
	.config("spark.executor.instances", "1") \
	.config("spark.driver.memory", "60G") \
	.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
	.config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true") \
	.config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
	.config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
	.config("spark.local.dir", "/local/spark, /tmp") \
	.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
	.config("spark.kryoserializer.buffer.max", "128m") \
	.config('spark.memory.storageFraction', 0.4) \
	.config('spark.memory.fraction', 0.5) \
	.config('spark.rdd.compress', 'true') \
	.config('spark.checkpoint.compress', 'true') \
	.config('spark.sql.shuffle.partitions', 4*(num_machines-1)*16) \
	.getOrCreate()
{code}
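For reference, a minimal sketch (not part of the original job; it assumes the spark session above was created successfully) that prints which of the settings above actually took effect on the driver:
{code:python}
# Print the effective values of a few of the settings passed above.
# SparkConf.get(key, default) returns the default if the key was never set.
conf = spark.sparkContext.getConf()
for key in ("spark.local.dir",
            "spark.sql.shuffle.partitions",
            "spark.executor.memory",
            "spark.memory.fraction"):
    print(key, "=", conf.get(key, "<not set>"))
{code}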
{code:python}
import os

import databricks.koalas as ks

kdf = ks.read_parquet(os.path.join(hdfs_path, folder, "tasks", "schema-1.0"),
			 columns=[
				 "workflow_id", "id", "task_id", "children", "parents", "ts_submit", "runtime"
			 ], pandas_metadata=False, engine='pyarrow')
	
os.makedirs(output_location_look_ahead, exist_ok=True)
kdf = kdf.rename(columns={"task_id": "id"})
# Downcast columns that can be downcast to save RAM.
kdf = kdf.astype({"ts_submit":'int32', "runtime":'int32'})
kdf = kdf[(kdf['children'].map(len) > 0) | (kdf['parents'].map(len) > 0)]
grouped_df = kdf.groupby("workflow_id")
grouped_df.apply(lookahead_newer) \
	.to_parquet(output_location_look_ahead, compression='snappy', engine='pyarrow')
{code}
lookahead_newer is a UDF that, given a (Pandas) DataFrame from [this dataset|http://wta.atlarge-research.com/alibaba2018.html], performs some computations (I do not think these are relevant here, other than that they consume memory).
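For completeness, a hypothetical stand-in with the same shape as lookahead_newer (the real computation is not shown here): Koalas' groupby().apply() hands the function the pandas DataFrame for one workflow_id group and expects a pandas DataFrame back.
{code:python}
import pandas as pd

# Hypothetical placeholder only; the actual lookahead_newer logic is omitted above.
def lookahead_newer(pdf: pd.DataFrame) -> pd.DataFrame:
    # Example computation: per-task child/parent counts within one workflow.
    out = pdf[["workflow_id", "ts_submit", "runtime"]].copy()
    out["num_children"] = pdf["children"].map(len)
    out["num_parents"] = pdf["parents"].map(len)
    return out
{code}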



> Spark hangs when out of diskspace
> ---------------------------------
>
>                 Key: SPARK-34680
>                 URL: https://issues.apache.org/jira/browse/SPARK-34680
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.1, 3.1.1
>         Environment: Running Spark and PySpark 3.1.1 with Hadoop 3.2.2 and Koalas 1.6.0.
> Some environment variables:
> |Java Home|/usr/lib/jvm/java-11-openjdk-11.0.3.7-0.el7_6.x86_64|
> |Java Version|11.0.3 (Oracle Corporation)|
> |Scala Version|version 2.12.10|
>            Reporter: Laurens
>            Priority: Major
>
> While parsing a workflow using Koalas, I noticed a stage had been hanging for 8 hours. I checked the logs and the last output is:
> {code:java}
> 21/03/09 13:50:31 ERROR TaskMemoryManager: error while calling spill() on org.apache.spark.shuffle.sort.ShuffleExternalSorter@4127a515
> java.io.IOException: No space left on device
>  at java.base/java.io.FileOutputStream.writeBytes(Native Method)
>  at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
>  at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
>  at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
>  at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
>  at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
>  at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
>  at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:260)
>  at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:218)
>  at org.apache.spark.shuffle.sort.ShuffleExternalSorter.spill(ShuffleExternalSorter.java:276)
>  at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:208)
>  at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:289)
>  at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116)
>  at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:385)
>  at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:409)
>  at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:249)
>  at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:178)
>  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>  at org.apache.spark.scheduler.Task.run(Task.scala:131)
>  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
>  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834)
>  Suppressed: java.io.IOException: No space left on device
>  at java.base/java.io.FileOutputStream.writeBytes(Native Method)
>  at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
>  at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
>  at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
>  at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
>  at net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:243)
>  at org.apache.spark.serializer.DummySerializerInstance$1.flush(DummySerializerInstance.java:50)
>  at org.apache.spark.storage.DiskBlockObjectWriter.commitAndGet(DiskBlockObjectWriter.scala:173)
>  at org.apache.spark.storage.DiskBlockObjectWriter.$anonfun$close$1(DiskBlockObjectWriter.scala:156)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>  at org.apache.spark.storage.DiskBlockObjectWriter.close(DiskBlockObjectWriter.scala:158)
>  at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:226)
>  ... 18 more
>  Suppressed: java.io.IOException: No space left on device
>  at java.base/java.io.FileOutputStream.writeBytes(Native Method)
>  at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
>  at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
>  at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
>  at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
>  at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:182)
>  at org.apache.spark.storage.DiskBlockObjectWriter$ManualCloseBufferedOutputStream$1.org$apache$spark$storage$DiskBlockObjectWriter$ManualCloseOutputStream$$super$close(DiskBlockObjectWriter.scala:108)
>  at org.apache.spark.storage.DiskBlockObjectWriter$ManualCloseOutputStream.manualClose(DiskBlockObjectWriter.scala:65)
>  at org.apache.spark.storage.DiskBlockObjectWriter$ManualCloseOutputStream.manualClose$(DiskBlockObjectWriter.scala:64)
>  at org.apache.spark.storage.DiskBlockObjectWriter$ManualCloseBufferedOutputStream$1.manualClose(DiskBlockObjectWriter.scala:108)
>  at org.apache.spark.storage.DiskBlockObjectWriter.$anonfun$closeResources$1(DiskBlockObjectWriter.scala:135)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>  at org.apache.spark.storage.DiskBlockObjectWriter.closeResources(DiskBlockObjectWriter.scala:136)
>  at org.apache.spark.storage.DiskBlockObjectWriter.$anonfun$close$2(DiskBlockObjectWriter.scala:158)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1448)
>  ... 20 more
> 21/03/09 13:50:31 INFO TaskMemoryManager: Memory used in task 1255
> 21/03/09 13:50:31 INFO TaskMemoryManager: Acquired by HybridRowQueue(org.apache.spark.memory.TaskMemoryManager@394bad48,/local/anonymized/spark/spark-4b70492b-8f2e-4108-b6a0-6ed423a98bd9/executor-b88a6782-4592-45c0-a484-73a2f642cb3e/spark-c20b49eb-83d4-4145-b07a-fe6fddef7ffe,7,org.apache.spark.serializer.SerializerManager@59dd92e8): 105.5 MiB
> 21/03/09 13:50:31 INFO TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@4127a515: 14.4 GiB
> 21/03/09 13:50:31 INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@34a4b163: 15.1 GiB
> 21/03/09 13:50:31 INFO TaskMemoryManager: 67108864 bytes of memory were used by task 1255 but are not associated with specific consumers
> 21/03/09 13:50:31 INFO TaskMemoryManager: 31853114929 bytes of memory are used for execution and 526799 bytes of memory are used for storage{code}
> Local time is 21/03/09 21:13:00, so it appears the worker is stuck and the stage never terminates, not even unsuccessfully.
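One way to confirm the out-of-disk condition independently of Spark is a quick free-space check on each worker against the directories configured as spark.local.dir in the setup above (a sketch; the paths are taken from that configuration):
{code:python}
import shutil

# Check free space in the directories used for shuffle spill files.
for d in ("/local/spark", "/tmp"):
    usage = shutil.disk_usage(d)
    print(f"{d}: {usage.free / 1e9:.1f} GB free of {usage.total / 1e9:.1f} GB")
{code}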


