Posted to issues@spark.apache.org by "Ivan Tsukanov (Jira)" <ji...@apache.org> on 2021/05/25 08:46:00 UTC

[jira] [Updated] (SPARK-35511) Spark computes all rows during count() on a parquet file

     [ https://issues.apache.org/jira/browse/SPARK-35511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Tsukanov updated SPARK-35511:
----------------------------------
    Description: 
We expect Spark to use Parquet metadata to get the row count of a Parquet file. But when we execute the following code:
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

object Test extends App {
  val sparkConf = new SparkConf()
    .setAppName("test-app")
    .setMaster("local[1]")

  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  import sparkSession.implicits._

  val filePath = "./tempFile.parquet"
  (1 to 1000).toDF("c1")
    .repartition(10)
    .write
    .mode("overwrite")
    .parquet(filePath)

  val df = sparkSession.read.parquet(filePath)

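  // Counts how often heavyComputation runs. A shared var like this is only
  // updated reliably in local mode, which is all this reproduction needs.
  var rowsInHeavyComputation = 0
  def heavyComputation(row: Row): Row = {
    rowsInHeavyComputation += 1
    println(s"rowsInHeavyComputation = $rowsInHeavyComputation")
    Thread.sleep(50) // simulate an expensive per-row transformation
    row
  }

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
  val cnt = df
    .map(row => heavyComputation(row)) // a map operation cannot change the number of rows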
    .count()
  println(s"counting done, cnt=$cnt")
}
{code}
we see that the function is invoked for every row:
{code:java}
rowsInHeavyComputation = 1
rowsInHeavyComputation = 2
...
rowsInHeavyComputation = 999
rowsInHeavyComputation = 1000
counting done, cnt=1000
{code}
 *Expected result*: Spark does not run heavyComputation at all, because the row count can be taken from the Parquet metadata.
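One way to look into why the function runs is to inspect the query plan. The sketch below is an illustration and not part of the original reproduction: the names PlanCheck and countPlan are hypothetical, and it uses a typed Dataset[Int] with a trivial lambda in place of heavyComputation. It builds the same kind of aggregate that count() runs (groupBy().count()) on top of a typed map and returns the physical plan as a string. The lambda is expected to appear there as a MapElements node, since the optimizer treats a typed map function as opaque code that it cannot prove to be free of side effects.
{code:scala}
import org.apache.spark.sql.{Dataset, SparkSession}

object PlanCheck {
  // Hypothetical helper: physical plan of a global count over the given Dataset.
  def countPlan(ds: Dataset[_]): String =
    ds.groupBy().count().queryExecution.executedPlan.toString

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("plan-check")
      .master("local[1]")
      .getOrCreate()
    import spark.implicits._

    // Same test data as in the reproduction.
    val filePath = "./tempFile.parquet"
    (1 to 1000).toDF("c1").repartition(10).write.mode("overwrite").parquet(filePath)

    // Typed map with a trivial lambda standing in for heavyComputation.
    val mapped = spark.read.parquet(filePath).as[Int].map(x => x)

    // Print the plan instead of running the count.
    println(countPlan(mapped))
    spark.stop()
  }
}
{code}
The exact plan text differs between Spark versions, so it is worth reading the printed plan rather than relying on a particular layout.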

 

P.S. In our real application we:
 - transform data from Parquet files
 - return some example rows (50 rows; Spark runs heavyComputation only for those 50 rows)
 - return the row count of the whole DataFrame. Here Spark computes the whole DataFrame, even though the plan contains only map operations and the initial row count can be read from the Parquet metadata
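A possible workaround, sketched under the assumption that every transformation is a strict one-to-one map (no filter or flatMap): take the count from the DataFrame before the map is applied. The names CountWorkaround and countBeforeMap are hypothetical. The sketch only ensures that the map function is not part of the counted plan; how much of the Parquet file Spark still reads for the plain count depends on the Spark version and reader in use.
{code:scala}
import org.apache.spark.sql.{DataFrame, SparkSession}

object CountWorkaround {
  // Hypothetical helper: count the source rows directly, so that no
  // user-defined map function is part of the plan being executed.
  def countBeforeMap(source: DataFrame): Long = source.count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("count-workaround")
      .master("local[1]")
      .getOrCreate()
    import spark.implicits._

    // Same test data as in the reproduction.
    val filePath = "./tempFile.parquet"
    (1 to 1000).toDF("c1").repartition(10).write.mode("overwrite").parquet(filePath)

    val df = spark.read.parquet(filePath)

    // Count first; the expensive map can then be applied only where
    // the transformed rows are actually needed (e.g. the 50 examples).
    val cnt = countBeforeMap(df)
    println(s"counting done, cnt=$cnt")
    spark.stop()
  }
}
{code}
This keeps the count and the transformation as two separate queries, which is only valid as long as the transformation cannot add or drop rows.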

 

 


> Spark computes all rows during count() on a parquet file
> --------------------------------------------------------
>
>                 Key: SPARK-35511
>                 URL: https://issues.apache.org/jira/browse/SPARK-35511
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Ivan Tsukanov
>            Priority: Major
>


