Posted to commits@spark.apache.org by we...@apache.org on 2020/03/30 08:53:39 UTC
[spark] branch branch-3.0 updated: [SPARK-31296][SQL][TESTS]
Benchmark date-time rebasing in Parquet datasource
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3ab2f88 [SPARK-31296][SQL][TESTS] Benchmark date-time rebasing in Parquet datasource
3ab2f88 is described below
commit 3ab2f8877e4618031b3258cb90eb249145077e08
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Mon Mar 30 16:46:31 2020 +0800
[SPARK-31296][SQL][TESTS] Benchmark date-time rebasing in Parquet datasource
### What changes were proposed in this pull request?
In the PR, I propose to add a new benchmark `DateTimeRebaseBenchmark` which measures the performance of rebasing dates/timestamps between the hybrid calendar (Julian + Gregorian) and the Proleptic Gregorian calendar:
1. In write, it separately saves dates and timestamps before and after the year 1582, w/ and w/o rebasing.
2. In read, it loads the previously saved parquet files with the vectorized reader and with the regular reader.
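For context, the two calendars disagree on what a given local date means for dates before the Gregorian cutover (1582-10-15), which is why stored epoch days/microseconds must be rebased at the read/write boundary. A minimal standalone illustration of that disagreement, using only JDK classes (no Spark; the class name `RebaseDemo` is just for this sketch):

```java
import java.time.LocalDate;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class RebaseDemo {
    public static void main(String[] args) {
        // Proleptic Gregorian calendar (java.time, used internally by Spark 3.0):
        // Gregorian leap-year rules are extended backwards before 1582.
        long proleptic = LocalDate.of(1001, 1, 1).toEpochDay();

        // Hybrid Julian + Gregorian calendar (java.util, used by Spark 2.4):
        // dates before the 1582-10-15 cutover follow the Julian calendar.
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();
        cal.set(1001, 0, 1); // month is 0-based in java.util.Calendar
        long hybrid = Math.floorDiv(cal.getTimeInMillis(), 24L * 60 * 60 * 1000);

        // The same local date maps to different epoch days in the two calendars,
        // so values written under one calendar must be rebased before being
        // interpreted under the other.
        System.out.println("proleptic=" + proleptic + " hybrid=" + hybrid
            + " diff=" + (hybrid - proleptic) + " days");
    }
}
```

The per-value calendar arithmetic this implies is what the benchmark prices for the write and read paths below.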
Here is a summary of the benchmarking (rebasing **on** relative to rebasing **off**):
- Saving timestamps is **~6 times slower**
- Loading timestamps w/ vectorized **off** is **~4 times slower**
- Loading timestamps w/ vectorized **on** is **~10 times slower**
### Why are the changes needed?
To know the impact of date-time rebasing introduced by #27915, #27953, #27807.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Run the `DateTimeRebaseBenchmark` benchmark using Amazon EC2:
| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK8/11 |
Closes #28057 from MaxGekk/rebase-bechmark.
Lead-authored-by: Maxim Gekk <ma...@gmail.com>
Co-authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit a1dbcd13a3eeaee50cc1a46e909f9478d6d55177)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../DateTimeRebaseBenchmark-jdk11-results.txt | 53 +++++++
.../benchmarks/DateTimeRebaseBenchmark-results.txt | 53 +++++++
.../benchmark/DateTimeRebaseBenchmark.scala | 161 +++++++++++++++++++++
3 files changed, 267 insertions(+)
diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt
new file mode 100644
index 0000000..52522f8
--- /dev/null
+++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt
@@ -0,0 +1,53 @@
+================================================================================================
+Rebasing dates/timestamps in Parquet datasource
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, noop 9272 9272 0 10.8 92.7 1.0X
+before 1582, noop 9142 9142 0 10.9 91.4 1.0X
+after 1582, rebase off 21841 21841 0 4.6 218.4 0.4X
+after 1582, rebase on 58245 58245 0 1.7 582.4 0.2X
+before 1582, rebase off 19813 19813 0 5.0 198.1 0.5X
+before 1582, rebase on 63737 63737 0 1.6 637.4 0.1X
+
+OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, vec off, rebase off 13004 13063 67 7.7 130.0 1.0X
+after 1582, vec off, rebase on 36224 36253 26 2.8 362.2 0.4X
+after 1582, vec on, rebase off 3596 3654 54 27.8 36.0 3.6X
+after 1582, vec on, rebase on 26144 26253 112 3.8 261.4 0.5X
+before 1582, vec off, rebase off 12872 12914 51 7.8 128.7 1.0X
+before 1582, vec off, rebase on 37762 37904 153 2.6 377.6 0.3X
+before 1582, vec on, rebase off 3522 3592 94 28.4 35.2 3.7X
+before 1582, vec on, rebase on 27580 27615 59 3.6 275.8 0.5X
+
+OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, noop 3113 3113 0 32.1 31.1 1.0X
+before 1582, noop 3078 3078 0 32.5 30.8 1.0X
+after 1582, rebase off 15749 15749 0 6.3 157.5 0.2X
+after 1582, rebase on 69106 69106 0 1.4 691.1 0.0X
+before 1582, rebase off 15967 15967 0 6.3 159.7 0.2X
+before 1582, rebase on 76843 76843 0 1.3 768.4 0.0X
+
+OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, vec off, rebase off 15070 15172 94 6.6 150.7 1.0X
+after 1582, vec off, rebase on 43748 43867 157 2.3 437.5 0.3X
+after 1582, vec on, rebase off 4805 4859 60 20.8 48.1 3.1X
+after 1582, vec on, rebase on 33960 34027 61 2.9 339.6 0.4X
+before 1582, vec off, rebase off 15037 15071 52 6.7 150.4 1.0X
+before 1582, vec off, rebase on 44590 44749 156 2.2 445.9 0.3X
+before 1582, vec on, rebase off 4831 4852 30 20.7 48.3 3.1X
+before 1582, vec on, rebase on 35460 35481 18 2.8 354.6 0.4X
+
+
diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt
new file mode 100644
index 0000000..c9320cf
--- /dev/null
+++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt
@@ -0,0 +1,53 @@
+================================================================================================
+Rebasing dates/timestamps in Parquet datasource
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, noop 9472 9472 0 10.6 94.7 1.0X
+before 1582, noop 9226 9226 0 10.8 92.3 1.0X
+after 1582, rebase off 21201 21201 0 4.7 212.0 0.4X
+after 1582, rebase on 56471 56471 0 1.8 564.7 0.2X
+before 1582, rebase off 20179 20179 0 5.0 201.8 0.5X
+before 1582, rebase on 65717 65717 0 1.5 657.2 0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, vec off, rebase off 12294 12434 205 8.1 122.9 1.0X
+after 1582, vec off, rebase on 36959 36967 12 2.7 369.6 0.3X
+after 1582, vec on, rebase off 3644 3691 49 27.4 36.4 3.4X
+after 1582, vec on, rebase on 26764 26852 92 3.7 267.6 0.5X
+before 1582, vec off, rebase off 12830 12917 85 7.8 128.3 1.0X
+before 1582, vec off, rebase on 38897 39053 229 2.6 389.0 0.3X
+before 1582, vec on, rebase off 3638 3693 85 27.5 36.4 3.4X
+before 1582, vec on, rebase on 28956 29007 44 3.5 289.6 0.4X
+
+OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, noop 2952 2952 0 33.9 29.5 1.0X
+before 1582, noop 2880 2880 0 34.7 28.8 1.0X
+after 1582, rebase off 15928 15928 0 6.3 159.3 0.2X
+after 1582, rebase on 82816 82816 0 1.2 828.2 0.0X
+before 1582, rebase off 15988 15988 0 6.3 159.9 0.2X
+before 1582, rebase on 92636 92636 0 1.1 926.4 0.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1058-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+after 1582, vec off, rebase off 14863 14917 78 6.7 148.6 1.0X
+after 1582, vec off, rebase on 54819 54939 140 1.8 548.2 0.3X
+after 1582, vec on, rebase off 4905 4941 32 20.4 49.0 3.0X
+after 1582, vec on, rebase on 44914 45008 124 2.2 449.1 0.3X
+before 1582, vec off, rebase off 14928 14970 48 6.7 149.3 1.0X
+before 1582, vec off, rebase on 59752 59996 245 1.7 597.5 0.2X
+before 1582, vec on, rebase off 4892 4916 33 20.4 48.9 3.0X
+before 1582, vec on, rebase on 46854 46977 198 2.1 468.5 0.3X
+
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
new file mode 100644
index 0000000..983d9b4
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneOffset}
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.SECONDS_PER_DAY
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Synthetic benchmark for rebasing of date and timestamp in read/write.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar>
+ * 2. build/sbt "sql/test:runMain <this class>"
+ * 3. generate result:
+ * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ * Results will be written to "benchmarks/DateTimeRebaseBenchmark-results.txt".
+ * }}}
+ */
+object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
+ import spark.implicits._
+
+ private def genTs(cardinality: Int, start: LocalDateTime, end: LocalDateTime): DataFrame = {
+ val startSec = start.toEpochSecond(ZoneOffset.UTC)
+ val endSec = end.toEpochSecond(ZoneOffset.UTC)
+ spark.range(0, cardinality, 1, 1)
+ .select((($"id" % (endSec - startSec)) + startSec).as("seconds"))
+ .select($"seconds".cast("timestamp").as("ts"))
+ }
+
+ private def genTsAfter1582(cardinality: Int): DataFrame = {
+ val start = LocalDateTime.of(1582, 10, 15, 0, 0, 0)
+ val end = LocalDateTime.of(3000, 1, 1, 0, 0, 0)
+ genTs(cardinality, start, end)
+ }
+
+ private def genTsBefore1582(cardinality: Int): DataFrame = {
+ val start = LocalDateTime.of(10, 1, 1, 0, 0, 0)
+ val end = LocalDateTime.of(1580, 1, 1, 0, 0, 0)
+ genTs(cardinality, start, end)
+ }
+
+ private def genDate(cardinality: Int, start: LocalDate, end: LocalDate): DataFrame = {
+ val startSec = LocalDateTime.of(start, LocalTime.MIDNIGHT).toEpochSecond(ZoneOffset.UTC)
+ val endSec = LocalDateTime.of(end, LocalTime.MIDNIGHT).toEpochSecond(ZoneOffset.UTC)
+ spark.range(0, cardinality * SECONDS_PER_DAY, SECONDS_PER_DAY, 1)
+ .select((($"id" % (endSec - startSec)) + startSec).as("seconds"))
+ .select($"seconds".cast("timestamp").as("ts"))
+ .select($"ts".cast("date").as("date"))
+ }
+
+ private def genDateAfter1582(cardinality: Int): DataFrame = {
+ val start = LocalDate.of(1582, 10, 15)
+ val end = LocalDate.of(3000, 1, 1)
+ genDate(cardinality, start, end)
+ }
+
+ private def genDateBefore1582(cardinality: Int): DataFrame = {
+ val start = LocalDate.of(10, 1, 1)
+ val end = LocalDate.of(1580, 1, 1)
+ genDate(cardinality, start, end)
+ }
+
+ private def genDF(cardinality: Int, dateTime: String, after1582: Boolean): DataFrame = {
+ (dateTime, after1582) match {
+ case ("date", true) => genDateAfter1582(cardinality)
+ case ("date", false) => genDateBefore1582(cardinality)
+ case ("timestamp", true) => genTsAfter1582(cardinality)
+ case ("timestamp", false) => genTsBefore1582(cardinality)
+ case _ => throw new IllegalArgumentException(
+ s"cardinality = $cardinality dateTime = $dateTime after1582 = $after1582")
+ }
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ withTempPath { path =>
+ runBenchmark("Rebasing dates/timestamps in Parquet datasource") {
+ val rowsNum = 100000000
+ Seq("date", "timestamp").foreach { dateTime =>
+ val benchmark = new Benchmark(s"Save ${dateTime}s to parquet", rowsNum, output = output)
+ benchmark.addCase("after 1582, noop", 1) { _ =>
+ genDF(rowsNum, dateTime, after1582 = true).noop()
+ }
+ benchmark.addCase("before 1582, noop", 1) { _ =>
+ genDF(rowsNum, dateTime, after1582 = false).noop()
+ }
+
+ def save(after1582: Boolean, rebase: Boolean): Unit = {
+ val period = if (after1582) "after" else "before"
+ val rebaseFlag = if (rebase) "on" else "off"
+ val caseName = s"$period 1582, rebase $rebaseFlag"
+ benchmark.addCase(caseName, 1) { _ =>
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> rebase.toString) {
+ val df = genDF(rowsNum, dateTime, after1582)
+ val pathToWrite = path.getAbsolutePath + s"/${dateTime}_${period}_1582_$rebaseFlag"
+ df.write
+ .mode("overwrite")
+ .format("parquet")
+ .save(pathToWrite)
+ }
+ }
+ }
+
+ Seq(true, false).foreach { after1582 =>
+ Seq(false, true).foreach { rebase =>
+ save(after1582, rebase)
+ }
+ }
+ benchmark.run()
+
+ val benchmark2 = new Benchmark(
+ s"Load ${dateTime}s from parquet", rowsNum, output = output)
+
+ def load(after1582: Boolean, vec: Boolean, rebase: Boolean): Unit = {
+ val period = if (after1582) "after" else "before"
+ val rebaseFlag = if (rebase) "on" else "off"
+ val vecFlag = if (vec) "on" else "off"
+ val caseName = s"$period 1582, vec $vecFlag, rebase $rebaseFlag"
+ benchmark2.addCase(caseName, 3) { _ =>
+ withSQLConf(
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString,
+ SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> rebase.toString) {
+ val pathToRead = path.getAbsolutePath + s"/${dateTime}_${period}_1582_$rebaseFlag"
+ spark.read.format("parquet").load(pathToRead).noop()
+ }
+ }
+ }
+
+ Seq(true, false).foreach { after1582 =>
+ Seq(false, true).foreach { vec =>
+ Seq(false, true).foreach { rebase =>
+ load(after1582, vec, rebase)
+ }
+ }
+ }
+
+ benchmark2.run()
+ }
+ }
+ }
+ }
+}