Posted to user@spark.apache.org by Ravindra <ra...@gmail.com> on 2017/03/24 10:40:45 UTC

Spark 2.0.2 : Hang at "org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)"

Hi All,

My Spark job hangs. Looking into the thread dump, I noticed that it hangs on
the count action on a dataframe (stack trace and plan given below). The data is
very small - actually not more than 10 rows.

I noticed some JIRAs about this issue, but all of them are resolved/closed
against previous versions.

It's running with 1 executor. I also noticed that the Storage tab is empty, so
no dataframe is cached.

Looking into the DAGScheduler, I notice it's stuck at runJob; probably it's
trying to run tasks concurrently and waiting here:

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
*org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)*
org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
org.apache.spark.rdd.RDD.collect(RDD.scala:911)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
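
The trace shows the driver thread parked inside Promise.ready, i.e. waiting for
the submitted job to complete. While the count is stuck, a second thread can ask
the status tracker whether any tasks are actually being launched - a minimal
diagnostic sketch (assuming the already-active session is picked up via
getOrCreate):

import org.apache.spark.sql.SparkSession

// Reuses the already-active session on the driver.
val spark = SparkSession.builder().getOrCreate()
val tracker = spark.sparkContext.statusTracker

// If numActiveTasks stays at 0 for the active stages, the scheduler is not
// launching tasks at all; otherwise tasks are running but never finishing.
for (stageId <- tracker.getActiveStageIds) {
  tracker.getStageInfo(stageId).foreach { stage =>
    println(s"stage $stageId: ${stage.numActiveTasks} active, " +
      s"${stage.numCompletedTasks}/${stage.numTasks} tasks completed")
  }
}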


Spark Properties (name / value):
spark.app.id local-1490350724879
spark.app.name TestJob
spark.default.parallelism 1
spark.driver.allowMultipleContexts true
spark.driver.host localhost
spark.driver.memory 4g
spark.driver.port 63405
spark.executor.id driver
spark.executor.memory 4g
spark.hadoop.validateOutputSpecs false
spark.master local[2]
spark.scheduler.mode FIFO
spark.sql.catalogImplementation hive
spark.sql.crossJoin.enabled true
spark.sql.shuffle.partitions 1
spark.sql.warehouse.dir /tmp/hive/spark-warehouse
spark.ui.enabled true
spark.yarn.executor.memoryOverhead 2048
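
For reference, a minimal builder sketch that mirrors the properties above (an
assumed reconstruction, not the actual job code):

import org.apache.spark.sql.SparkSession

// local[2] session with the Hive catalog and a single shuffle partition.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("TestJob")
  .config("spark.default.parallelism", "1")
  .config("spark.sql.shuffle.partitions", "1")
  .config("spark.sql.crossJoin.enabled", "true")
  .config("spark.sql.warehouse.dir", "/tmp/hive/spark-warehouse")
  .enableHiveSupport()   // spark.sql.catalogImplementation = hive
  .getOrCreate()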

It's a count action on the dataframe given below (a rough code reconstruction follows the plan) -
== Parsed Logical Plan ==
Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439,
created_time#27448]
+- Filter (rn#27493 = 1)
   +- Project [hash_composite_keys#27440L, id#27441, name#27442,
master#27439, created_time#27448, rn#27493]
      +- Project [hash_composite_keys#27440L, id#27441, name#27442,
master#27439, created_time#27448, rn#27493, rn#27493]
         +- Window [rownumber()
windowspecdefinition(hash_composite_keys#27440L, created_time#27448 ASC,
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#27493],
[hash_composite_keys#27440L], [created_time#27448 ASC]
            +- Project [hash_composite_keys#27440L, id#27441, name#27442,
master#27439, created_time#27448]
               +- Union
                  :- Project [hash_composite_keys#27440L, id#27441,
name#27442, master#27439, cast(0 as timestamp) AS created_time#27448]
                  :  +- Project [hash_composite_keys#27440L, id#27441,
name#27442, master#27439]
                  :     +- SubqueryAlias table1
                  :        +-
Relation[hash_composite_keys#27440L,id#27441,name#27442,master#27439]
parquet
                  +- Project [hash_composite_keys#27391L, id#27397,
name#27399, master#27401, created_time#27414]
                     +- Project [id#27397, name#27399,
hash_composite_keys#27391L, master#27401, cast(if
(isnull(created_time#27407L)) null else UDF(created_time#27407L) as
timestamp) AS created_time#27414]
                        +- Project [id#27397, name#27399,
hash_composite_keys#27391L, master#27401, 1490350732895 AS
created_time#27407L]
                           +- Project [id#27397, name#27399,
hash_composite_keys#27391L, master AS master#27401]
                              +- Aggregate [hash_composite_keys#27391L],
[first(id#27389, false) AS id#27397, first(name#27390, false) AS
name#27399, hash_composite_keys#27391L]
                                 +-
Relation[id#27389,name#27390,hash_composite_keys#27391L] parquet
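
For context, a rough Scala reconstruction of the pipeline implied by this plan
(column names are taken from the plan; the parquet paths, the "master" value and
the timestamp conversion are assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder().getOrCreate()   // session as sketched above
import spark.implicits._

// Existing rows ("table1") get a dummy created_time of epoch 0.
val existing = spark.read.parquet("/path/to/table1")            // assumed path
  .select($"hash_composite_keys", $"id", $"name", $"master",
    lit(0).cast("timestamp").as("created_time"))

// Incoming rows are reduced to one per key and stamped with the load time
// (the plan shows a UDF converting an epoch-millis literal; this is an approximation).
val incoming = spark.read.parquet("/path/to/incoming")          // assumed path
  .groupBy($"hash_composite_keys")
  .agg(first($"id").as("id"), first($"name").as("name"))
  .withColumn("master", lit("master"))                          // "master AS master#27401"; exact value unclear
  .withColumn("created_time", (lit(1490350732895L) / 1000).cast("timestamp"))
  .select($"hash_composite_keys", $"id", $"name", $"master", $"created_time")

// Keep the first row per key ordered by created_time, then count - the action that hangs.
val w = Window.partitionBy($"hash_composite_keys").orderBy($"created_time".asc)
val result = existing.union(incoming)
  .withColumn("rn", row_number().over(w))
  .filter($"rn" === 1)
  .drop("rn")

result.count()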

Re: Spark 2.0.2 : Hang at "org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)"

Posted by Ravindra <ra...@gmail.com>.
Also noticed that there are 8 "dispatcher-event-loop-0 ... 7" threads and 8
"map-output-dispatcher-0 ... 7" threads, all waiting at the same location in the
code, which is -
*sun.misc.Unsafe.park(Native Method)*
*java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)*
*java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)*
*java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)*
org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

So clearly there is a race condition. Maybe the only option is to avoid it...
but how?
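
For reference, that wait pattern is just what a thread blocked on an empty
LinkedBlockingQueue looks like - a minimal standalone illustration (not Spark
source code) that produces the same frames in a thread dump:

import java.util.concurrent.LinkedBlockingQueue

// A worker parked in take() shows exactly the frames above:
// Unsafe.park -> LockSupport.park -> ConditionObject.await -> LinkedBlockingQueue.take
val queue = new LinkedBlockingQueue[Runnable]()

val worker = new Thread(new Runnable {
  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      val message = queue.take()   // blocks (parks) until a message is posted
      message.run()
    }
  }
}, "message-loop-demo")
worker.setDaemon(true)
worker.start()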

