Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2022/02/09 14:14:00 UTC
[jira] [Resolved] (SPARK-37585) DSV2 InputMetrics are not getting update in corner case
[ https://issues.apache.org/jira/browse/SPARK-37585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-37585.
---------------------------------
Fix Version/s: 3.3.0
Resolution: Fixed
Issue resolved by pull request 35432
[https://github.com/apache/spark/pull/35432]
> DSV2 InputMetrics are not getting update in corner case
> -------------------------------------------------------
>
> Key: SPARK-37585
> URL: https://issues.apache.org/jira/browse/SPARK-37585
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.3, 3.1.2
> Reporter: Sandeep Katta
> Assignee: Sandeep Katta
> Priority: Major
> Fix For: 3.3.0
>
>
> In some corner cases, DSV2 is not updating the input metrics.
>
> This is a very specific case: it occurs when the number of records read is less than 1000 and *hasNext* is never called after the last element is consumed (because input.hasNext returns false, so MetricsIterator.hasNext is not called again), which means the branch that forces the metrics update never runs.
>
> The *hasNext* implementation of MetricsIterator:
>
> {code:java}
> override def hasNext: Boolean = {
>   if (iter.hasNext) {
>     true
>   } else {
>     metricsHandler.updateMetrics(0, force = true)
>     false
>   }
> } {code}
>
> You can easily reproduce this issue in spark-shell by running the code below:
> {code:java}
> import scala.collection.mutable
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
> spark.conf.set("spark.sql.sources.useV1SourceList", "")
> val dir = "Users/tmp1"
> spark.range(0, 100).write.format("parquet").mode("overwrite").save(dir)
> val df = spark.read.format("parquet").load(dir)
>
> val bytesReads = new mutable.ArrayBuffer[Long]()
> val recordsRead = new mutable.ArrayBuffer[Long]()
>
> val bytesReadListener = new SparkListener() {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
>     recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
>   }
> }
>
> spark.sparkContext.addSparkListener(bytesReadListener)
> try {
>   df.limit(10).collect()
>   assert(recordsRead.sum > 0)
>   assert(bytesReads.sum > 0)
> } finally {
>   spark.sparkContext.removeSparkListener(bytesReadListener)
> } {code}
> This code generally fails at *assert(bytesReads.sum > 0)*, which confirms that the updateMetrics API is not called.
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org