Posted to user@spark.apache.org by raggy <ra...@gmail.com> on 2015/03/30 03:34:50 UTC

Task result in Spark Worker Node

I am a PhD student working on a research project related to Apache Spark. I
am trying to modify some of the Spark source code so that, instead of
sending the final result RDD from the worker nodes to the master node, the
workers send it to some different node. In order to do this, I
have been trying to identify at which point the Spark worker nodes broadcast
the results of a job back to the master.

So far, I understand that in Spark, the master serializes the RDD and the
functions to be applied to it and sends them over to the worker nodes. In
the context of reduce, it serializes the RDD partition and the reduce
function and sends them to the worker nodes. However, my understanding of
how things happen at the worker node is very limited and I would appreciate
it if someone could help me identify where the process of broadcasting the
results of local worker computations back to the master node takes place.
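
To make the reduce case above concrete, here is a minimal sketch with no Spark dependency. As far as I can tell, reduce folds each partition on a worker and then merges the per-partition values on the driver; the object name, the method names, and the use of plain Seqs as "partitions" are all assumptions made for illustration, not Spark's actual code.

```scala
object ReduceSketch {
  // Runs on a "worker": fold one partition, or return None if it is empty.
  def reducePartition[T](f: (T, T) => T)(iter: Iterator[T]): Option[T] =
    if (iter.hasNext) Some(iter.reduceLeft(f)) else None

  // Runs on the "driver": combine the per-partition values it received.
  def mergeResults[T](f: (T, T) => T)(partials: Seq[Option[T]]): Option[T] =
    partials.flatten.reduceLeftOption(f)

  // Hypothetical stand-in for a job: each inner Seq plays the role of one
  // partition, and only the small per-partition value travels to the driver.
  def reduce[T](partitions: Seq[Seq[T]])(f: (T, T) => T): Option[T] =
    mergeResults(f)(partitions.map(p => reducePartition(f)(p.iterator)))
}
```

The point of the sketch is that what leaves a worker is one value per partition, not an RDD, so "where the result is sent" is really "where each task's value is sent".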

This is some of the limited knowledge that I have about the worker nodes: 

Each job gets divided into smaller sets of tasks called stages. Each Stage
is either a Shuffle Map Stage or Result Stage. In a Shuffle Map Stage, the
task results are used as input for another stage. The result stage uses the
RDD to compute the action that initiated the job. So, this result stage
executes the last task for the job on the worker node. I would assume that after
this is done, it gets the result and broadcasts it to the driver
application (the master).

In ResultTask.scala (spark-core, src/main/scala, org.apache.spark.scheduler) it
states "A task that sends back the output to the driver application."
However, I don't see when or where this happens in the source code. I would
very much appreciate it if someone could help me identify where this happens
in the Spark source code.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-result-in-Spark-Worker-Node-tp22283.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Task result in Spark Worker Node

Posted by Imran Rashid <ir...@cloudera.com>.
hard to say for sure, but tasks get serialized along with a partition,
which might be the info that you are missing.
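
A minimal sketch of what I mean, using plain Java serialization and no Spark at all. Every name in it (TransientSketch, LocalCollection, Slice) is hypothetical. It assumes the collection held by the RDD is a @transient field, which I believe is the case for ParallelCollectionRDD and would be consistent with the NullPointerException in slice, but check the source for your version.

```scala
import java.io._

object TransientSketch {
  // Hypothetical stand-in for an RDD built from a local collection: the
  // data field is @transient, so Java serialization skips it.
  class LocalCollection(@transient val data: List[Int], val numSlices: Int)
      extends Serializable

  // Hypothetical stand-in for a partition: it carries its own slice of data.
  case class Slice(index: Int, values: List[Int])

  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val loader = getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      // Try the loader that defined this sketch first, then the default.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After a round trip, the LocalCollection comes back with data == null while the Slice still holds its values, so a receiver that is handed only the collection object has nothing to slice into partitions.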

I don't know what you are trying to build, so I don't mean to be too
discouraging, but it sounds like you are trying to do some unusual hybrid
that will be very tough.  It might make more sense to fork Spark and just
modify it for your project; or it might make more sense to just do
something new.  Pulling out bits and pieces like this is going to be very
tough unless you understand the system really well.

On Fri, Apr 17, 2015 at 2:35 AM, Raghav Shankar <ra...@gmail.com>
wrote:

> My apologies, I had pasted the wrong exception trace in the previous
> email. Here is the actual exception that I am receiving.
>
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
> at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

Re: Task result in Spark Worker Node

Posted by Raghav Shankar <ra...@gmail.com>.
My apologies, I had pasted the wrong exception trace in the previous email. Here is the actual exception that I am receiving. 

Exception in thread "main" java.lang.NullPointerException
	at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
	at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

> On Apr 17, 2015, at 2:30 AM, Raghav Shankar <ra...@gmail.com> wrote:
> 
> Hey Imran, 
> 
>  Thanks for the great explanation! This cleared up a lot of things for me. I am actually trying to utilize some of the features within Spark for a system I am developing. I am currently working on developing a subsystem that can be integrated within Spark and other Big Data solutions. In order to integrate it within Spark, I am trying to utilize the rdds and functions provided to the reduce method on my system. My system is developed in Scala and Java. In Spark, I have seen that the function provided to the reduce method, along with the RDD, gets serialized and sent to the worker nodes. The worker nodes are able to deserialize them and then execute the task on them. I see this happening in ResultTask.scala. When I try to do something similar, I get exceptions. The system I am developing has Spark jars in its build path, so it is able to create a SparkContext etc. 
> 
> When I do, 
> 
> val bytes = closureSerializer.serialize((rdd, func) : AnyRef).array() // similar to DAGScheduler.scala
> val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
>       ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
> println(func2(context, rdd2.iterator(rdd2.partitions(1), context)));
> 
> I get the proper result and can print it out. 
> 
> But when I involve the network by serializing the data, using the network to send it to a different program, then deserialize the data and use the function, I get the following error:
> 
> Exception in thread "main" java.lang.NullPointerException
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
> 	at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
> 	at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
> 	at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
> 	at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
> 	at SimpleApp$.net(SimpleApp.scala:71)
> 	at SimpleApp$.main(SimpleApp.scala:76)
> 	at SimpleApp.main(SimpleApp.scala)
> 
> I have also made sure that I am adding the class file of the program that is sending the serialized data to the bin folder of the program that is receiving the data. I’m not sure what I am doing wrong. I’ve done the serialization and creation of the function similar to how Spark does it. I created another reduce function like this. When implemented this way, it prints out the result of func2 properly. But when I involve the network by sending the serialized data to another program, I get the above exception. 
> 
>    def reduceMod(f: (Integer, Integer) => Integer): Integer = {
>     val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
>       if (iter.hasNext) {
>         Some(iter.reduceLeft(f))
>       } else {
>         None
>       }
>     }
>     val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
>     val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
>     context = new TaskContextImpl(stageId = 1, partitionId = 1,
>       taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
>     println(func.getClass.getName);
>     println(func(context, rdd.iterator(rdd.partitions(1), context)));
>     val bb = closureSerializer.serialize((rdd, func) : AnyRef).array()
>     val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
>       ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
>     println(func2(context, rdd2.iterator(rdd2.partitions(1), context)));
>     1
>   }
>  
> I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized version of the RDD and function to my other program. My thought is that I might need to add more jars to the build path, but I have no clue whether that's the issue or which jars I would need to add.
> 
> Thanks,
> Raghav


Re: Task result in Spark Worker Node

Posted by Raghav Shankar <ra...@gmail.com>.
Hey Imran, 

 Thanks for the great explanation! This cleared up a lot of things for me. I am actually trying to utilize some of the features within Spark for a system I am developing. I am currently working on developing a subsystem that can be integrated within Spark and other Big Data solutions. In order to integrate it within Spark, I am trying to utilize the rdds and functions provided to the reduce method on my system. My system is developed in Scala and Java. In Spark, I have seen that the function provided to the reduce method, along with the RDD, gets serialized and sent to the worker nodes. The worker nodes are able to deserialize them and then execute the task on them. I see this happening in ResultTask.scala. When I try to do something similar, I get exceptions. The system I am developing has Spark jars in its build path, so it is able to create a SparkContext etc. 

When I do, 

val bytes = closureSerializer.serialize((rdd, func) : AnyRef).array() // similar to DAGScheduler.scala
val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
      ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
println(func2(context, rdd2.iterator(rdd2.partitions(1), context)));

I get the proper result and can print it out. 

But when I involve the network by serializing the data, using the network to send it to a different program, then deserialize the data and use the function, I get the following error:

Exception in thread "main" java.lang.NullPointerException
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
	at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
	at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
	at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
	at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
	at SimpleApp$.net(SimpleApp.scala:71)
	at SimpleApp$.main(SimpleApp.scala:76)
	at SimpleApp.main(SimpleApp.scala)

I have also made sure that I am adding the class file of the program that is sending the serialized data to the bin folder of the program that is receiving the data. I’m not sure what I am doing wrong. I’ve done the serialization and creation of the function similar to how Spark does it. I created another reduce function like this. When implemented this way, it prints out the result of func2 properly. But when I involve the network by sending the serialized data to another program, I get the above exception. 

   def reduceMod(f: (Integer, Integer) => Integer): Integer = {
    val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(f))
      } else {
        None
      }
    }
    val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
    val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
    context = new TaskContextImpl(stageId = 1, partitionId = 1,
      taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
    println(func.getClass.getName);
    println(func(context, rdd.iterator(rdd.partitions(1), context)));
    val bb = closureSerializer.serialize((rdd, func) : AnyRef).array()
    val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
      ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
    println(func2(context, rdd2.iterator(rdd2.partitions(1), context)));
    1
  }
 
I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized version of the RDD and function to my other program. My thought is that I might need to add more jars to the build path, but I have no clue whether that's the issue or which jars I would need to add.

Thanks,
Raghav

> On Apr 13, 2015, at 10:22 PM, Imran Rashid <ir...@cloudera.com> wrote:
> 
> On the worker side, it all happens in Executor.  The task result is computed here:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
> 
> then it's serialized along with some other goodies, and finally sent back to the driver here:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255
> 
> What happens on the driver is quite a bit more complicated, and involves a number of spots in the code, but at least to get you started, the results are received here:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328
> 
> though perhaps a more interesting spot is where they are handled in DAGScheduler#handleTaskCompletion:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001
> 
> Also, I think I know what you mean, but just to make sure: I wouldn't say the results from the worker are "broadcast" back to the driver.  (a) In Spark, "broadcast" tends to refer to a particular API for sharing immutable data from the driver to the workers (only one direction), and (b) it doesn't really fit a more general meaning of "broadcast" either, since the results are sent only to the driver, not to all nodes.


Re: Task result in Spark Worker Node

Posted by Imran Rashid <ir...@cloudera.com>.
On the worker side, it all happens in Executor.  The task result is
computed here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210

then it's serialized along with some other goodies, and finally sent back to
the driver here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255

What happens on the driver is quite a bit more complicated, and involves a
number of spots in the code, but at least to get you started, the results
are received here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328

though perhaps a more interesting spot is where they are handled in
DAGScheduler#handleTaskCompletion:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001
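
The worker-side flow above (compute, serialize, report) can be sketched roughly as follows. This is not the Executor code, only an illustration of its shape: ResultSink, RecordingSink and runTask are made-up names, and the statusUpdate method is merely named after the call that, as far as I recall, Executor makes on its backend when a task finishes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

object ExecutorFlowSketch {
  // Hypothetical channel a worker uses to report a finished task.
  trait ResultSink {
    def statusUpdate(taskId: Long, serializedResult: Array[Byte]): Unit
  }

  // Hypothetical receiver that only records what it was sent, as the driver
  // (or some other node) would before deserializing it.
  class RecordingSink extends ResultSink {
    val received = ArrayBuffer.empty[(Long, Array[Byte])]
    def statusUpdate(taskId: Long, serializedResult: Array[Byte]): Unit =
      received += ((taskId, serializedResult))
  }

  def serialize(value: Any): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value.asInstanceOf[AnyRef]) finally out.close()
    bytes.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Worker side: run the task body, serialize its value, report it.
  def runTask(taskId: Long, body: () => Any, sink: ResultSink): Unit = {
    val value = body()
    sink.statusUpdate(taskId, serialize(value))
  }
}
```

In this toy version, sending results somewhere other than the driver is just a different ResultSink. In real Spark the receiving side also does scheduling bookkeeping with each update, which is why the driver-side spots linked above matter too.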


Also, I think I know what you mean, but just to make sure: I wouldn't say
the results from the worker are "broadcast" back to the driver.  (a) In
Spark, "broadcast" tends to refer to a particular API for sharing immutable
data from the driver to the workers (only one direction), and (b) it doesn't
really fit a more general meaning of "broadcast" either, since the results
are sent only to the driver, not to all nodes.
