Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/15 08:46:01 UTC

[jira] [Updated] (SPARK-17634) Spark job hangs when using dapply

     [ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-17634:
---------------------------------
    Priority: Major  (was: Critical)

> Spark job hangs when using dapply
> ---------------------------------
>
>                 Key: SPARK-17634
>                 URL: https://issues.apache.org/jira/browse/SPARK-17634
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>            Priority: Major
>
> I'm running into an issue when using {{dapply}} on YARN. I have a data frame backed by around 200 Parquet files totalling roughly 2GB. When I load it with the new partition coalescing it ends up with around 20 partitions, each roughly 100MB. The data frame itself has 4 columns of integers and doubles. If I run a count over this, things work fine.
> However, if I add a {{dapply}} that just uses an identity function between the read and the {{count}}, the tasks hang and make no progress. Both the R and Java processes are running on the Spark nodes and are listening on the {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
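> For reference, the full job is roughly the following sketch (the path and session setup here are placeholders rather than my exact code):
> {code}
> library(SparkR)
> sparkR.session(master = "yarn")
>
> # ~200 Parquet files, ~2GB total; coalesces into ~20 partitions of ~100MB each
> df <- read.parquet("/path/to/parquet")   # placeholder path
> count(df)                                # completes fine
>
> # identity dapply inserted between the read and the count
> result <- dapply(df, function(x) { x }, SparkR::schema(df))
> count(result)                            # tasks hang here and make no progress
> {code}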
> I took a jstack of the Java process and can see that it is just blocked reading from the socket and never seems to make any progress. It is harder to tell what the R process is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() @bci=4, line=212 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) @bci=109, line=87 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) @bci=322, line=59 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) @bci=5, line=29 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) @bci=59, line=178 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) @bci=5, line=175 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=168, line=79 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=47 (Interpreted frame)
>  - org.apache.spark.scheduler.Task.run(long, int, org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame)
>  - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1142 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 (Interpreted frame)
>  - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
> {code}
> Any recommendations on how best to debug this? Nothing appears in the logs since the processes don't actually fail. The executors themselves have 4GB of memory, which should be more than enough.
> My feeling is that this could be something around serialization.
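> For what it's worth, one sanity check I may try (a hypothetical sketch, not something from this job): run the same identity {{dapply}} over a tiny in-memory data frame. If that also hangs, the problem is presumably in the R worker handshake/serialization path rather than in the Parquet data itself.
> {code}
> library(SparkR)
> sparkR.session()
>
> # tiny data frame with the same column types (integers and doubles)
> small <- createDataFrame(data.frame(a = 1:10, b = as.double(1:10)))
>
> # identity dapply, same shape as the failing call
> small_result <- dapply(small, function(x) { x }, SparkR::schema(small))
> head(collect(small_result))
> {code}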



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org