Posted to issues@spark.apache.org by "Shivaram Venkataraman (JIRA)" <ji...@apache.org> on 2016/10/04 22:33:20 UTC

[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply

    [ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15546871#comment-15546871 ] 

Shivaram Venkataraman commented on SPARK-17634:
-----------------------------------------------

[~iamthomaspowell] Thanks for the performance profiling. It would be great to have a PR with the code change you have above. There are a couple of issues that I would like to understand better:

(a) The base cost of 2 seconds for any number of rows below 10000: does this come from launching the R process, or is there some other bottleneck here? Given that readTypedObject and readType are called a linear number of times, I think this comes down to memory allocation or something similar?

(b) Reads slowing down as the number of rows increases: does your change address this problem? Apart from memory allocation, I think this could also happen if the OS is out of memory and the R process is thrashing.
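
To make the memory-allocation question in (a) and (b) concrete, here is a minimal sketch in Python (not SparkR code) of how a linear number of reads can still cost quadratic work when the result buffer is reallocated on every read. The function names are hypothetical, and whether the SparkR deserializer actually behaves this way is an assumption that profiling would need to confirm.

```python
# Toy model only: this is NOT the SparkR deserializer. It counts how many
# element copies happen when n values are read into a buffer in two ways.


def read_growing(values):
    """Rebuild the buffer on every read, as append-by-copy would."""
    buf = ()
    copied = 0
    for v in values:
        copied += len(buf)   # every existing element is copied into the new tuple
        buf = buf + (v,)     # allocate a new buffer one element longer
    return list(buf), copied


def read_preallocated(values, n):
    """Allocate the buffer once, then fill it in place."""
    buf = [None] * n
    for i, v in enumerate(values):
        buf[i] = v           # no existing element is moved
    return buf, 0


if __name__ == "__main__":
    # Print the copy counts side by side for a few row counts.
    for n in (10, 100, 1000):
        _, grown = read_growing(range(n))
        _, fixed = read_preallocated(range(n), n)
        print(n, grown, fixed)
```

Both versions perform n reads, but the first copies n*(n-1)/2 elements in total, so the time per row grows with the row count. If profiling of the R worker shows this pattern, preallocating would be the thing to try; otherwise the thrashing explanation in (b) looks more likely.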

> Spark job hangs when using dapply
> ---------------------------------
>
>                 Key: SPARK-17634
>                 URL: https://issues.apache.org/jira/browse/SPARK-17634
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>            Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame backed by around 200 parquet files totalling around 2GB. When I load this in with the new partition coalescing, it ends up with around 20 partitions of roughly 100MB each. The data frame itself has 4 columns of integers and doubles. If I run a count over this, things work fine.
> However, if I add a {{dapply}} between the read and the {{count}} that just uses an identity function, the tasks hang and make no progress. Both the R and Java processes are running on the Spark nodes and are listening on the {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just reading from the socket but never seems to make any progress. It is harder to tell what the R process is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() @bci=4, line=212 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) @bci=109, line=87 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) @bci=322, line=59 (Interpreted frame)
>  - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) @bci=5, line=29 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) @bci=59, line=178 (Interpreted frame)
>  - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) @bci=5, line=175 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=168, line=79 (Interpreted frame)
>  - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=47 (Interpreted frame)
>  - org.apache.spark.scheduler.Task.run(long, int, org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame)
>  - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1142 (Interpreted frame)
>  - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 (Interpreted frame)
>  - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
> {code}
> Any recommendations on how best to debug? Nothing appears in the logs since the processes don't actually fail. The executors themselves have 4GB of memory which should be more than enough.
> My feeling is this could be something around serialization?


