Posted to user@spark.apache.org by Trường Trần Phan An <tr...@vlute.edu.vn> on 2023/04/11 16:27:50 UTC

How to determine the function of tasks on each stage in an Apache Spark application?

Hi all,

I am conducting a study comparing the execution time of a Bloom filter join
operation in two environments: Apache Spark Cluster and Apache Spark. I
have compared the overall time in the two environments, but I also want to
compare the specific tasks in each stage to see which computation shows the
most significant difference.

I have taken a screenshot of the DAG of Stage 0 and the list of tasks
executed in Stage 0.
- DAG.png
- Task.png

*I have questions:*
1. Can we determine which tasks are responsible for executing each step
scheduled on the DAG during the processing?
2. Is it possible to know the function of each task (e.g., what is task ID
0 responsible for? What is task ID 1 responsible for? ... )?

Best regards,
Truong

Re: How to determine the function of tasks on each stage in an Apache Spark application?

Posted by Maytas Monsereenusorn <ma...@apache.org>.
Hi,

If it's not possible to map tasks to functions, is it still possible to
easily figure out from the UI which job and stage completed which part of
the query?
For example, in the SQL tab of the Spark UI, I am able to see the query and
the Job IDs for that query. However, when looking at the details for the
Query, how do I know which part of the execution plan was completed by
which job/stage?

Thanks,
Maytas



Re: How to determine the function of tasks on each stage in an Apache Spark application?

Posted by Trường Trần Phan An <tr...@vlute.edu.vn>.
Hi all,

I have written a program that overrides two listener callbacks,
onStageCompleted and onTaskEnd. However, these two events do not tell me
when a task or stage is completed.

What I want to know is which task corresponds to which stage of a DAG (the
Spark history server only tells me how many stages a job has and how many
jobs a stage has).

Can I print out the edges of the tasks according to the DAGScheduler?
Below is the program I have written:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

// Prints per-stage and per-task information as the scheduler reports it.
class CustomListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    println(s"Stage ${stageInfo.stageId}")
    println(s"Number of tasks: ${stageInfo.numTasks}")

    // RDDs that belong to this stage
    stageInfo.rddInfos.foreach { rddInfo =>
      println(s"RDD ${rddInfo.id} has ${rddInfo.numPartitions} partitions.")
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val stageId = taskEnd.stageId
    val taskInfo = taskEnd.taskInfo
    // Keep the interpolated string on one line; a plain string literal cannot span lines.
    println(s"Task: ${taskInfo.taskId}; Stage: $stageId; Duration: ${taskInfo.duration} ms.")
  }

  def wordCount(sc: SparkContext, inputPath: String): Unit = {
    val data = sc.textFile(inputPath)
    val flatMap = data.flatMap(line => line.split(","))
    val map = flatMap.map(word => (word, 1))
    val reduceByKey = map.reduceByKey(_ + _)
    reduceByKey.foreach(println)
  }
}

object Scenario1 {
  def main(args: Array[String]): Unit = {

    val appName = "scenario1"
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(appName)
      .getOrCreate()

    val sc = spark.sparkContext
    val sparkListener = new CustomListener()
    sc.addSparkListener(sparkListener)
    val inputPath = "s3a://data-join/file00"
    sparkListener.wordCount(sc, inputPath)
    sc.stop()

  }
}
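
On the question of when a task or stage completed: the objects passed to these
two callbacks do carry timestamps. StageInfo exposes submissionTime and
completionTime, and TaskInfo exposes launchTime, finishTime and duration. The
following is only a sketch, assuming Spark 3.x on the classpath; TimingListener,
taskDurations and totalTaskTimeByStage are hypothetical names chosen for this
illustration and are not part of Spark.

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

// Hypothetical helper (not part of Spark): records timings reported by the scheduler.
class TimingListener extends SparkListener {
  // taskId -> (stageId, task duration in milliseconds)
  val taskDurations = TrieMap.empty[Long, (Int, Long)]

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // Both timestamps are Option[Long] holding epoch milliseconds.
    for (start <- info.submissionTime; end <- info.completionTime) {
      println(s"Stage ${info.stageId} '${info.name}': submitted=$start, completed=$end, elapsed=${end - start} ms")
    }
  }

  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    // Guard against a missing TaskInfo instead of assuming it is always set.
    Option(event.taskInfo).foreach { t =>
      taskDurations.put(t.taskId, (event.stageId, t.duration))
      // index is the task's position within the stage's task set.
      println(s"Stage ${event.stageId}, task ${t.taskId} (index ${t.index}) on ${t.host}: " +
        s"launched=${t.launchTime}, finished=${t.finishTime}, duration=${t.duration} ms")
    }
  }

  // Sum of task durations per stage, convenient for comparing two environments.
  def totalTaskTimeByStage: Map[Int, Long] =
    taskDurations.values.groupBy(_._1).map { case (stage, xs) => stage -> xs.map(_._2).sum }
}
```

Registering it works the same way as for CustomListener above, with
sc.addSparkListener(new TimingListener()). Reading totalTaskTimeByStage after
sc.stop() should see all events, since stopping the context drains the
listener queue.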

Best regards,

Truong


On Sun, Apr 16, 2023 at 09:32, Trường Trần Phan An <
truongtpa@vlute.edu.vn> wrote:

> Dear Jacek Laskowski,
>
> Thank you for your guide. I will try it out for my problem.
>
> Best regards,
> Truong

Re: How to determine the function of tasks on each stage in an Apache Spark application?

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

Start with intercepting stage completions using SparkListenerStageCompleted
[1]. That's Spark Core (jobs, stages and tasks).

Go up the execution chain to Spark SQL with SparkListenerSQLExecutionStart
[2] and SparkListenerSQLExecutionEnd [3], and correlate the information.

You may want to look at how the web UI works under the covers to collect all
the information. Start from SQLTab [4], which should show you what is
displayed and, from there, what's needed and how it's collected.

[1]
https://github.com/apache/spark/blob/8cceb3946bdfa5ceac0f2b4fe6a7c43eafb76d59/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L46
[2]
https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44
[3]
https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L60
[4]
https://github.com/apache/spark/blob/c124037b97538b2656d29ce547b2a42209a41703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala#L24
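
The correlation described above could be sketched roughly as follows. This is
an illustration under assumptions (Spark 3.x, Scala), not the way the web UI is
actually implemented. CorrelatingListener, stageToJob and stageToExecution are
hypothetical names. The sketch relies on Spark SQL tagging the jobs it submits
with the local property "spark.sql.execution.id", and on the SQL execution
events arriving through onOtherEvent.

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart, SparkListenerTaskEnd}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Hypothetical helper (not part of Spark): links SQL executions, jobs, stages and tasks.
class CorrelatingListener extends SparkListener {
  val stageToJob = TrieMap.empty[Int, Int]        // stageId -> jobId
  val stageToExecution = TrieMap.empty[Int, Long] // stageId -> SQL execution id

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Jobs submitted by Spark SQL carry the execution id as a local property;
    // plain RDD jobs do not, so this stays None for them.
    val executionId = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.sql.execution.id")))
      .map(_.toLong)
    jobStart.stageIds.foreach { stageId =>
      stageToJob.put(stageId, jobStart.jobId)
      executionId.foreach(id => stageToExecution.put(stageId, id))
    }
    println(s"Job ${jobStart.jobId}: stages=${jobStart.stageIds.mkString(",")}, sqlExecution=$executionId")
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val job = stageToJob.get(taskEnd.stageId)
    val execution = stageToExecution.get(taskEnd.stageId)
    Option(taskEnd.taskInfo).foreach { t =>
      println(s"Task ${t.taskId}: stage=${taskEnd.stageId}, job=$job, sqlExecution=$execution, duration=${t.duration} ms")
    }
  }

  // Spark SQL events are not dedicated callbacks; they arrive here.
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
      println(e.physicalPlanDescription)
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} ended")
    case _ => // ignore everything else
  }
}
```

This yields a chain from task to stage to job to SQL execution, plus the
physical plan text for that execution. It still does not say which operator
inside the plan a given task ran; that part remains an implementation detail of
Spark SQL.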

Regards,
Jacek Laskowski
----
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski




Re: How to determine the function of tasks on each stage in an Apache Spark application?

Posted by Trường Trần Phan An <tr...@vlute.edu.vn>.
Hi,

Can you give me more details, or point me to a tutorial, on "You'd have to
intercept execution events and correlate them. Not an easy task yet doable"?

Thanks


Re: How to determine the function of tasks on each stage in an Apache Spark application?

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

tl;dr it's not possible to "reverse-engineer" tasks to functions.

In essence, Spark SQL is an abstraction layer over RDD API that's made up
of partitions and tasks. Tasks are Scala functions (possibly with some
Python for PySpark). A simple-looking high-level operator like
DataFrame.join can end up with multiple RDDs, each with a set of partitions
(and hence tasks). What the tasks do is an implementation detail that you'd
have to know about by reading the source code of Spark SQL that produces
the "bytecode".

Just looking at the DAG or the tasks screenshots won't give you that level
of detail. You'd have to intercept execution events and correlate them. Not
an easy task yet doable. HTH.
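
To make the point about a join turning into several RDDs more concrete, here
is a small sketch, assuming Spark 3.x in local mode. The object name
JoinPlanInspection, the method inspect and the sample data are made up for this
illustration. It prints the physical plan and the RDD lineage of a join, and
returns the number of partitions of the final RDD. An action over the whole
result runs one task per partition in its last stage.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example (names and data are illustrative only).
object JoinPlanInspection {
  def inspect(spark: SparkSession): Int = {
    import spark.implicits._

    val left = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "l")
    val right = Seq((1, "x"), (3, "z")).toDF("id", "r")
    val joined = left.join(right, "id")

    // Physical plan chosen by Spark SQL for this join.
    joined.explain()

    // RDD lineage behind the DataFrame; one line per RDD in the chain.
    val rdd = joined.rdd
    println(rdd.toDebugString)

    // Partitions of the final RDD.
    rdd.getNumPartitions
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("join-plan-inspection")
      .getOrCreate()
    try println(s"Partitions in the final RDD: ${inspect(spark)}")
    finally spark.stop()
  }
}
```

With inputs this small, Spark will likely choose a broadcast join, so the
lineage may show no shuffle at all. Larger inputs or different settings can
produce a different plan, which is why the plan output has to be read for each
run rather than assumed.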

Regards,
Jacek Laskowski
----
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski


