Posted to reviews@spark.apache.org by maropu <gi...@git.apache.org> on 2018/05/29 03:26:23 UTC
[GitHub] spark pull request #21409: [SPARK-24365][SQL] Add Data Source write benchmark
Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21409#discussion_r191297180
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark to measure data source write performance.
+ * By default it measures the write performance of 4 data source formats: Parquet, ORC, JSON, CSV:
+ *   spark-submit --class <this class> <spark sql test jar>
+ * To measure only specific formats, run it with arguments:
+ *   spark-submit --class <this class> <spark sql test jar> format1 [format2] [...]
+ */
+object DataSourceWriteBenchmark {
+  val conf = new SparkConf()
+    .setAppName("DataSourceWriteBenchmark")
+    .setIfMissing("spark.master", "local[1]")
+    .set("spark.sql.parquet.compression.codec", "snappy")
+    .set("spark.sql.orc.compression.codec", "snappy")
+
+  val spark = SparkSession.builder.config(conf).getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+
+  val tempTable = "temp"
+  val numRows = 1024 * 1024 * 15
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally {
+      tableNames.foreach { name =>
+        spark.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+  }
+
+  def writeInt(table: String, format: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"CREATE TABLE $table(c1 INT) USING $format")
+    benchmark.addCase("Output Single Int Column") { _ =>
+      spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS c1 FROM $tempTable")
+    }
+  }
+
+  def writeIntString(table: String, format: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"CREATE TABLE $table(c1 INT, c2 STRING) USING $format")
+    benchmark.addCase("Output Int and String Column") { _ =>
+      spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS c1, " +
+        s"CAST(id AS STRING) AS c2 FROM $tempTable")
+    }
+  }
+
+  def writePartition(table: String, format: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"CREATE TABLE $table(p INT, id INT) USING $format PARTITIONED BY (p)")
+    benchmark.addCase("Output Partitions") { _ =>
+      spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS id, " +
+        s"CAST(id % 2 AS INT) AS p FROM $tempTable")
+    }
+  }
+
+  def writeBucket(table: String, format: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"CREATE TABLE $table(c1 INT, c2 INT) USING $format " +
+      "CLUSTERED BY (c2) INTO 2 BUCKETS")
+    benchmark.addCase("Output Buckets") { _ =>
+      spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS c1, " +
+        s"CAST(id AS INT) AS c2 FROM $tempTable")
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    val tableInt = "tableInt"
+    val tableIntString = "tableIntString"
+    val tablePartition = "tablePartition"
+    val tableBucket = "tableBucket"
+    // If no formats are given as arguments, benchmark all of the default formats.
+    val formats: Seq[String] = if (args.isEmpty) {
+      Seq("Parquet", "ORC", "JSON", "CSV")
+    } else {
+      args
+    }
+    /*
+    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
+    Parquet writer benchmark:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    Output Single Int Column                      6054 / 6070          2.6         384.9       1.0X
+    Output Int and String Column                  5784 / 5800          2.7         367.8       1.0X
+    Output Partitions                             3891 / 3904          4.0         247.4       1.6X
+    Output Buckets                                5446 / 5729          2.9         346.2       1.1X
+
--- End diff --
To make the results easier to compare, how about benchmarking all the formats for each case? e.g.,
```
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
Output Single Int Column Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
CSV ...
JSON ...
Parquet ...
ORC ...
```
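For instance, a rough sketch of that restructuring (not code from this PR — it reuses the `spark`, `numRows`, `tempTable`, and `formats` values defined in the file above, and the per-format table names are made up):
```
// Sketch: build one Benchmark per case and register one sub-case per format,
// so Benchmark.run() prints a single table that compares the formats.
val benchmark = new Benchmark("Output Single Int Column", numRows)
formats.foreach { format =>
  val table = s"tableInt_$format"  // hypothetical per-format table name
  spark.sql(s"CREATE TABLE $table(c1 INT) USING $format")
  benchmark.addCase(format) { _ =>
    spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS c1 FROM $tempTable")
  }
}
benchmark.run()
```
Since `Benchmark.run()` uses the benchmark's name as the table heading and each `addCase` name as a row label, this would print one table per case in the shape shown above.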
---