Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2022/02/09 14:14:00 UTC

[jira] [Resolved] (SPARK-37585) DSV2 InputMetrics are not getting update in corner case

     [ https://issues.apache.org/jira/browse/SPARK-37585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-37585.
---------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 35432
[https://github.com/apache/spark/pull/35432]

> DSV2 InputMetrics are not getting update in corner case
> -------------------------------------------------------
>
>                 Key: SPARK-37585
>                 URL: https://issues.apache.org/jira/browse/SPARK-37585
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3, 3.1.2
>            Reporter: Sandeep Katta
>            Assignee: Sandeep Katta
>            Priority: Major
>             Fix For: 3.3.0
>
>
> In some corner cases, DSV2 is not updating the input metrics.
>  
> This is a very specific case: fewer than 1000 records are read and *hasNext* is not called for the last element (because the consumer stops early, MetricsIterator.hasNext never observes input.hasNext returning false, so the forced metrics update in the else branch never runs).
>  
> The hasNext implementation of MetricsIterator:
>  
> {code:java}
> override def hasNext: Boolean = {
>   if (iter.hasNext) {
>     true
>   } else {
>     metricsHandler.updateMetrics(0, force = true)
>     false
>   }
> } {code}
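> A minimal, self-contained sketch of the failure mode (the class and field names below are illustrative stand-ins, not Spark's actual MetricsIterator), showing that a hasNext-driven flush is skipped when the consumer stops early, e.g. with a LIMIT:
> {code:java}
> // Toy stand-in for MetricsIterator: the "flush" only happens when
> // hasNext observes that the underlying iterator is exhausted.
> class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
>   var flushed = false      // stands in for updateMetrics(0, force = true)
>   var recordsSeen = 0L
>
>   override def hasNext: Boolean = {
>     if (underlying.hasNext) {
>       true
>     } else {
>       flushed = true       // the final, forced update
>       false
>     }
>   }
>
>   override def next(): T = { recordsSeen += 1; underlying.next() }
> }
>
> val it = new CountingIterator(Iterator.range(0, 100))
> it.take(10).toList         // a LIMIT-like consumer stops after 10 rows
> assert(it.recordsSeen == 10)
> assert(!it.flushed)        // hasNext never saw exhaustion, so no forced flush {code}
> With a full scan the wrapper's hasNext eventually returns false and the flush runs, which is why the problem only shows up when the consumer stops early.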
>  
> You can reproduce this issue easily in spark-shell by running the code below:
> {code:java}
> import scala.collection.mutable
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
> spark.conf.set("spark.sql.sources.useV1SourceList", "")
> val dir = "Users/tmp1"
> spark.range(0, 100).write.format("parquet").mode("overwrite").save(dir)
> val df = spark.read.format("parquet").load(dir)
> val bytesReads = new mutable.ArrayBuffer[Long]()
> val recordsRead = new mutable.ArrayBuffer[Long]()
>
> val bytesReadListener = new SparkListener() {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
>     recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
>   }
> }
> spark.sparkContext.addSparkListener(bytesReadListener)
> try {
>   df.limit(10).collect()
>   assert(recordsRead.sum > 0)
>   assert(bytesReads.sum > 0)
> } finally {
>   spark.sparkContext.removeSparkListener(bytesReadListener)
> } {code}
> This code generally fails at *assert(bytesReads.sum > 0)*, which confirms that the updateMetrics API is not called.
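> One way to decouple the final update from the consumer's hasNext calls (a sketch only, under the assumption that a flush callback is available; not necessarily the approach taken in the linked PR) is to register a task-completion listener so the metrics are flushed when the task finishes:
> {code:java}
> import org.apache.spark.TaskContext
>
> // Hypothetical wrapper: `iter` and `flush` stand in for the wrapped
> // reader iterator and metricsHandler.updateMetrics(0, force = true).
> class CompletionFlushingIterator[T](iter: Iterator[T], flush: () => Unit)
>   extends Iterator[T] {
>
>   // Runs when the task completes, even if the consumer (e.g. a LIMIT)
>   // never drives hasNext to the exhausted branch.
>   Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => flush()))
>
>   override def hasNext: Boolean = iter.hasNext
>   override def next(): T = iter.next()
> } {code}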
>  


