Posted to issues@spark.apache.org by "Sandeep Katta (Jira)" <ji...@apache.org> on 2021/12/08 14:08:00 UTC

[jira] [Created] (SPARK-37585) DSV2 InputMetrics are not getting updated in a corner case

Sandeep Katta created SPARK-37585:
-------------------------------------

             Summary: DSV2 InputMetrics are not getting updated in a corner case
                 Key: SPARK-37585
                 URL: https://issues.apache.org/jira/browse/SPARK-37585
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.1.2, 3.0.3
            Reporter: Sandeep Katta


In some corner cases, DSV2 does not update the input metrics.

This is a very special case: it happens when the number of records read is less than 1000 and *hasNext* is not called for the last element (for example, a limit stops consuming before the iterator is exhausted, so MetricsIterator.hasNext is never called again and never reaches the branch that forces the final metrics update).

The hasNext implementation of MetricsIterator:

{code:java}
override def hasNext: Boolean = {
  if (iter.hasNext) {
    true
  } else {
    // Only reached when the caller asks for more after the last element;
    // this is the call that force-flushes the input metrics.
    metricsHandler.updateMetrics(0, force = true)
    false
  }
}
{code}
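
For context, here is a minimal self-contained sketch of how the counting plausibly fits together (SketchMetricsHandler, SketchMetricsIterator and readBytesSoFar are illustrative names, not Spark's; the 1000-record interval is taken from the description above). next() bumps the record counter on every row, while the bytes-read counter is refreshed only every 1000 records or when force = true, so with fewer than 1000 rows the forced update in hasNext is the only chance to flush bytesRead:
{code:java}
// Illustrative sketch only, not the actual Spark source. `readBytesSoFar`
// stands in for the real filesystem bytes-read callback.
class SketchMetricsHandler(readBytesSoFar: () => Long) {
  var recordsRead: Long = 0L
  var bytesRead: Long = 0L

  def updateMetrics(numRows: Int, force: Boolean = false): Unit = {
    recordsRead += numRows
    // The bytes-read counter is refreshed only every 1000 records,
    // or when the update is explicitly forced:
    if (force || recordsRead % 1000 == 0) {
      bytesRead = readBytesSoFar()
    }
  }
}

class SketchMetricsIterator[I](
    iter: Iterator[I],
    metricsHandler: SketchMetricsHandler) extends Iterator[I] {

  override def hasNext: Boolean = {
    if (iter.hasNext) {
      true
    } else {
      // Final forced flush -- skipped entirely if the consumer never
      // calls hasNext after the last element it wants.
      metricsHandler.updateMetrics(0, force = true)
      false
    }
  }

  override def next(): I = {
    val item = iter.next()
    metricsHandler.updateMetrics(1) // counts the record, rarely flushes bytes
    item
  }
}
{code}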

You can reproduce this issue easily in spark-shell by running the code below:
{code:java}
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Empty the V1 source list so the parquet read goes through the DSV2 path.
spark.conf.set("spark.sql.sources.useV1SourceList", "")

val dir = "Users/tmp1"
spark.range(0, 100).write.format("parquet").mode("overwrite").save(dir)
val df = spark.read.format("parquet").load(dir)

val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsRead = new mutable.ArrayBuffer[Long]()

// Record the input metrics reported at the end of every task.
val bytesReadListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
  }
}
spark.sparkContext.addSparkListener(bytesReadListener)
try {
  // limit(10) stops consuming before the iterator is exhausted, so the
  // forced metrics update in MetricsIterator.hasNext never runs.
  df.limit(10).collect()
  assert(recordsRead.sum > 0)
  assert(bytesReads.sum > 0)
} finally {
  spark.sparkContext.removeSparkListener(bytesReadListener)
}
{code}
This code generally fails at *assert(bytesReads.sum > 0)*, which confirms that the forced updateMetrics call is never made.
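
One possible direction for a fix (a hypothetical sketch, not necessarily the change that will be merged) is to register a task-completion listener when the iterator is created, so the forced flush runs even if the consumer abandons the iterator early:
{code:java}
import org.apache.spark.TaskContext

// Hypothetical mitigation sketch: `metricsHandler` is the handler from the
// snippets above. The listener fires when the task finishes, regardless of
// whether hasNext was ever called after the last element.
Option(TaskContext.get()).foreach { ctx =>
  ctx.addTaskCompletionListener[Unit] { _ =>
    metricsHandler.updateMetrics(0, force = true)
  }
}
{code}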



