You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Thomas Powell (JIRA)" <ji...@apache.org> on 2016/09/22 16:36:21 UTC

[jira] [Comment Edited] (SPARK-17634) Spark job hangs when using dapply

    [ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513673#comment-15513673 ] 

Thomas Powell edited comment on SPARK-17634 at 9/22/16 4:35 PM:
----------------------------------------------------------------

I also ran this on a row limited version of this dataset. The first 22 tasks completed immediately (since there was no data to process). The final task recorded 13MB of shuffle read, (out of ~300MB of shuffle write in the previous stage), at which point it then stalled.


was (Author: iamthomaspowell):
I also ran this on a row limited version of this dataset. The first 22 tasks completed immediately (since there was no data to process). The final task recorded 32MB of shuffle read, (out of ~300MB of shuffle write in the previous stage), at which point it then stalled.

> Spark job hangs when using dapply
> ---------------------------------
>
>                 Key: SPARK-17634
>                 URL: https://issues.apache.org/jira/browse/SPARK-17634
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>            Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame backed by files in parquet with around 200 files that is around 2GB. When I load this in with the new partition coalescing it ends up having around 20 partitions so each one roughly 100MB. The data frame itself has 4 columns of integers and doubles. If I run a count over this things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that just uses an identity function the tasks hang and make no progress. Both the R and Java processes are running on the Spark nodes and are listening on the {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the socket but never seems to make any progress. The R process is harder to debug what it is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() @bci=4, line=212 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) @bci=109, line=87 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) @bci=322, line=59 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) @bci=5, line=29 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) @bci=59, line=178 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) @bci=5, line=175 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=168, line=79 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=47 (Interpreted frame)
>  - org.apache.spark.scheduler.Task.run(long, int, org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame)
>  - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1142 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 (Interpreted frame)
>  - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
> {code}
> Any recommendations on how best to debug? Nothing appears in the logs since the processes don't actually fail. The executors themselves have 4GB of memory which should be more than enough.
> My feeling is this could be something around serialization?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org