Posted to commits@spark.apache.org by we...@apache.org on 2020/03/30 08:53:39 UTC

[spark] branch branch-3.0 updated: [SPARK-31296][SQL][TESTS] Benchmark date-time rebasing in Parquet datasource

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3ab2f88  [SPARK-31296][SQL][TESTS] Benchmark date-time rebasing in Parquet datasource
3ab2f88 is described below

commit 3ab2f8877e4618031b3258cb90eb249145077e08
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Mon Mar 30 16:46:31 2020 +0800

    [SPARK-31296][SQL][TESTS] Benchmark date-time rebasing in Parquet datasource
    
    ### What changes were proposed in this pull request?
    In this PR, I propose adding a new benchmark, `DateTimeRebaseBenchmark`, which measures the performance of rebasing dates/timestamps between the hybrid calendar (Julian+Gregorian) and the Proleptic Gregorian calendar:
    1. On write, it separately saves dates and timestamps before and after the year 1582, with and without rebasing.
    2. On read, it loads the previously saved parquet files with the vectorized reader and with the regular reader.
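    
    As a rough illustration of why rebasing is needed at all, the sketch below interprets the same day count in both calendars using only the JDK. It is not Spark's rebasing code; `CalendarDiffSketch`, `hybridFields`, and `prolepticFields` are hypothetical names introduced here, and the sketch assumes `java.util.GregorianCalendar` with its default cutover as a stand-in for the hybrid calendar.
    
    ```scala
    import java.time.LocalDate
    import java.util.{Calendar, GregorianCalendar, TimeZone}

    // Hypothetical helper, not part of Spark: compares how one day count
    // is read in the hybrid calendar and in the Proleptic Gregorian calendar.
    object CalendarDiffSketch {
      private val MillisPerDay = 24L * 60 * 60 * 1000

      // Interprets days since 1970-01-01 in the hybrid (Julian + Gregorian) calendar.
      // java.util.GregorianCalendar switches from Julian to Gregorian in October 1582 by default.
      def hybridFields(epochDay: Long): (Int, Int, Int) = {
        val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
        cal.setTimeInMillis(epochDay * MillisPerDay)
        (cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))
      }

      // Interprets the same day count in the Proleptic Gregorian calendar (java.time).
      def prolepticFields(epochDay: Long): (Int, Int, Int) = {
        val d = LocalDate.ofEpochDay(epochDay)
        (d.getYear, d.getMonthValue, d.getDayOfMonth)
      }

      def main(args: Array[String]): Unit = {
        // One date before the 1582 cutover and one after it.
        Seq(LocalDate.of(1000, 1, 1), LocalDate.of(2020, 1, 1)).foreach { d =>
          val day = d.toEpochDay
          println(s"epoch day $day: hybrid=${hybridFields(day)} proleptic=${prolepticFields(day)}")
        }
      }
    }
    ```
    
    For the date before 1582 the two calendars report different year/month/day fields for the same stored value, while for the modern date they agree. Rebasing adjusts the stored value so that the fields are preserved across calendars.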
    
    Here is the summary of benchmarking:
    - Saving timestamps with rebasing on is **~6 times slower**
    - Loading timestamps with rebasing on and vectorized **off** is **~4 times slower**
    - Loading timestamps with rebasing on and vectorized **on** is **~10 times slower**
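    
    The figures above can be approximately reproduced from the `Best Time(ms)` column of `DateTimeRebaseBenchmark-results.txt`. The sketch below assumes the summary compares `rebase on` against `rebase off` for the `before 1582` timestamp rows of the OpenJDK 8 run; that pairing is an assumption, and `RebaseSlowdownSketch` and `Pair` are hypothetical names used only for this arithmetic.
    
    ```scala
    // Hypothetical helper: recomputes slowdown ratios from the committed results.
    object RebaseSlowdownSketch {
      // Best Time (ms) values copied from the OpenJDK 8 results,
      // "before 1582" rows of the timestamp benchmarks.
      final case class Pair(name: String, rebaseOffMs: Long, rebaseOnMs: Long) {
        // How many times slower the run is with rebasing on.
        def slowdown: Double = rebaseOnMs.toDouble / rebaseOffMs
      }

      val cases: Seq[Pair] = Seq(
        Pair("save timestamps", 15988L, 92636L),
        Pair("load timestamps, vec off", 14928L, 59752L),
        Pair("load timestamps, vec on", 4892L, 46854L))

      def main(args: Array[String]): Unit =
        cases.foreach { c =>
          println(f"${c.name}%-26s ${c.slowdown}%.1fx slower with rebase on")
        }
    }
    ```
    
    The OpenJDK 11 rows give somewhat smaller ratios, so the summary figures are best read as rough upper estimates for this hardware.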
    
    ### Why are the changes needed?
    To measure the performance impact of the date-time rebasing introduced by #27915, #27953, #27807.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    By running the `DateTimeRebaseBenchmark` benchmark on Amazon EC2:
    
    | Item | Description |
    | ---- | ---- |
    | Region | us-west-2 (Oregon) |
    | Instance | r3.xlarge |
    | AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
    | Java | OpenJDK8/11 |
    
    Closes #28057 from MaxGekk/rebase-bechmark.
    
    Lead-authored-by: Maxim Gekk <ma...@gmail.com>
    Co-authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit a1dbcd13a3eeaee50cc1a46e909f9478d6d55177)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../DateTimeRebaseBenchmark-jdk11-results.txt      |  53 +++++++
 .../benchmarks/DateTimeRebaseBenchmark-results.txt |  53 +++++++
 .../benchmark/DateTimeRebaseBenchmark.scala        | 161 +++++++++++++++++++++
 3 files changed, 267 insertions(+)

diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt
new file mode 100644
index 0000000..52522f8
--- /dev/null
+++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt
@@ -0,0 +1,53 @@
+================================================================================================
+Rebasing dates/timestamps in Parquet datasource
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Save dates to parquet:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, noop                                   9272           9272           0         10.8          92.7       1.0X
+before 1582, noop                                  9142           9142           0         10.9          91.4       1.0X
+after 1582, rebase off                            21841          21841           0          4.6         218.4       0.4X
+after 1582, rebase on                             58245          58245           0          1.7         582.4       0.2X
+before 1582, rebase off                           19813          19813           0          5.0         198.1       0.5X
+before 1582, rebase on                            63737          63737           0          1.6         637.4       0.1X
+
+OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Load dates from parquet:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, vec off, rebase off                   13004          13063          67          7.7         130.0       1.0X
+after 1582, vec off, rebase on                    36224          36253          26          2.8         362.2       0.4X
+after 1582, vec on, rebase off                     3596           3654          54         27.8          36.0       3.6X
+after 1582, vec on, rebase on                     26144          26253         112          3.8         261.4       0.5X
+before 1582, vec off, rebase off                  12872          12914          51          7.8         128.7       1.0X
+before 1582, vec off, rebase on                   37762          37904         153          2.6         377.6       0.3X
+before 1582, vec on, rebase off                    3522           3592          94         28.4          35.2       3.7X
+before 1582, vec on, rebase on                    27580          27615          59          3.6         275.8       0.5X
+
+OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Save timestamps to parquet:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, noop                                   3113           3113           0         32.1          31.1       1.0X
+before 1582, noop                                  3078           3078           0         32.5          30.8       1.0X
+after 1582, rebase off                            15749          15749           0          6.3         157.5       0.2X
+after 1582, rebase on                             69106          69106           0          1.4         691.1       0.0X
+before 1582, rebase off                           15967          15967           0          6.3         159.7       0.2X
+before 1582, rebase on                            76843          76843           0          1.3         768.4       0.0X
+
+OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Load timestamps from parquet:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, vec off, rebase off                   15070          15172          94          6.6         150.7       1.0X
+after 1582, vec off, rebase on                    43748          43867         157          2.3         437.5       0.3X
+after 1582, vec on, rebase off                     4805           4859          60         20.8          48.1       3.1X
+after 1582, vec on, rebase on                     33960          34027          61          2.9         339.6       0.4X
+before 1582, vec off, rebase off                  15037          15071          52          6.7         150.4       1.0X
+before 1582, vec off, rebase on                   44590          44749         156          2.2         445.9       0.3X
+before 1582, vec on, rebase off                    4831           4852          30         20.7          48.3       3.1X
+before 1582, vec on, rebase on                    35460          35481          18          2.8         354.6       0.4X
+
+
diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt
new file mode 100644
index 0000000..c9320cf
--- /dev/null
+++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt
@@ -0,0 +1,53 @@
+================================================================================================
+Rebasing dates/timestamps in Parquet datasource
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Save dates to parquet:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, noop                                   9472           9472           0         10.6          94.7       1.0X
+before 1582, noop                                  9226           9226           0         10.8          92.3       1.0X
+after 1582, rebase off                            21201          21201           0          4.7         212.0       0.4X
+after 1582, rebase on                             56471          56471           0          1.8         564.7       0.2X
+before 1582, rebase off                           20179          20179           0          5.0         201.8       0.5X
+before 1582, rebase on                            65717          65717           0          1.5         657.2       0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Load dates from parquet:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, vec off, rebase off                   12294          12434         205          8.1         122.9       1.0X
+after 1582, vec off, rebase on                    36959          36967          12          2.7         369.6       0.3X
+after 1582, vec on, rebase off                     3644           3691          49         27.4          36.4       3.4X
+after 1582, vec on, rebase on                     26764          26852          92          3.7         267.6       0.5X
+before 1582, vec off, rebase off                  12830          12917          85          7.8         128.3       1.0X
+before 1582, vec off, rebase on                   38897          39053         229          2.6         389.0       0.3X
+before 1582, vec on, rebase off                    3638           3693          85         27.5          36.4       3.4X
+before 1582, vec on, rebase on                    28956          29007          44          3.5         289.6       0.4X
+
+OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Save timestamps to parquet:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, noop                                   2952           2952           0         33.9          29.5       1.0X
+before 1582, noop                                  2880           2880           0         34.7          28.8       1.0X
+after 1582, rebase off                            15928          15928           0          6.3         159.3       0.2X
+after 1582, rebase on                             82816          82816           0          1.2         828.2       0.0X
+before 1582, rebase off                           15988          15988           0          6.3         159.9       0.2X
+before 1582, rebase on                            92636          92636           0          1.1         926.4       0.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Load timestamps from parquet:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, vec off, rebase off                   14863          14917          78          6.7         148.6       1.0X
+after 1582, vec off, rebase on                    54819          54939         140          1.8         548.2       0.3X
+after 1582, vec on, rebase off                     4905           4941          32         20.4          49.0       3.0X
+after 1582, vec on, rebase on                     44914          45008         124          2.2         449.1       0.3X
+before 1582, vec off, rebase off                  14928          14970          48          6.7         149.3       1.0X
+before 1582, vec off, rebase on                   59752          59996         245          1.7         597.5       0.2X
+before 1582, vec on, rebase off                    4892           4916          33         20.4          48.9       3.0X
+before 1582, vec on, rebase on                    46854          46977         198          2.1         468.5       0.3X
+
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
new file mode 100644
index 0000000..983d9b4
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneOffset}
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.SECONDS_PER_DAY
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Synthetic benchmark for rebasing of date and timestamp in read/write.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/DateTimeRebaseBenchmark-results.txt".
+ * }}}
+ */
+object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
+  import spark.implicits._
+
+  private def genTs(cardinality: Int, start: LocalDateTime, end: LocalDateTime): DataFrame = {
+    val startSec = start.toEpochSecond(ZoneOffset.UTC)
+    val endSec = end.toEpochSecond(ZoneOffset.UTC)
+    spark.range(0, cardinality, 1, 1)
+      .select((($"id" % (endSec - startSec)) + startSec).as("seconds"))
+      .select($"seconds".cast("timestamp").as("ts"))
+  }
+
+  private def genTsAfter1582(cardinality: Int): DataFrame = {
+    val start = LocalDateTime.of(1582, 10, 15, 0, 0, 0)
+    val end = LocalDateTime.of(3000, 1, 1, 0, 0, 0)
+    genTs(cardinality, start, end)
+  }
+
+  private def genTsBefore1582(cardinality: Int): DataFrame = {
+    val start = LocalDateTime.of(10, 1, 1, 0, 0, 0)
+    val end = LocalDateTime.of(1580, 1, 1, 0, 0, 0)
+    genTs(cardinality, start, end)
+  }
+
+  private def genDate(cardinality: Int, start: LocalDate, end: LocalDate): DataFrame = {
+    val startSec = LocalDateTime.of(start, LocalTime.MIDNIGHT).toEpochSecond(ZoneOffset.UTC)
+    val endSec = LocalDateTime.of(end, LocalTime.MIDNIGHT).toEpochSecond(ZoneOffset.UTC)
+    spark.range(0, cardinality * SECONDS_PER_DAY, SECONDS_PER_DAY, 1)
+      .select((($"id" % (endSec - startSec)) + startSec).as("seconds"))
+      .select($"seconds".cast("timestamp").as("ts"))
+      .select($"ts".cast("date").as("date"))
+  }
+
+  private def genDateAfter1582(cardinality: Int): DataFrame = {
+    val start = LocalDate.of(1582, 10, 15)
+    val end = LocalDate.of(3000, 1, 1)
+    genDate(cardinality, start, end)
+  }
+
+  private def genDateBefore1582(cardinality: Int): DataFrame = {
+    val start = LocalDate.of(10, 1, 1)
+    val end = LocalDate.of(1580, 1, 1)
+    genDate(cardinality, start, end)
+  }
+
+  private def genDF(cardinality: Int, dateTime: String, after1582: Boolean): DataFrame = {
+    (dateTime, after1582) match {
+      case ("date", true) => genDateAfter1582(cardinality)
+      case ("date", false) => genDateBefore1582(cardinality)
+      case ("timestamp", true) => genTsAfter1582(cardinality)
+      case ("timestamp", false) => genTsBefore1582(cardinality)
+      case _ => throw new IllegalArgumentException(
+        s"cardinality = $cardinality dateTime = $dateTime after1582 = $after1582")
+    }
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    withTempPath { path =>
+      runBenchmark("Rebasing dates/timestamps in Parquet datasource") {
+        val rowsNum = 100000000
+        Seq("date", "timestamp").foreach { dateTime =>
+          val benchmark = new Benchmark(s"Save ${dateTime}s to parquet", rowsNum, output = output)
+          benchmark.addCase("after 1582, noop", 1) { _ =>
+            genDF(rowsNum, dateTime, after1582 = true).noop()
+          }
+          benchmark.addCase("before 1582, noop", 1) { _ =>
+            genDF(rowsNum, dateTime, after1582 = false).noop()
+          }
+
+          def save(after1582: Boolean, rebase: Boolean): Unit = {
+            val period = if (after1582) "after" else "before"
+            val rebaseFlag = if (rebase) "on" else "off"
+            val caseName = s"$period 1582, rebase $rebaseFlag"
+            benchmark.addCase(caseName, 1) { _ =>
+              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> rebase.toString) {
+                val df = genDF(rowsNum, dateTime, after1582)
+                val pathToWrite = path.getAbsolutePath + s"/${dateTime}_${period}_1582_$rebaseFlag"
+                df.write
+                  .mode("overwrite")
+                  .format("parquet")
+                  .save(pathToWrite)
+              }
+            }
+          }
+
+          Seq(true, false).foreach { after1582 =>
+            Seq(false, true).foreach { rebase =>
+              save(after1582, rebase)
+            }
+          }
+          benchmark.run()
+
+          val benchmark2 = new Benchmark(
+            s"Load ${dateTime}s from parquet", rowsNum, output = output)
+
+          def load(after1582: Boolean, vec: Boolean, rebase: Boolean): Unit = {
+            val period = if (after1582) "after" else "before"
+            val rebaseFlag = if (rebase) "on" else "off"
+            val vecFlag = if (vec) "on" else "off"
+            val caseName = s"$period 1582, vec $vecFlag, rebase $rebaseFlag"
+            benchmark2.addCase(caseName, 3) { _ =>
+              withSQLConf(
+                SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString,
+                SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> rebase.toString) {
+                val pathToRead = path.getAbsolutePath + s"/${dateTime}_${period}_1582_$rebaseFlag"
+                spark.read.format("parquet").load(pathToRead).noop()
+              }
+            }
+          }
+
+          Seq(true, false).foreach { after1582 =>
+            Seq(false, true).foreach { vec =>
+              Seq(false, true).foreach { rebase =>
+                load(after1582, vec, rebase)
+              }
+            }
+          }
+
+          benchmark2.run()
+        }
+      }
+    }
+  }
+}

