You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2021/02/27 07:15:00 UTC

[jira] [Comment Edited] (SPARK-34542) Upgrade Parquet to 1.12.0

    [ https://issues.apache.org/jira/browse/SPARK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17292041#comment-17292041 ] 

Yuming Wang edited comment on SPARK-34542 at 2/27/21, 7:14 AM:
---------------------------------------------------------------

Benchmark code:
{code:scala}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.benchmark

import java.io.File

import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType}

object ParquetFilterPushdownBenchmark extends SqlBasedBenchmark {

  override def getSparkSession: SparkSession = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      // Since `spark.master` always exists, overrides this value
      .set("spark.master", "local[1]")
      .setIfMissing("spark.driver.memory", "3g")
      .setIfMissing("spark.executor.memory", "3g")
      .setIfMissing("orc.compression", "snappy")
      .setIfMissing("spark.sql.parquet.compression.codec", "snappy")

    SparkSession.builder().config(conf).getOrCreate()
  }

  private val numRows = 1024 * 1024 * 15
  private val width = 5
  private val mid = numRows / 2

  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
    try f finally tableNames.foreach(spark.catalog.dropTempView)
  }

  private def prepareTable(
      dir: File, numRows: Int, width: Int, useStringForValue: Boolean): Unit = {
    import spark.implicits._
    val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    val valueCol = if (useStringForValue) {
      monotonically_increasing_id().cast("string")
    } else {
      monotonically_increasing_id()
    }
    val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
      .withColumn("value", valueCol)
      .sort("value")

    saveAsTable(df, dir)
  }

  private def prepareStringDictTable(
      dir: File, numRows: Int, numDistinctValues: Int, width: Int): Unit = {
    val selectExpr = (0 to width).map {
      case 0 => s"CAST(id % $numDistinctValues AS STRING) AS value"
      case i => s"CAST(rand() AS STRING) c$i"
    }
    val df = spark.range(numRows).selectExpr(selectExpr: _*).sort("value")

    saveAsTable(df, dir)
  }

  private def saveAsTable(df: DataFrame, dir: File): Unit = {
    val parquetPath = dir.getCanonicalPath + "/parquet"
    df.write.mode("overwrite").parquet(parquetPath)
    spark.read.parquet(parquetPath).createOrReplaceTempView("parquetTable")
  }

  def filterPushDownBenchmark(
     values: Int,
     title: String,
     whereExpr: String,
     selectExpr: String = "*"): Unit = {
    val benchmark = new Benchmark(title, values, minNumIters = 5, output = output)

    Seq(false, true).foreach { pushDownEnabled =>
      val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
      benchmark.addCase(name) { _ =>
        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
          spark.sql(s"SELECT $selectExpr FROM parquetTable WHERE $whereExpr").noop()
        }
      }
    }

    benchmark.run()
  }

  private def runIntBenchmark(numRows: Int, width: Int, mid: Int): Unit = {
    Seq("value IS NULL", s"$mid < value AND value < $mid").foreach { whereExpr =>
      val title = s"Select 0 int row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    Seq(
      s"value = $mid",
      s"value <=> $mid",
      s"$mid <= value AND value <= $mid",
      s"${mid - 1} < value AND value < ${mid + 1}"
    ).foreach { whereExpr =>
      val title = s"Select 1 int row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")

    Seq(10, 50, 90).foreach { percent =>
      filterPushDownBenchmark(
        numRows,
        s"Select $percent% int rows (value < ${numRows * percent / 100})",
        s"value < ${numRows * percent / 100}",
        selectExpr
      )
    }

    Seq("value IS NOT NULL", "value > -1", "value != -1").foreach { whereExpr =>
      filterPushDownBenchmark(
        numRows,
        s"Select all int rows ($whereExpr)",
        whereExpr,
        selectExpr)
    }
  }

  private def runStringBenchmark(
      numRows: Int, width: Int, searchValue: Int, colType: String): Unit = {
    Seq("value IS NULL", s"'$searchValue' < value AND value < '$searchValue'")
      .foreach { whereExpr =>
        val title = s"Select 0 $colType row ($whereExpr)".replace("value AND value", "value")
        filterPushDownBenchmark(numRows, title, whereExpr)
      }

    Seq(
      s"value = '$searchValue'",
      s"value <=> '$searchValue'",
      s"'$searchValue' <= value AND value <= '$searchValue'"
    ).foreach { whereExpr =>
      val title = s"Select 1 $colType row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")

    Seq("value IS NOT NULL").foreach { whereExpr =>
      filterPushDownBenchmark(
        numRows,
        s"Select all $colType rows ($whereExpr)",
        whereExpr,
        selectExpr)
    }
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Pushdown for many distinct value case") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          Seq(true, false).foreach { useStringForValue =>
            prepareTable(dir, numRows, width, useStringForValue)
            if (useStringForValue) {
              runStringBenchmark(numRows, width, mid, "string")
            } else {
              runIntBenchmark(numRows, width, mid)
            }
          }
        }
      }
    }

    runBenchmark("Pushdown for few distinct value case (use dictionary encoding)") {
      withTempPath { dir =>
        val numDistinctValues = 200

        withTempTable("parquetTable") {
          prepareStringDictTable(dir, numRows, numDistinctValues, width)
          runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string")
        }
      }
    }

    runBenchmark("Pushdown benchmark for StringStartsWith") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          prepareTable(dir, numRows, width, true)
          Seq(
            "value like '10%'",
            "value like '1000%'",
            s"value like '${mid.toString.substring(0, mid.toString.length - 1)}%'"
          ).foreach { whereExpr =>
            val title = s"StringStartsWith filter: ($whereExpr)"
            filterPushDownBenchmark(numRows, title, whereExpr)
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
      withTempPath { dir =>
        Seq(
          s"decimal(${Decimal.MAX_INT_DIGITS}, 2)",
          s"decimal(${Decimal.MAX_LONG_DIGITS}, 2)",
          s"decimal(${DecimalType.MAX_PRECISION}, 2)"
        ).foreach { dt =>
          val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
          val valueCol = if (dt.equalsIgnoreCase(s"decimal(${Decimal.MAX_INT_DIGITS}, 2)")) {
            monotonically_increasing_id() % 9999999
          } else {
            monotonically_increasing_id()
          }
          val df = spark.range(numRows)
            .selectExpr(columns: _*).withColumn("value", valueCol.cast(dt))
          withTempTable("parquetTable") {
            saveAsTable(df, dir)

            Seq(s"value = $mid").foreach { whereExpr =>
              val title = s"Select 1 $dt row ($whereExpr)".replace("value AND value", "value")
              filterPushDownBenchmark(numRows, title, whereExpr)
            }

            val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
            Seq(10, 50, 90).foreach { percent =>
              filterPushDownBenchmark(
                numRows,
                s"Select $percent% $dt rows (value < ${numRows * percent / 100})",
                s"value < ${numRows * percent / 100}",
                selectExpr
              )
            }
          }
        }
      }
    }

    runBenchmark("Pushdown benchmark for InSet -> InFilters") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          prepareTable(dir, numRows, width, false)
          Seq(5, 10, 50, 100).foreach { count =>
            Seq(10, 50, 90).foreach { distribution =>
              val filter =
                Range(0, count).map(r => scala.util.Random.nextInt(numRows * distribution / 100))
              val whereExpr = s"value in(${filter.mkString(",")})"
              val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)"
              filterPushDownBenchmark(numRows, title, whereExpr)
            }
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark for ${ByteType.simpleString}") {
      withTempPath { dir =>
        val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
        val df = spark.range(numRows).selectExpr(columns: _*)
          .withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType))
          .orderBy("value")
        withTempTable("parquetTable") {
          saveAsTable(df, dir)

          Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})")
            .foreach { whereExpr =>
              val title = s"Select 1 ${ByteType.simpleString} row ($whereExpr)"
                .replace("value AND value", "value")
              filterPushDownBenchmark(numRows, title, whereExpr)
            }

          val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
          Seq(10, 50, 90).foreach { percent =>
            filterPushDownBenchmark(
              numRows,
              s"Select $percent% ${ByteType.simpleString} rows " +
                s"(value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString}))",
              s"value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString})",
              selectExpr
            )
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark for Timestamp") {
      withTempPath { dir =>
        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> true.toString) {
          ParquetOutputTimestampType.values.toSeq.map(_.toString).foreach { fileType =>
            withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
              val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
              val df = spark.range(numRows).selectExpr(columns: _*)
                .withColumn("value", timestamp_seconds(monotonically_increasing_id()))
              withTempTable("parquetTable") {
                saveAsTable(df, dir)

                Seq(s"value = timestamp_seconds($mid)").foreach { whereExpr =>
                  val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)"
                    .replace("value AND value", "value")
                  filterPushDownBenchmark(numRows, title, whereExpr)
                }

                val selectExpr = (1 to width)
                  .map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
                Seq(10, 50, 90).foreach { percent =>
                  filterPushDownBenchmark(
                    numRows,
                    s"Select $percent% timestamp stored as $fileType rows " +
                      s"(value < timestamp_seconds(${numRows * percent / 100}))",
                    s"value < timestamp_seconds(${numRows * percent / 100})",
                    selectExpr
                  )
                }
              }
            }
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark with many filters") {
      val numRows = 1
      val width = 500

      withTempPath { dir =>
        val columns = (1 to width).map(i => s"id c$i")
        val df = spark.range(1).selectExpr(columns: _*)
        withTempTable("parquetTable") {
          saveAsTable(df, dir)
          Seq(1, 250, 500).foreach { numFilter =>
            val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
            // Note: InferFiltersFromConstraints will add more filters to this given filters
            filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr)
          }
        }
      }
    }
  }
}
{code}



