Posted to issues@spark.apache.org by "Amar Agrawal (JIRA)" <ji...@apache.org> on 2019/06/26 09:57:00 UTC

[jira] [Updated] (SPARK-26683) Incorrect value of "internal.metrics.input.recordsRead" when reading from temp hive table backed by HDFS file

     [ https://issues.apache.org/jira/browse/SPARK-26683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amar Agrawal updated SPARK-26683:
---------------------------------
    Fix Version/s: 3.0.0

> Incorrect value of "internal.metrics.input.recordsRead" when reading from temp hive table backed by HDFS file
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26683
>                 URL: https://issues.apache.org/jira/browse/SPARK-26683
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: Amar Agrawal
>            Priority: Major
>             Fix For: 3.0.0
>
>         Attachments: asyncfactory.scala, input1.txt, input2.txt
>
>
> *Issue description*
> In summary: when a persisted DataFrame is used in two concurrent threads, the SparkListenerStageCompleted event reports an incorrect value for *internal.metrics.input.recordsRead*.
>  
> *Issue Details* 
> The Spark code I have written has 2 source temp Hive tables. When the first temp table is read, its DataFrame is persisted; the source DataFrame for the second temp table is not. We then run 2 pipelines asynchronously. In the 1st pipeline, the persisted DataFrame is written to a Hive target table. In the 2nd pipeline, the persisted DataFrame is combined with the non-persisted DataFrame using a UNION, and the result is written to a separate Hive table.
> We expect that, because the first DataFrame is persisted, its records are counted in recordsRead exactly once. Instead, we see an inflated value for the metric.
> Example: if the persisted DataFrame has 2 rows, the metric consistently reports 3 rows.
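>  
> To make the expectation concrete, here is a toy model in plain Scala with no Spark dependency. It is NOT how Spark's cache or block manager works, and CountingSource, Persisted and PersistOnceDemo are hypothetical names. A source counts every row it scans, a "persisted" view scans it at most once (Scala lazy val initialization is synchronized), and two concurrent pipelines consume the view as in this report: one uses it directly, the other unions it with a non-persisted input. Under these assumptions, reads of the persisted source are counted once, so the counter ends at 2 rather than 3.
> {code:java}
> import java.util.concurrent.atomic.AtomicLong
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration._
>
> // Hypothetical source that counts every row it scans.
> final class CountingSource(rows: Seq[String]) {
>   val recordsRead = new AtomicLong(0L)
>   def scan(): Seq[String] = {
>     recordsRead.addAndGet(rows.size.toLong)
>     rows
>   }
> }
>
> // Hypothetical "persisted" view: lazy val initialization is synchronized,
> // so concurrent readers trigger at most one scan of the source.
> final class Persisted(source: CountingSource) {
>   lazy val rows: Seq[String] = source.scan()
> }
>
> object PersistOnceDemo {
>   def run(): Long = {
>     val source = new CountingSource(Seq("r1", "r2")) // 2 persisted rows
>     val persisted = new Persisted(source)
>     val nonPersisted = Seq("x1")
>     // Pipeline 1: use the persisted rows as-is.
>     val p1 = Future(persisted.rows.size)
>     // Pipeline 2: union the persisted rows with a non-persisted input.
>     val p2 = Future((persisted.rows ++ nonPersisted).size)
>     Await.result(Future.sequence(Seq(p1, p2)), 10.seconds)
>     // Only scans of the persisted source are counted here.
>     source.recordsRead.get
>   }
> }
> {code}
> In this model, PersistOnceDemo.run() returns 2. The report describes Spark returning 3 for the equivalent metric.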
>  
> *Steps to reproduce Issue:*
>  # Create directory /tmp/INFA_UNION1 and copy input1.txt to this directory.
>  # Create directory /tmp/INFA_UNION2 and copy input2.txt to this directory.
>  # Run the following in spark-shell:
> scala> :load asyncfactory.scala
> scala> :paste -raw
>  
> {code:java}
> package org.apache.spark
> // Declared inside org.apache.spark because JsonProtocol is private[spark];
> // this is why the block must be entered with :paste -raw.
>
> import org.apache.spark.scheduler._
> import org.apache.spark.util.JsonProtocol
> import org.json4s.jackson.JsonMethods._
>
> // Prints every stage-submitted and stage-completed event as one line of JSON.
> // The `mode` parameter is not used by this listener.
> class InfaListener(mode: String = "ACCUMULATOR") extends SparkListener {
>   def onEvent(event: SparkListenerEvent): Unit = {
>     val jv = JsonProtocol.sparkEventToJson(event)
>     println(compact(jv))
>   }
>
>   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
>     onEvent(stageCompleted)
>
>   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
>     onEvent(stageSubmitted)
> }
> {code}
>  
> scala> :paste
> {code:java}
> import org.apache.spark.InfaListener
> implicit def df2idf(d:DataFrame):InfaDataFrame = new InfaDataFrame(d);
> val sqlc = spark.sqlContext
> val sc = spark.sparkContext
> val lis = new InfaListener("TAG")
> sc.addSparkListener(lis)
> sqlc.sql("DROP TABLE IF EXISTS `default`.`read1`")
> sqlc.sql("CREATE TABLE `default`.`read1` (`col0` STRING) LOCATION '/tmp/INFA_UNION1'")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`read2`")
> sqlc.sql("CREATE TABLE `default`.`read2` (`col0` STRING) LOCATION '/tmp/INFA_UNION2'")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`write1`")
> sqlc.sql("CREATE TABLE `default`.`write1` (`col0` STRING)")
> sqlc.sql("DROP TABLE IF EXISTS `default`.`write2`")
> sqlc.sql("CREATE TABLE `default`.`write2` (`col0` STRING)")
> val v0 = sqlc.sql("SELECT `read1`.`col0` as a0 FROM `default`.`read1`").itoDF.persist(MEMORY_AND_DISK).where(lit(true));
> async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write1` SELECT tbl0.c0 as a0 FROM tbl0"), v0.unionAll(sqlc.sql("SELECT `read2`.`col0` as a0 FROM `default`.`read2`").itoDF).itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl0")));
> async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write2` SELECT tbl1.c0 as a0 FROM tbl1"), v0.itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl1")));
> stop;
> {code}
> *NOTE*: The code above reads from the directories /tmp/INFA_UNION1 and /tmp/INFA_UNION2. After creating these directories, copy the attached input files into them (input1.txt into /tmp/INFA_UNION1 and input2.txt into /tmp/INFA_UNION2).
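>  
> A possibly simpler way to watch the same metric, sketched under the assumption that only the per-stage total is needed: read it from the public (DeveloperApi) StageInfo.taskMetrics.inputMetrics.recordsRead accessors instead of dumping the whole event as JSON. Because it uses no private[spark] classes, it can be entered with plain :paste. RecordsReadListener and recordsReadByStage are hypothetical names, not part of the attached files.
> {code:java}
> import scala.collection.concurrent.TrieMap
> import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
>
> // Sketch: records the input recordsRead total of each completed stage.
> class RecordsReadListener extends SparkListener {
>   // Keyed by stage id. A TrieMap because the listener bus thread writes
>   // while the driver thread may read.
>   val recordsReadByStage: TrieMap[Int, Long] = TrieMap.empty
>
>   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
>     val info = event.stageInfo
>     // Defensive guard: treat missing task metrics as 0 records.
>     val read = Option(info.taskMetrics).map(_.inputMetrics.recordsRead).getOrElse(0L)
>     recordsReadByStage.put(info.stageId, read)
>     println(s"stage ${info.stageId} (${info.name}): recordsRead=$read")
>   }
> }
> {code}
> Register it with sc.addSparkListener(new RecordsReadListener) before starting the two async pipelines. In this sketch, a retried stage attempt overwrites the earlier entry for the same stage id.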
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
