Posted to user@spark.apache.org by Trường Trần Phan An <tr...@vlute.edu.vn> on 2023/05/02 15:25:31 UTC

Re: How to determine the function of tasks on each stage in an Apache Spark application?

Hi all,

I have written a program that overrides two listener callbacks,
onStageCompleted and onTaskEnd. However, these two events do not give me
the information I need about when a Task/Stage is completed.

What I want to know is which Task corresponds to which stage of a DAG (the
Spark history server only tells me how many Stages a Job has and how many
Tasks a Stage has).

Can I print out the edges of the Tasks according to the DAGScheduler?
Below is the program I have written:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener,
SparkListenerStageCompleted, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

class CustomListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    println(s"Stage ${stageInfo.stageId}")
    println(s"Number of tasks: ${stageInfo.numTasks}")

    stageInfo.rddInfos.foreach { rddInfo =>
      println(s"RDD ${rddInfo.id} has ${rddInfo.numPartitions} partitions.")
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val stageId = taskEnd.stageId
    val stageAttemptId = taskEnd.stageAttemptId
    val taskInfo = taskEnd.taskInfo
    // Keep the interpolated string on one line; a plain string literal cannot span lines.
    println(s"Task: ${taskInfo.taskId}; Stage: $stageId; Duration: ${taskInfo.duration} ms.")
  }

  def wordCount(sc: SparkContext, inputPath: String): Unit = {
    val data = sc.textFile(inputPath)
    val flatMap = data.flatMap(line => line.split(","))
    val map = flatMap.map(word => (word, 1))
    val reduceByKey = map.reduceByKey(_ + _)
    reduceByKey.foreach(println)
  }
}

object Scenario1 {
  def main(args: Array[String]): Unit = {

    val appName = "scenario1"
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(appName)
      .getOrCreate()

    val sc = spark.sparkContext
    val sparkListener = new CustomListener()
    sc.addSparkListener(sparkListener)
    val inputPath = "s3a://data-join/file00"
    sparkListener.wordCount(sc, inputPath)
    sc.stop()

  }
}
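
For reference, here is a minimal sketch of how the task-to-stage mapping and the stage-to-stage edges asked about above might be recorded. It assumes the listener API exposes SparkListenerJobStart.stageInfos, StageInfo.parentIds, and SparkListenerTaskEnd.stageId, as it does in recent Spark releases. The names StageGraphListener and StageGraphDemo, and the tiny in-memory input, are hypothetical and used only for illustration. Note that the edges are between stages; tasks of one stage have no edges among themselves.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession
import scala.collection.concurrent.TrieMap

// Hypothetical listener: records stage parents (DAG edges) and the stage of each task.
class StageGraphListener extends SparkListener {
  // stageId -> (stage name, parent stage ids)
  val stages = TrieMap.empty[Int, (String, Seq[Int])]
  // taskId -> (stageId, index of the task within its stage's task set)
  val tasks = TrieMap.empty[Long, (Int, Int)]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobStart.stageInfos.foreach { s =>
      stages.put(s.stageId, (s.name, s.parentIds))
      // Each parent id is one edge of the stage DAG for this job.
      s.parentIds.foreach { p =>
        println(s"Job ${jobStart.jobId}: stage $p -> stage ${s.stageId}")
      }
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    tasks.put(info.taskId, (taskEnd.stageId, info.index))
  }
}

object StageGraphDemo {
  // Runs a small word count locally and returns the populated listener.
  def run(): StageGraphListener = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("stage-graph-demo")
      .getOrCreate()
    val listener = new StageGraphListener
    spark.sparkContext.addSparkListener(listener)

    spark.sparkContext.parallelize(Seq("a,b", "b,c"), 2)
      .flatMap(_.split(","))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()

    // Listener events are delivered asynchronously; stopping the context
    // waits for the queued events to be processed before we read the maps.
    spark.stop()
    listener
  }

  def main(args: Array[String]): Unit = {
    run().tasks.toSeq.sortBy(_._1).foreach { case (taskId, (stageId, index)) =>
      println(s"Task $taskId ran as index $index of stage $stageId")
    }
  }
}
```

The stage name recorded here is only the call site of the last transformation in the stage, so it indicates where a stage ends in the user code rather than everything its tasks compute.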

Best regards,

Truong


On Sun, Apr 16, 2023 at 9:32 AM Trường Trần Phan An <
truongtpa@vlute.edu.vn> wrote:

> Dear Jacek Laskowski,
>
> Thank you for your guide. I will try it out for my problem.
>
> Best regards,
> Truong
>
>
> On Fri, Apr 14, 2023 at 9:00 PM Jacek Laskowski <ja...@japila.pl>
> wrote:
>
>> Hi,
>>
>> Start with intercepting stage completions
>> using SparkListenerStageCompleted [1]. That's Spark Core (jobs, stages and
>> tasks).
>>
>> Go up the execution chain to Spark SQL
>> with SparkListenerSQLExecutionStart [2] and SparkListenerSQLExecutionEnd
>> [3], and correlate the information.
>>
>> You may want to look at how the web UI works under the covers to collect
>> all the information. Start from SQLTab [4], which shows what is displayed;
>> from there you can work out what's needed and how it's collected.
>>
>> [1]
>> https://github.com/apache/spark/blob/8cceb3946bdfa5ceac0f2b4fe6a7c43eafb76d59/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L46
>> [2]
>> https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44
>> [3]
>> https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L60
>> [4]
>> https://github.com/apache/spark/blob/c124037b97538b2656d29ce547b2a42209a41703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala#L24
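
The correlation described above could be sketched roughly as follows. This is an illustration under assumptions, not the way the web UI is implemented: it assumes that SQL execution events reach a listener through onOtherEvent, and that jobs started on behalf of a SQL execution carry the execution id in the job property "spark.sql.execution.id". The names SqlCorrelationListener and SqlCorrelationDemo and the sample query are hypothetical.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
import scala.collection.concurrent.TrieMap

// Hypothetical listener: links SQL executions to the stages of their jobs.
class SqlCorrelationListener extends SparkListener {
  // executionId -> description of the SQL execution
  val descriptions = TrieMap.empty[Long, String]
  // executionId -> ids of all stages of the jobs run for that execution
  val stagesByExecution = TrieMap.empty[Long, Set[Int]]

  // Spark SQL events are not part of the core callbacks; they arrive here.
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      descriptions.put(e.executionId, e.description)
    case e: SparkListenerSQLExecutionEnd =>
      val stages = stagesByExecution.getOrElse(e.executionId, Set.empty[Int])
      println(s"SQL execution ${e.executionId} used stages ${stages.toSeq.sorted.mkString(", ")}")
    case _ => // ignore all other events
  }

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Assumption: the execution id is passed as a job property under this key.
    val executionId = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.sql.execution.id")))
      .map(_.toLong)
    executionId.foreach { id =>
      val known = stagesByExecution.getOrElse(id, Set.empty[Int])
      stagesByExecution.put(id, known ++ jobStart.stageIds)
    }
  }
}

object SqlCorrelationDemo {
  // Runs one small aggregation locally and returns the populated listener.
  def run(): SqlCorrelationListener = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sql-correlation-demo")
      .getOrCreate()
    val listener = new SqlCorrelationListener
    spark.sparkContext.addSparkListener(listener)

    spark.range(0, 1000)
      .selectExpr("id % 10 AS k")
      .groupBy("k")
      .count()
      .collect()

    spark.stop() // waits for queued listener events before returning
    listener
  }

  def main(args: Array[String]): Unit = {
    val listener = run()
    listener.descriptions.foreach { case (id, description) =>
      println(s"Execution $id: $description")
    }
  }
}
```

Combining this with the stage and task events from Spark Core gives a chain from SQL execution to job to stage to task, which is about as far as listener events alone can go.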
>>
>> Regards,
>> Jacek Laskowski
>> ----
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>>
>> On Thu, Apr 13, 2023 at 10:40 AM Trường Trần Phan An <
>> truongtpa@vlute.edu.vn> wrote:
>>
>>> Hi,
>>>
>>> Can you give me more details, or point me to a tutorial, on "You'd have to
>>> intercept execution events and correlate them. Not an easy task yet doable"?
>>>
>>> Thanks
>>>
>>> On Wed, Apr 12, 2023 at 9:04 PM Jacek Laskowski <
>>> jacek@japila.pl> wrote:
>>>
>>>> Hi,
>>>>
>>>> tl;dr it's not possible to "reverse-engineer" tasks to functions.
>>>>
>>>> In essence, Spark SQL is an abstraction layer over RDD API that's made
>>>> up of partitions and tasks. Tasks are Scala functions (possibly with some
>>>> Python for PySpark). A simple-looking high-level operator like
>>>> DataFrame.join can end up with multiple RDDs, each with a set of partitions
>>>> (and hence tasks). What the tasks do is an implementation detail that you'd
>>>> have to know about by reading the source code of Spark SQL that produces
>>>> the "bytecode".
>>>>
>>>> Just looking at the DAG or the tasks screenshots won't give you that
>>>> level of detail. You'd have to intercept execution events and correlate
>>>> them. Not an easy task yet doable. HTH.
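
To make the point about a single operator expanding into several RDDs concrete, here is a small sketch that prints the physical plan and the RDD lineage behind a DataFrame.join. The object name JoinLineageDemo and the sample rows are hypothetical, and the exact plan and lineage printed will differ between Spark versions and configurations.

```scala
import org.apache.spark.sql.SparkSession

object JoinLineageDemo {
  // Builds a tiny join locally and returns the RDD lineage as text.
  def lineage(): String = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("join-lineage-demo")
      .getOrCreate()
    import spark.implicits._

    val left = Seq((1, "a"), (2, "b")).toDF("id", "l")
    val right = Seq((1, "x"), (3, "y")).toDF("id", "r")
    val joined = left.join(right, "id")

    // Physical plan that Spark SQL chose for the single join operator.
    joined.explain()

    // Lineage of the RDDs that the plan is executed as.
    val text = joined.rdd.toDebugString
    spark.stop()
    text
  }

  def main(args: Array[String]): Unit = println(lineage())
}
```

The lineage lists RDDs and their partitions, but not what the generated code inside each task does, which is why the source code of Spark SQL remains the reference for that.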
>>>>
>>>> Regards,
>>>> Jacek Laskowski
>>>>
>>>>
>>>> On Tue, Apr 11, 2023 at 6:53 PM Trường Trần Phan An <
>>>> truongtpa@vlute.edu.vn> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am conducting a study comparing the execution time of Bloom Filter
>>>>> Join operation on two environments: Apache Spark Cluster and Apache Spark.
>>>>> I have compared the overall time of the two environments, but I want to
>>>>> compare specific "tasks on each stage" to see which computation has the
>>>>> most significant difference.
>>>>>
>>>>> I have taken a screenshot of the DAG of Stage 0 and the list of tasks
>>>>> executed in Stage 0.
>>>>> - DAG.png
>>>>> - Task.png
>>>>>
>>>>> *I have questions:*
>>>>> 1. Can we determine which tasks are responsible for executing each
>>>>> step scheduled on the DAG during the processing?
>>>>> 2. Is it possible to know the function of each task (e.g., what is
>>>>> task ID 0 responsible for? What is task ID 1 responsible for? ... )?
>>>>>
>>>>> Best regards,
>>>>> Truong
>>>>>