Posted to reviews@spark.apache.org by YanTangZhai <gi...@git.apache.org> on 2014/07/23 16:04:58 UTC

[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

GitHub user YanTangZhai opened a pull request:

    https://github.com/apache/spark/pull/1548

    [SPARK-2647] DAGScheduler plugs other JobSubmitted events when processing one JobSubmitted event

    If several jobs are submitted, DAGScheduler blocks the other JobSubmitted events while it processes one JobSubmitted event.
    For example, one JobSubmitted event is processed as follows and takes a long time:
    "spark-akka.actor.default-dispatcher-67" daemon prio=10 tid=0x00007f75ec001000 nid=0x7dd6 in Object.wait() [0x00007f76063e1000]
    java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130)
    locked <0x0000000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call)
    at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241)
    at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
    at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83)
    at org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60)
    at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
    at org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472)
    at org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498)
    at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208)
    at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233)
    at StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:197)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:272)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
    at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:279)
    at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:219)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:676)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1180)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Meanwhile, another JobSubmitted event hangs as follows:
    "pool-8-thread-31" prio=10 tid=0x00007f78a8287800 nid=0x8cc in Object.wait() [0x00007f7585654000]
    java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
    locked <0x00000007a59453d8> (a org.apache.spark.scheduler.JobWaiter)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:451)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1066)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1087)
    at shark.execution.FileSinkOperator.execute(FileSinkOperator.scala:165)
    at shark.execution.SparkTask.execute(SparkTask.scala:99)
    at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:144)
    at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
    at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1362)
    at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1146)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
    at shark.SharkServerHandler.execute(SharkServer.scala:284)
    at shark.GatedSharkServerHandler.execute(SharkServer.scala:240)
    at org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:644)
    at org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:628)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
    I think DAGScheduler could run each handleJobSubmitted call in its own thread.
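
The blocking behaviour described above can be reproduced outside Spark. The following is only a minimal sketch, not Spark code: a single-threaded executor stands in for the scheduler's event-processing actor, `Thread.sleep` stands in for the slow `getPartitions` call seen in the first stack trace, and the names `BlockedEventLoopSketch` and `fastJobDelayMillis` are hypothetical.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

object BlockedEventLoopSketch {
  /** Returns how long (in ms) the second event waited before its handler started. */
  def fastJobDelayMillis(slowHandlerMillis: Long): Long = {
    // Assumption: one thread handles all events, one at a time.
    val eventLoop = Executors.newSingleThreadExecutor()
    val fastJobDelay = new AtomicLong(-1L)
    val submittedAt = System.nanoTime()

    // First event: its handler blocks, like a handler waiting on a remote call.
    eventLoop.execute(new Runnable {
      override def run(): Unit = Thread.sleep(slowHandlerMillis)
    })

    // Second event: a trivial handler that cannot start until the first one returns.
    eventLoop.execute(new Runnable {
      override def run(): Unit =
        fastJobDelay.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - submittedAt))
    })

    eventLoop.shutdown()
    eventLoop.awaitTermination(30, TimeUnit.SECONDS)
    fastJobDelay.get()
  }
}
```

Calling `BlockedEventLoopSketch.fastJobDelayMillis(200L)` should report a delay of roughly the slow handler's duration, even though the second handler itself does almost no work.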

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/YanTangZhai/spark SPARK-2647

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1548.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1548
    
----
commit f3b5566ef0f04d5f2b42f7fc1dc41d3fcc69af8b
Author: YanTangZhai <ha...@tencent.com>
Date:   2014-07-23T14:01:58Z

    Update DAGScheduler.scala

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

Posted by YanTangZhai <gi...@git.apache.org>.
Github user YanTangZhai commented on the pull request:

    https://github.com/apache/spark/pull/1548#issuecomment-50292640
  
    @markhamstra Ok. Thank you very much.



[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

Posted by YanTangZhai <gi...@git.apache.org>.
Github user YanTangZhai commented on the pull request:

    https://github.com/apache/spark/pull/1548#issuecomment-50000727
  
    Hi @markhamstra, you are right. I will think of other ways to solve this problem. Thanks.



[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1548#issuecomment-49878116
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/1548#issuecomment-50174114
  
    @YanTangZhai If you are searching for another solution and abandoning this PR, could you please close this PR and open a new one when you have something different for us to look at?



[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

Posted by YanTangZhai <gi...@git.apache.org>.
Github user YanTangZhai closed the pull request at:

    https://github.com/apache/spark/pull/1548



[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1548#discussion_r15289573
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1202,8 +1202,12 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule
        */
       def receive = {
         case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    -      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
    -        listener, properties)
    +      new Thread("JobSubmitted for " + jobId) {
    +        override def run() {
    +          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
    +            listener, properties)
    +        }
    +      }.start()
    --- End diff --
    
    Starting a new thread within eventProcessorActor to concurrently manipulate the DAGScheduler's state strikes me as a really bad idea.  The actor model and the DAGScheduler are built around the fundamental assumption that only one event will be processed at a time.  Breaking that model opens us up to numerous potential race conditions, and I am a long way from being convinced that this PR is either safe or desirable.
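
The concern about race conditions can be illustrated with a small sketch that is independent of Spark. Everything here is hypothetical: `SchedulerState` and its `stageCount` field merely stand in for mutable state that was written assuming a single writer, and a single-threaded executor stands in for the actor. The first method mirrors the thread-per-event approach in the diff; the second keeps all updates on one thread.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object SingleWriterSketch {
  // Hypothetical stand-in for state that assumes only one event is handled at a time.
  final class SchedulerState { var stageCount: Int = 0 }

  /** One thread per event: the unsynchronized read-modify-write can interleave. */
  def threadPerEvent(events: Int): Int = {
    val state = new SchedulerState
    val threads = (1 to events).map { i =>
      new Thread("JobSubmitted for " + i) {
        override def run(): Unit = state.stageCount += 1
      }
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    state.stageCount // may be less than `events` if updates were lost
  }

  /** All events handled on one thread: each update sees the previous one. */
  def singleEventThread(events: Int): Int = {
    val state = new SchedulerState
    val eventLoop = Executors.newSingleThreadExecutor()
    (1 to events).foreach { _ =>
      eventLoop.execute(new Runnable {
        override def run(): Unit = state.stageCount += 1
      })
    }
    eventLoop.shutdown()
    eventLoop.awaitTermination(30, TimeUnit.SECONDS)
    state.stageCount
  }
}
```

`singleEventThread(n)` always returns `n`, whereas `threadPerEvent(n)` is only guaranteed to return some value between 1 and `n`. Lost updates may not show up on every run, which is part of what makes this class of bug hard to rule out by testing.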

