Posted to issues@spark.apache.org by "Amar Agrawal (JIRA)" <ji...@apache.org> on 2019/01/22 08:26:00 UTC

[jira] [Created] (SPARK-26683) Incorrect value of "internal.metrics.input.recordsRead" when reading from temp hive table backed by HDFS file

Amar Agrawal created SPARK-26683:
------------------------------------

             Summary: Incorrect value of "internal.metrics.input.recordsRead" when reading from temp hive table backed by HDFS file
                 Key: SPARK-26683
                 URL: https://issues.apache.org/jira/browse/SPARK-26683
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.1
            Reporter: Amar Agrawal


*Issue description*

In summary: when a persisted DataFrame is used in two concurrent threads, the SparkListenerStageCompleted event reports a wrong value for *internal.metrics.input.recordsRead*.

 

*Issue Details* 

The Spark code I have written reads from 2 source temp Hive tables. The DataFrame read from the first temp table is persisted; the DataFrame read from the second is not. We then run 2 pipelines asynchronously. The 1st pipeline writes the persisted DataFrame to a Hive target table. The 2nd pipeline performs a UNION of the persisted DataFrame with the non-persisted DataFrame and writes the result to a separate Hive table.

Since the first DataFrame is persisted, we expect its recordsRead metric to be computed exactly once. Instead, the reported value of the metric is higher than the actual number of rows.

Example: if the persisted DataFrame has 2 rows, the metric is consistently reported as 3.

 

*Steps to reproduce Issue:*

Run the following in spark-shell.

scala> :load asyncfactory.scala

scala> :paste -raw

 
{code:java}
package org.apache.spark

import org.apache.spark.scheduler._
import org.apache.spark.util.JsonProtocol
import org.json4s.jackson.JsonMethods._

// Declared in package org.apache.spark because JsonProtocol is private[spark].
class InfaListener(mode: String = "ACCUMULATOR") extends SparkListener {

  // Serialize the event to compact JSON and print it.
  def onEvent(event: SparkListenerEvent): Unit = {
    val jv = JsonProtocol.sparkEventToJson(event)
    println(compact(jv))
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = onEvent(stageCompleted)
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = onEvent(stageSubmitted)
}

{code}
 

scala> :paste
{code:java}
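// InfaDataFrame, itoDF, async, asBlock and stop are not Spark APIs;
// they are presumably defined in asyncfactory.scala, loaded above.
import org.apache.spark.InfaListener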
implicit def df2idf(d:DataFrame):InfaDataFrame = new InfaDataFrame(d);
val sqlc = spark.sqlContext
val sc = spark.sparkContext
val lis = new InfaListener("TAG")
sc.addSparkListener(lis)

sqlc.sql("DROP TABLE IF EXISTS `default`.`read1`")
sqlc.sql("CREATE TABLE `default`.`read1` (`col0` STRING) LOCATION '/tmp/INFA_UNION1'")
sqlc.sql("DROP TABLE IF EXISTS `default`.`read2`")
sqlc.sql("CREATE TABLE `default`.`read2` (`col0` STRING) LOCATION '/tmp/INFA_UNION2'")
sqlc.sql("DROP TABLE IF EXISTS `default`.`write1`")
sqlc.sql("CREATE TABLE `default`.`write1` (`col0` STRING)")
sqlc.sql("DROP TABLE IF EXISTS `default`.`write2`")
sqlc.sql("CREATE TABLE `default`.`write2` (`col0` STRING)")

val v0 = sqlc.sql("SELECT `read1`.`col0` as a0 FROM `default`.`read1`").itoDF.persist(MEMORY_AND_DISK).where(lit(true));
async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write1` SELECT tbl0.c0 as a0 FROM tbl0"), v0.unionAll(sqlc.sql("SELECT `read2`.`col0` as a0 FROM `default`.`read2`").itoDF).itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl0")));
async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write2` SELECT tbl1.c0 as a0 FROM tbl1"), v0.itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl1")));
stop;
{code}
*NOTE* - The above code refers to 2 file directories /tmp/INFA_UNION1 and /tmp/INFA_UNION2. We have attached the files which need to be copied to the above locations after these directories are created.
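
To make the discrepancy easier to spot than in the full JSON event dump, a narrower listener could print only the metric in question for each completed stage. The following is a minimal sketch, not part of the original reproduction: the class name RecordsReadListener is hypothetical, and it assumes the Spark 2.3.x listener API, where StageInfo.accumulables holds AccumulableInfo entries with optional name and value fields.
{code:scala}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Hypothetical helper (name chosen for illustration only).
// Prints the input recordsRead accumulator for every completed stage.
class RecordsReadListener extends SparkListener {

  // Accumulator name as it appears in the SparkListenerStageCompleted event.
  private val metricName = "internal.metrics.input.recordsRead"

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // accumulables maps accumulator id -> AccumulableInfo; pick the entry by name.
    val recordsRead = info.accumulables.values
      .find(_.name.contains(metricName))
      .flatMap(_.value)
    // Stages that read no input have no such accumulator, so print "n/a" for them.
    println(s"stage=${info.stageId} name=${info.name} recordsRead=${recordsRead.getOrElse("n/a")}")
  }
}
{code}
Registering it with sc.addSparkListener(new RecordsReadListener) before the two async calls should print one line per stage, which can then be compared against the actual number of rows in `default`.`read1`.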

 


