You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by qqsun8819 <gi...@git.apache.org> on 2014/03/24 07:33:20 UTC

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

GitHub user qqsun8819 opened a pull request:

    https://github.com/apache/spark/pull/214

    [SPARK-1141] [WIP] Parallelize Task Serialization 

    https://spark-project.atlassian.net/browse/SPARK-1141
    @kayousterhout
    copied from JIRA(design doc in JIRA is old, I'll update it later)
    TaskSetManager.resourceOffer will return a TaskDescWithoutSerializeTask object , this object will be a half-copy of TaskDescrption exception _serializedTask ByteBffer, instead, it will contain a Task object and seriailze part inside TaskSetManager.resourceOffer will be moved to TaskSchedulerImpl's "Runnable" working thread which will be placed inside threadpool.
    
    DriverSuite failed in my own env. Working on fixing

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/qqsun8819/spark task-serialize

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #214
    
----
commit 53795965dd16c54a4981ef4ee754f326663f9795
Author: Ouyang Jin <ji...@alibaba-inc.com>
Date:   2014-03-16T15:57:43Z

    Initial version of Parallelize Task Serialization in dev code, but this version has a chance to hang in multi-task execution and needs debug

commit 0bb37447d403c63b21b06cf15a612eb363c701da
Author: OuYang Jin <ji...@alibaba-inc.com>
Date:   2014-03-23T14:47:56Z

    Merge remote-tracking branch 'upstream/master' into task-serialize

commit 177195d20ddef34d339f6385d50382944c9c149d
Author: OuYang Jin <ji...@alibaba-inc.com>
Date:   2014-03-24T06:16:27Z

    Modify asychroniazed sleep wait to pass job running case

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38651743
  
    @CodingCat @kayousterhout @mridulm Thanks very much for your review. 
    I think @kayousterhout  state clear in her last two comments what the ideal implementation looks like . And I know what you what this function to be like. So I have no doubt to this
    I'll update my patch according to this.Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request #214: [SPARK-1141] [WIP] Parallelize Task Serialization

Posted by pzz2011 <gi...@git.apache.org>.
Github user pzz2011 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r75575806
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -275,9 +275,9 @@ private[spark] class TaskSchedulerImpl(
           } while (launchedTask)
         }
     
    -    while(!serializingTask.isEmpty) {
    -      Thread.sleep(1000)
    -    }
    +    do {
    +      Thread.sleep(1)
    +    } while(!serializingTask.isEmpty) 
    --- End diff --
    
    excuse me, why did you change here? I cannot understand what's the difference @qqsun8819 (may be you have forget it,hahhh


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10918762
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -198,6 +201,13 @@ private[spark] class TaskSchedulerImpl(
        */
       def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         SparkEnv.set(sc.env)
    +    // Make thread pool local for shutdown before the function returns
    +    // This is for driver can exit normally which not call sc.stop or sys.exit
    +    val serializeWorkerPool = new ThreadPoolExecutor(
    --- End diff --
    
    org.apache.spark.DriverSuite only have one case it it...you can check the source code in core/src/test


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r11102892
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -149,6 +151,21 @@ private[spark] object Utils extends Logging {
         buf
       }
     
    +  def serializeTask(taskNoSer: TaskDescWithoutSerializedTask, sc: SparkContext,
    +                    serializer: SerializerInstance) : TaskDescription = {
    +    val startTime = System.currentTimeMillis()
    +    // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
    --- End diff --
    
    I checked the DAGScheduler code, L793, I think it's pretty straightforward to move that functionality to here, as you will have an unified implementation of SerializerAndRunner for all scheduler backend....https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L770, that line is actually just for checking if the task is serializable......


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-39079998
  
    @CodingCat Thanks very much for your review.I found out that you main concern concentrate on two points:1 Merge the two SerializerRunner in two scheduler backend into one source file. 2. Move the preempt serialization try catch logic in DAGScheduler into real serialization  util method. 
    Actually the first point came out to me when I write out this patch . But at last I didn't do it because actually they do different things, and the real serialization part is pulled out to Utils . So..  But I'll double check it again
    For the second point , actually preempt serialization logic stayed in DAGScheduler for a long time (as far as I first met spark source code), so I don't know whether there is risk or other problems for scheduler. So I'll check the code again, and ,always , welcome any other review advice 
    For time limit, I didn't write test case for this new feature. And I welcome any test design advice .Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38463531
  
    Fix DriverSuite case fail 
    put threadpool inside resourceoffer and shutdown it before it return
    some other fix according to @CodingCat 's review 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/214


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10888611
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -30,6 +30,9 @@ import scala.util.Random
     import org.apache.spark._
     import org.apache.spark.TaskState.TaskState
     import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
    +import org.apache.spark.util.Utils
    +import scala.collection.mutable
    --- End diff --
    
    sorry for code style here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r11102737
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala ---
    @@ -62,6 +65,30 @@ private[spark] class MesosSchedulerBackend(
     
       var classLoader: ClassLoader = null
     
    +  val serializeWorkerPool = Utils.newDaemonFixedThreadPool(
    +    scheduler.sc.conf.getInt("spark.scheduler.task.serialize.threads", 4), "task-serialization")
    +
    +  val env = SparkEnv.get
    +  protected val serializer = new ThreadLocal[SerializerInstance] {
    +    override def initialValue(): SerializerInstance = {
    +      env.closureSerializer.newInstance()
    +    }
    +  }
    +
    +  class TaskMesosSerializedRunner(taskNoSer: TaskDescWithoutSerializedTask,
    --- End diff --
    
    I just noticed that @mridulm proposed the same thing in his last comment, I think it will be great if we can do in that way


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10949633
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -219,18 +229,40 @@ private[spark] class TaskSchedulerImpl(
             taskSet.parent.name, taskSet.name, taskSet.runningTasks))
         }
     
    +    val ser = SparkEnv.get.closureSerializer.newInstance()
         // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
         // of locality levels so that it gets a chance to launch local tasks on all of them.
         var launchedTask = false
    +    val serializingTaskNum = new AtomicLong(0)
         for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
           do {
             launchedTask = false
             for (i <- 0 until shuffledOffers.size) {
               val execId = shuffledOffers(i).executorId
               val host = shuffledOffers(i).host
    -          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
    -            tasks(i) += task
    -            val tid = task.taskId
    +          for (taskDesc <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
    +            serializingTaskNum.getAndIncrement()
    +            serializeWorkerPool.execute(new Runnable {
    +              override def run() {
    +                // Serialize and return the task
    +                val startTime = System.currentTimeMillis()
    +                // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
    +                // we assume the task can be serialized without exceptions.
    +                val serializedTask = Task.serializeWithDependencies(
    +                  taskDesc.taskObject, sc.addedFiles, sc.addedJars, ser)
    +                val timeTaken = System.currentTimeMillis() - startTime
    +                logInfo("Serialized task %s as %d bytes in %d ms".format(
    +                  taskDesc.taskName, serializedTask.limit, timeTaken))
    +                val task = new TaskDescription(taskDesc.taskId, taskDesc.executorId,
    +                    taskDesc.taskName, taskDesc.index, serializedTask)
    +                tasks.synchronized {
    --- End diff --
    
    hmm, since it is running in a different thread, no need to set spark env, etc ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38612685
  
    @kayousterhout the backend assumes that there is only a single thread which is executing inside the actor at a given point of time. We will be changing this assumption.
    For example, updating freeCores (and other manipulations of freeCores/etc when responses come into the actor).
    This change will mean we will need to break that implicit assumption.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10884300
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -219,18 +226,43 @@ private[spark] class TaskSchedulerImpl(
             taskSet.parent.name, taskSet.name, taskSet.runningTasks))
         }
     
    +    val ser = SparkEnv.get.closureSerializer.newInstance()
         // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
         // of locality levels so that it gets a chance to launch local tasks on all of them.
         var launchedTask = false
    +    val serializingTask = new HashSet[Long]
    --- End diff --
    
    as you are not fetching serialize task from this HashSet, but just use taskDesc in L250, can we just replace this with an integer, in L280, when this integer is not zero, keep sleeping.....


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38881858
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38612977
  
    freeCores would need to be updated by the makeOffers() method and before
    the tasks get serialized (otherwise we can have race conditions where we
    assign the same core to multiple tasks).  I don't think the thread that
    serializes and launches tasks needs to access any internal state of the
    CGSB, so I don't think this is an issue.
    
    
    On Tue, Mar 25, 2014 at 12:57 PM, Mridul Muralidharan <
    notifications@github.com> wrote:
    
    > @kayousterhout <https://github.com/kayousterhout> the backend assumes
    > that there is only a single thread which is executing inside the actor at a
    > given point of time. We will be changing this assumption.
    > For example, updating freeCores (and other manipulations of freeCores/etc
    > when responses come into the actor).
    > This change will mean we will need to break that implicit assumption.
    >
    > --
    > Reply to this email directly or view it on GitHub<https://github.com/apache/spark/pull/214#issuecomment-38612685>
    > .
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-39048706
  
    patch updated 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38532473
  
    Thanks for your advice @kayousterhout   . And my understand for what you mean is create a TaskResultGetter-like class, and this class main a threadpool inside it , and everytime resourceOffer gets a task to serialize , it just equeue it ,and when serialization finished, it just call TaskScheduler to handle .For doing this, this two class just achieve asynchronous status. Is that what you mean? If you think so , I can update my patch according to this 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10888735
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -93,6 +96,10 @@ private[spark] class TaskSchedulerImpl(
       val mapOutputTracker = SparkEnv.get.mapOutputTracker
     
       var schedulableBuilder: SchedulableBuilder = null
    +
    +  private val serializeWorkerPool = new ThreadPoolExecutor(20, 60, 60, TimeUnit.SECONDS,
    --- End diff --
    
    Thank for pulling these! I fix these argument for convenience of debug. I'll replace it with conf


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r11102691
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala ---
    @@ -62,6 +65,30 @@ private[spark] class MesosSchedulerBackend(
     
       var classLoader: ClassLoader = null
     
    +  val serializeWorkerPool = Utils.newDaemonFixedThreadPool(
    +    scheduler.sc.conf.getInt("spark.scheduler.task.serialize.threads", 4), "task-serialization")
    +
    +  val env = SparkEnv.get
    +  protected val serializer = new ThreadLocal[SerializerInstance] {
    +    override def initialValue(): SerializerInstance = {
    +      env.closureSerializer.newInstance()
    +    }
    +  }
    +
    +  class TaskMesosSerializedRunner(taskNoSer: TaskDescWithoutSerializedTask,
    --- End diff --
    
    Is it possible to merge  TaskMesosSerializedRunner and TaskCGSerializedRunner in the same class, or at least declare them in the same source file, so that we don't need to walk through multiple files in future updates?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38613780
  
    In coarse grained scheduler, freeCores is updated once the task desc's are returned - in launchTasks; and expected to be used within the actor thread (so MT-unsafe).
    Also, it is private (and impl detail) of the scheduler backend; task scheduler should be agnostic to it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10884645
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -30,6 +30,9 @@ import scala.util.Random
     import org.apache.spark._
     import org.apache.spark.TaskState.TaskState
     import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
    +import org.apache.spark.util.Utils
    +import scala.collection.mutable
    --- End diff --
    
    Do you mind adjusting the import statements order, see Contributing to Spark wiki page


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10884928
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -219,18 +226,43 @@ private[spark] class TaskSchedulerImpl(
             taskSet.parent.name, taskSet.name, taskSet.runningTasks))
         }
     
    +    val ser = SparkEnv.get.closureSerializer.newInstance()
         // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
         // of locality levels so that it gets a chance to launch local tasks on all of them.
         var launchedTask = false
    +    val serializingTask = new HashSet[Long]
    --- End diff --
    
    just take care of the concurrency issue here....


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-61938307
  
    Hi @qqsun8819, as Matei mentioned, Spark now broadcasts RDD objects, so it's very unlikely for task serialization to become a bottleneck.  I closed the associated JIRA -- would you mind closing this pull request?
    
    Thanks for your work on this and sorry that it didn't end up being needed!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10884042
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -93,6 +96,10 @@ private[spark] class TaskSchedulerImpl(
       val mapOutputTracker = SparkEnv.get.mapOutputTracker
     
       var schedulableBuilder: SchedulableBuilder = null
    +
    +  private val serializeWorkerPool = new ThreadPoolExecutor(20, 60, 60, TimeUnit.SECONDS,
    --- End diff --
    
    also KeepAliveTime


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38416907
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10949757
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -198,6 +201,13 @@ private[spark] class TaskSchedulerImpl(
        */
       def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         SparkEnv.set(sc.env)
    +    // Make thread pool local for shutdown before the function returns
    +    // This is for driver can exit normally which not call sc.stop or sys.exit
    +    val serializeWorkerPool = new ThreadPoolExecutor(
    +      conf.getInt("spark.scheduler.task.serialize.threads.min", 20),
    +      conf.getInt("spark.scheduler.task.serialize.threads.max", 60),
    +      conf.getInt("spark.scheduler.task.serialize.threads.keepalive", 60), TimeUnit.SECONDS,
    +      new LinkedBlockingDeque[Runnable]())
    --- End diff --
    
    Maybe do this only if there is expected to be an overhead ? Say number of tasks which can be submitted > some threshold ?
    Also, move the pool out of the method as others have suggested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38615338
  
    Ah, I see what you mean - pull all of the logic within successful schedule of resourceOffer into the caller.
    Yeah, that should work fine (with the caveat of setting SparkEnv in the threadpool thead :-) )
    Neat, I like the proposal.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r11102844
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -149,6 +151,21 @@ private[spark] object Utils extends Logging {
         buf
       }
     
    +  def serializeTask(taskNoSer: TaskDescWithoutSerializedTask, sc: SparkContext,
    +                    serializer: SerializerInstance) : TaskDescription = {
    +    val startTime = System.currentTimeMillis()
    +    // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
    --- End diff --
    
    is it difficult to capture the non-serializable objects here? as @kayousterhout said in one of my PRs, DAGScheduler knows too much about task-level details now...maybe we can do something to make the situation better.....


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38609414
  
    I thought about this a bit more and I think it makes sense to do something similar to what @CodingCat suggested: in CoarseGrainedSchedulerBackend, when we call scheduler.resourceOffers(), we get back a list of tasks but where we don't have the serialized task yet.  At this point, the makeOffers() method passes the tasks off to a thread pool that serializes the task and then launches it on the appropriate executor.  We could pass the task and executor actor to the thread pool, so that it doesn't have to synchronize on the CGSB at all (and this way we don't need to add a new message to CGSB).  Does this make sense / seem reasonable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10884189
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -243,12 +275,18 @@ private[spark] class TaskSchedulerImpl(
           } while (launchedTask)
         }
     
    +    do {
    +      Thread.sleep(1)
    +    } while(!serializingTask.isEmpty) 
    +
         if (tasks.size > 0) {
           hasLaunchedTask = true
         }
         return tasks
       }
     
    +
    --- End diff --
    
    extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10918799
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -198,6 +201,13 @@ private[spark] class TaskSchedulerImpl(
        */
       def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         SparkEnv.set(sc.env)
    +    // Make thread pool local for shutdown before the function returns
    +    // This is for driver can exit normally which not call sc.stop or sys.exit
    +    val serializeWorkerPool = new ThreadPoolExecutor(
    --- End diff --
    
    And for what you mentioned of Backend--Scheduler asynchronous process, I 'm not sure if adding a message here is reasonable. Direct call to scheduler after serialization thread complete also make sense I think. I'll check the code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10884024
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -93,6 +96,10 @@ private[spark] class TaskSchedulerImpl(
       val mapOutputTracker = SparkEnv.get.mapOutputTracker
     
       var schedulableBuilder: SchedulableBuilder = null
    +
    +  private val serializeWorkerPool = new ThreadPoolExecutor(20, 60, 60, TimeUnit.SECONDS,
    --- End diff --
    
    Shall we make the ThreadPool corePoolSize and maximumPoolSize configurable? see ConnectionManager implementation...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38556226
  
    Hey, @qqsun8819 , after the second thought on whether task serialization function should call the  function directly or send a message to the ClusterSchedulerBackend, I think sending a message is a better choice in my view, not only for keeping consistency with the current implementation
    
    if you call the function directly, the thread will only return after the LaunchTask message is sent to the Executor, potentially prolonging the queuing time of other serialize tasks, if you use fixed threadpool (if you use cachedthreadpool, it will slow down the resource release speed). Also, calling the function in multiple external threads may bring more concurrency issues...
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38533621
  
    Oh, sorry, it's DAG


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38615396
  
    Btw, we might want to make it some util method somewhere - so that the various backends dont need to duplicate this code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38615455
  
    Yeah totally agree about the util method!!
    
    
    On Tue, Mar 25, 2014 at 1:21 PM, Mridul Muralidharan <
    notifications@github.com> wrote:
    
    > Btw, we might want to make it some util method somewhere - so that the
    > various backends dont need to duplicate this code.
    >
    > --
    > Reply to this email directly or view it on GitHub<https://github.com/apache/spark/pull/214#issuecomment-38615396>
    > .
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-39050291
  
    @qqsun8819 Good job, just gave my thoughts on the current solution, I'm actually far from an expert, expecting others' feedback.....


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r11102770
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala ---
    @@ -62,6 +65,30 @@ private[spark] class MesosSchedulerBackend(
     
       var classLoader: ClassLoader = null
     
    +  val serializeWorkerPool = Utils.newDaemonFixedThreadPool(
    +    scheduler.sc.conf.getInt("spark.scheduler.task.serialize.threads", 4), "task-serialization")
    +
    +  val env = SparkEnv.get
    +  protected val serializer = new ThreadLocal[SerializerInstance] {
    +    override def initialValue(): SerializerInstance = {
    +      env.closureSerializer.newInstance()
    +    }
    +  }
    +
    +  class TaskMesosSerializedRunner(taskNoSer: TaskDescWithoutSerializedTask,
    --- End diff --
    
    I'm also have some hesitations about the class name...serializedRunner....any better name? not a native speaker....@kayousterhout @mridulm ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10918503
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -198,6 +201,13 @@ private[spark] class TaskSchedulerImpl(
        */
       def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         SparkEnv.set(sc.env)
    +    // Make thread pool local for shutdown before the function returns
    +    // This is for driver can exit normally which not call sc.stop or sys.exit
    +    val serializeWorkerPool = new ThreadPoolExecutor(
    --- End diff --
    
    which case in DriverSuite? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-53514576
  
    @qqsun8819 given the recent patch in 1.1 to broadcast RDD objects (and hence not have to serialize them when we send each task), do you think this patch is still needed? Unfortunately it's fallen behind master quite a bit now and I'm not sure it would add a huge performance benefit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38496480
  
    Echoing what @CodingCat said, I think this solution has the same problem that I mentioned in response to your design posted in the JIRA (https://spark-project.atlassian.net/browse/SPARK-1141).  Often, when the cluster is busy, resourceOffer() gets called with just one machine at a time (because all of the machines are busy, and then a single task finished, so resourceOffer() is called for the slot that the task just freed up).  With your solution, in that case, the serialization won't be asynchronous: one of the task set managers will accept the resource offer, and then the DAGScheduler will block waiting for the serialization to finish.  
    
    I think ideally what we'd like here is something similar to TaskResultGetter, which is also used by TaskSchedulerImpl.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10919090
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -198,6 +201,13 @@ private[spark] class TaskSchedulerImpl(
        */
       def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         SparkEnv.set(sc.env)
    +    // Make thread pool local for shutdown before the function returns
    +    // This is for driver can exit normally which not call sc.stop or sys.exit
    +    val serializeWorkerPool = new ThreadPoolExecutor(
    --- End diff --
    
    I propose using message just because I saw DAGScheduler uses message to handle everything, and CSB has the similar architecture, an inner actor to handle everything...
    
    I was thinking that ReviveOffer is just to get some workerOffers, a new LaunchTask message is to launch new tasks to the executor, or you can encapsulate the message sending process in a function of CSB....en...the later one seems more consistent with the current architecture....


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r11102700
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala ---
    @@ -29,9 +29,12 @@ import org.apache.mesos.{Scheduler => MScheduler}
     import org.apache.mesos._
     import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
     
    -import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
    -import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
    +import org.apache.spark.{SparkEnv, Logging, SparkContext, SparkException, TaskState}
    +import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost}
    +import org.apache.spark.scheduler.{TaskDescWithoutSerializedTask, TaskDescription, TaskSchedulerImpl, WorkerOffer}
     import org.apache.spark.util.Utils
    +import org.apache.spark.serializer.SerializerInstance
    +
    --- End diff --
    
    extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r11102803
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala ---
    @@ -46,6 +47,7 @@ private[spark] class LocalActor(
     
       private val localExecutorId = "localhost"
       private val localExecutorHostname = "localhost"
    +  val ser = scheduler.sc.env.closureSerializer.newInstance()
    --- End diff --
    
    en...can we just make it as private?  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-39083121
  
    Hey @qqsun8819 , Finally find that, there have been some discussions about removing dagScheduler's serializability checking https://github.com/apache/spark/pull/143


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38651617
  
    @CodingCat @mridulm @kayousterhout Thanks very much for your review
    I looked through your discussion, and basically understand what you mean .So you all agree on moving the parallelize to Backend's makeOffer(after resourceOffer return a set of task which do not contain serialized task) , and send the message inside thread to launch task . 
    I'll update my patch according to your review. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38533226
  
    Hi, @kayousterhout, you mean CoarseClusterSchedulerBackend block, instead of DAG?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10884378
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -219,18 +226,43 @@ private[spark] class TaskSchedulerImpl(
             taskSet.parent.name, taskSet.name, taskSet.runningTasks))
         }
     
    +    val ser = SparkEnv.get.closureSerializer.newInstance()
         // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
         // of locality levels so that it gets a chance to launch local tasks on all of them.
         var launchedTask = false
    +    val serializingTask = new HashSet[Long]
    --- End diff --
    
    oh, some concurrency issue here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10886619
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -219,18 +226,43 @@ private[spark] class TaskSchedulerImpl(
             taskSet.parent.name, taskSet.name, taskSet.runningTasks))
         }
     
    +    val ser = SparkEnv.get.closureSerializer.newInstance()
         // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
         // of locality levels so that it gets a chance to launch local tasks on all of them.
         var launchedTask = false
    +    val serializingTask = new HashSet[Long]
    --- End diff --
    
    Good point here, I'll try to replace it with AtomicLong


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38532790
  
    Also,if we use TaskResultGetter-like mechanism , we can create threapool inside it using FixPool from Util just as ResultGetter does


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by qqsun8819 <gi...@git.apache.org>.
Github user qqsun8819 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10918103
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -198,6 +201,13 @@ private[spark] class TaskSchedulerImpl(
        */
       def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         SparkEnv.set(sc.env)
    +    // Make thread pool local for shutdown before the function returns
    +    // This is for driver can exit normally which not call sc.stop or sys.exit
    +    val serializeWorkerPool = new ThreadPoolExecutor(
    --- End diff --
    
    Actually, my first patch just did what you said. But a case called DriverSuite failed because of this, as I stated in my code comments. This case new a SparkContext and didn't call SparkContext.stop, so TaskScheduler.stop has no chance to run, and this threadpool has no chance to be shutdown, and the main thread just can't exit. I also know that new a Threadpool in every call is not that good, but currently I can't come up other good idea. Any other advice?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10949666
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -243,9 +275,16 @@ private[spark] class TaskSchedulerImpl(
           } while (launchedTask)
         }
     
    +    do {
    +      Thread.sleep(1)
    +    } while(serializingTaskNum.get() != 0)
    +
    --- End diff --
    
    use a latch


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10918466
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -198,6 +201,13 @@ private[spark] class TaskSchedulerImpl(
        */
       def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         SparkEnv.set(sc.env)
    +    // Make thread pool local for shutdown before the function returns
    +    // This is for driver can exit normally which not call sc.stop or sys.exit
    +    val serializeWorkerPool = new ThreadPoolExecutor(
    --- End diff --
    
    Hi, I didn't check your previous discussion in JIRA, but according to Kay, the ideal case is to make the process of serializing tasks asynchronous; in your current approach, it's actually synchronous (L278 - L281); 
    
    
    you can check how TaskResultGetter works: the taskRunner finishes the task and sends a message to the driver, CoarseSchedulerBackend (CSB)...CSB receives that and then taskScheduler -> taskManager -> dagScheduler (simply function call, no message) to mark the task as finished. The Async process is achieved by the message
    
    I think @kayousterhout 's idea is that you develop new messages in your patch, when the serialization is finished, it notifies the CSB to launch the task....


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-54694752
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/214#issuecomment-38614287
  
    Yeah exactly -- so my proposal was something like, inn CBSG.makeOffers():
    -still do scheduler.resourceOffers(), only now this returns unserialized tasks
    -update freeCores (as is currently done in launchTasks)
    -add the list of tasks and associated actors to a thread pool that (asynchronously) serializes and launches each task
    
    Here, freeCores is still being updated by the main CBSG actor thread.  The only thing that's happening in a separate thread is the serialization and sending a message to the executor actor, neither of which need to be synchronized on the state in CGSB.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10896811
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -198,6 +201,13 @@ private[spark] class TaskSchedulerImpl(
        */
       def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         SparkEnv.set(sc.env)
    +    // Make thread pool local for shutdown before the function returns
    +    // This is for driver can exit normally which not call sc.stop or sys.exit
    +    val serializeWorkerPool = new ThreadPoolExecutor(
    --- End diff --
    
    I'm not sure if new a thread pool for every call of resourceOffer() is a good choice, this function is called for every second by default...is this overhead necessary? I think it's OK to build a threadpool running in the lifecycle of taskscheduler, once the taskscheduler is shutdown, (SparkContext.stop() is called), you can shutdown the thread pool
    
    Glad to hear others' voice


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10899913
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -31,6 +32,7 @@ import org.apache.spark._
     import org.apache.spark.TaskState.TaskState
     import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
     
    +
    --- End diff --
    
    extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1141] [WIP] Parallelize Task Serializat...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/214#discussion_r10884203
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -243,12 +275,18 @@ private[spark] class TaskSchedulerImpl(
           } while (launchedTask)
         }
     
    +    do {
    +      Thread.sleep(1)
    +    } while(!serializingTask.isEmpty) 
    +
         if (tasks.size > 0) {
           hasLaunchedTask = true
         }
         return tasks
       }
     
    +
    +
    --- End diff --
    
    extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---