was (Author: q79969786):
{code:scala}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.benchmark

import java.io.File

import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType}

object ParquetFilterPushdownBenchmark extends SqlBasedBenchmark {

  override def getSparkSession: SparkSession = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      // Since `spark.master` always exists, overrides this value
      .set("spark.master", "local[1]")
      .setIfMissing("spark.driver.memory", "3g")
      .setIfMissing("spark.executor.memory", "3g")
      .setIfMissing("orc.compression", "snappy")
      .setIfMissing("spark.sql.parquet.compression.codec", "snappy")

    SparkSession.builder().config(conf).getOrCreate()
  }

  private val numRows = 1024 * 1024 * 15
  private val width = 5
  private val mid = numRows / 2

  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
    try f finally tableNames.foreach(spark.catalog.dropTempView)
  }

  private def prepareTable(
      dir: File, numRows: Int, width: Int, useStringForValue: Boolean): Unit = {
    import spark.implicits._
    val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    val valueCol = if (useStringForValue) {
      monotonically_increasing_id().cast("string")
    } else {
      monotonically_increasing_id()
    }
    val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
      .withColumn("value", valueCol)
      .sort("value")

    saveAsTable(df, dir)
  }

  private def prepareStringDictTable(
      dir: File, numRows: Int, numDistinctValues: Int, width: Int): Unit = {
    val selectExpr = (0 to width).map {
      case 0 => s"CAST(id % $numDistinctValues AS STRING) AS value"
      case i => s"CAST(rand() AS STRING) c$i"
    }
    val df = spark.range(numRows).selectExpr(selectExpr: _*).sort("value")

    saveAsTable(df, dir)
  }

  private def saveAsTable(df: DataFrame, dir: File): Unit = {
    val parquetPath = dir.getCanonicalPath + "/parquet"
    df.write.mode("overwrite").parquet(parquetPath)
    spark.read.parquet(parquetPath).createOrReplaceTempView("parquetTable")
  }

  def filterPushDownBenchmark(
     values: Int,
     title: String,
     whereExpr: String,
     selectExpr: String = "*"): Unit = {
    val benchmark = new Benchmark(title, values, minNumIters = 5, output = output)

    Seq(false, true).foreach { pushDownEnabled =>
      val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
      benchmark.addCase(name) { _ =>
        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
          spark.sql(s"SELECT $selectExpr FROM parquetTable WHERE $whereExpr").noop()
        }
      }
    }

    benchmark.run()
  }

  private def runIntBenchmark(numRows: Int, width: Int, mid: Int): Unit = {
    Seq("value IS NULL", s"$mid < value AND value < $mid").foreach { whereExpr =>
      val title = s"Select 0 int row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    Seq(
      s"value = $mid",
      s"value <=> $mid",
      s"$mid <= value AND value <= $mid",
      s"${mid - 1} < value AND value < ${mid + 1}"
    ).foreach { whereExpr =>
      val title = s"Select 1 int row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")

    Seq(10, 50, 90).foreach { percent =>
      filterPushDownBenchmark(
        numRows,
        s"Select $percent% int rows (value < ${numRows * percent / 100})",
        s"value < ${numRows * percent / 100}",
        selectExpr
      )
    }

    Seq("value IS NOT NULL", "value > -1", "value != -1").foreach { whereExpr =>
      filterPushDownBenchmark(
        numRows,
        s"Select all int rows ($whereExpr)",
        whereExpr,
        selectExpr)
    }
  }

  private def runStringBenchmark(
      numRows: Int, width: Int, searchValue: Int, colType: String): Unit = {
    Seq("value IS NULL", s"'$searchValue' < value AND value < '$searchValue'")
      .foreach { whereExpr =>
        val title = s"Select 0 $colType row ($whereExpr)".replace("value AND value", "value")
        filterPushDownBenchmark(numRows, title, whereExpr)
      }

    Seq(
      s"value = '$searchValue'",
      s"value <=> '$searchValue'",
      s"'$searchValue' <= value AND value <= '$searchValue'"
    ).foreach { whereExpr =>
      val title = s"Select 1 $colType row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")

    Seq("value IS NOT NULL").foreach { whereExpr =>
      filterPushDownBenchmark(
        numRows,
        s"Select all $colType rows ($whereExpr)",
        whereExpr,
        selectExpr)
    }
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Pushdown for many distinct value case") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          Seq(true, false).foreach { useStringForValue =>
            prepareTable(dir, numRows, width, useStringForValue)
            if (useStringForValue) {
              runStringBenchmark(numRows, width, mid, "string")
            } else {
              runIntBenchmark(numRows, width, mid)
            }
          }
        }
      }
    }

    runBenchmark("Pushdown for few distinct value case (use dictionary encoding)") {
      withTempPath { dir =>
        val numDistinctValues = 200

        withTempTable("parquetTable") {
          prepareStringDictTable(dir, numRows, numDistinctValues, width)
          runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string")
        }
      }
    }

    runBenchmark("Pushdown benchmark for StringStartsWith") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          prepareTable(dir, numRows, width, true)
          Seq(
            "value like '10%'",
            "value like '1000%'",
            s"value like '${mid.toString.substring(0, mid.toString.length - 1)}%'"
          ).foreach { whereExpr =>
            val title = s"StringStartsWith filter: ($whereExpr)"
            filterPushDownBenchmark(numRows, title, whereExpr)
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
      withTempPath { dir =>
        Seq(
          s"decimal(${Decimal.MAX_INT_DIGITS}, 2)",
          s"decimal(${Decimal.MAX_LONG_DIGITS}, 2)",
          s"decimal(${DecimalType.MAX_PRECISION}, 2)"
        ).foreach { dt =>
          val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
          val valueCol = if (dt.equalsIgnoreCase(s"decimal(${Decimal.MAX_INT_DIGITS}, 2)")) {
            monotonically_increasing_id() % 9999999
          } else {
            monotonically_increasing_id()
          }
          val df = spark.range(numRows)
            .selectExpr(columns: _*).withColumn("value", valueCol.cast(dt))
          withTempTable("parquetTable") {
            saveAsTable(df, dir)

            Seq(s"value = $mid").foreach { whereExpr =>
              val title = s"Select 1 $dt row ($whereExpr)".replace("value AND value", "value")
              filterPushDownBenchmark(numRows, title, whereExpr)
            }

            val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
            Seq(10, 50, 90).foreach { percent =>
              filterPushDownBenchmark(
                numRows,
                s"Select $percent% $dt rows (value < ${numRows * percent / 100})",
                s"value < ${numRows * percent / 100}",
                selectExpr
              )
            }
          }
        }
      }
    }

    runBenchmark("Pushdown benchmark for InSet -> InFilters") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          prepareTable(dir, numRows, width, false)
          Seq(5, 10, 50, 100).foreach { count =>
            Seq(10, 50, 90).foreach { distribution =>
              val filter =
                Range(0, count).map(r => scala.util.Random.nextInt(numRows * distribution / 100))
              val whereExpr = s"value in(${filter.mkString(",")})"
              val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)"
              filterPushDownBenchmark(numRows, title, whereExpr)
            }
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark for ${ByteType.simpleString}") {
      withTempPath { dir =>
        val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
        val df = spark.range(numRows).selectExpr(columns: _*)
          .withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType))
          .orderBy("value")
        withTempTable("parquetTable") {
          saveAsTable(df, dir)

          Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})")
            .foreach { whereExpr =>
              val title = s"Select 1 ${ByteType.simpleString} row ($whereExpr)"
                .replace("value AND value", "value")
              filterPushDownBenchmark(numRows, title, whereExpr)
            }

          val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
          Seq(10, 50, 90).foreach { percent =>
            filterPushDownBenchmark(
              numRows,
              s"Select $percent% ${ByteType.simpleString} rows " +
                s"(value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString}))",
              s"value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString})",
              selectExpr
            )
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark for Timestamp") {
      withTempPath { dir =>
        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> true.toString) {
          ParquetOutputTimestampType.values.toSeq.map(_.toString).foreach { fileType =>
            withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
              val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
              val df = spark.range(numRows).selectExpr(columns: _*)
                .withColumn("value", timestamp_seconds(monotonically_increasing_id()))
              withTempTable("parquetTable") {
                saveAsTable(df, dir)

                Seq(s"value = timestamp_seconds($mid)").foreach { whereExpr =>
                  val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)"
                    .replace("value AND value", "value")
                  filterPushDownBenchmark(numRows, title, whereExpr)
                }

                val selectExpr = (1 to width)
                  .map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
                Seq(10, 50, 90).foreach { percent =>
                  filterPushDownBenchmark(
                    numRows,
                    s"Select $percent% timestamp stored as $fileType rows " +
                      s"(value < timestamp_seconds(${numRows * percent / 100}))",
                    s"value < timestamp_seconds(${numRows * percent / 100})",
                    selectExpr
                  )
                }
              }
            }
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark with many filters") {
      val numRows = 1
      val width = 500

      withTempPath { dir =>
        val columns = (1 to width).map(i => s"id c$i")
        val df = spark.range(1).selectExpr(columns: _*)
        withTempTable("parquetTable") {
          saveAsTable(df, dir)
          Seq(1, 250, 500).foreach { numFilter =>
            val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
            // Note: InferFiltersFromConstraints will add more filters to this given filters
            filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr)
          }
        }
      }
    }
  }
}
{code}


> Upgrade Parquet to 1.12.0
> -------------------------
>
>                 Key: SPARK-34542
>                 URL: https://issues.apache.org/jira/browse/SPARK-34542
>             Project: Spark
>          Issue Type: Improvement
>          Components: Build, SQL
>    Affects Versions: 3.2.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> Parquet-1.12.0 release notes:
> https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.0-rc2/CHANGES.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org