You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/01/30 07:15:44 UTC
[spark] branch master updated: [SPARK-26378][SQL] Restore
performance of queries against wide CSV/JSON tables
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7781c6f [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables
7781c6f is described below
commit 7781c6fd7334979b6b4222d2271765219593f08e
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Wed Jan 30 15:15:29 2019 +0800
[SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables
## What changes were proposed in this pull request?
After [recent changes](https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2) to CSV parsing to return partial results for bad CSV records, queries of wide CSV tables slowed considerably. That recent change resulted in every row being recreated, even when the associated input record had no parsing issues and the user specified no corrupt record field in their schema.
The change to FailureSafeParser.scala impacted queries against wide JSON tables as well.
In this PR, I propose that a row should be recreated only if columns need to be shifted due to the existence of a corrupt column field in the user-supplied schema. Otherwise, the code should use the row as-is (For CSV input, it will have values for the columns that could be converted, and also null values for columns that could not be converted).
See benchmarks below. The CSV benchmark for 1000 columns went from 120144 ms to 89069 ms, a savings of 25% (this only brings the cost down to baseline levels. Again, see benchmarks below).
Similarly, the JSON benchmark for 1000 columns (added in this PR) went from 109621 ms to 80871 ms, also a savings of 25%.
Still, partial results functionality is preserved:
<pre>
bash-3.2$ cat test2.csv
"hello",1999-08-01,"last"
"there","bad date","field"
"again","2017-11-22","in file"
bash-3.2$ bin/spark-shell
...etc...
scala> val df = spark.read.schema("a string, b date, c string").csv("test2.csv")
df: org.apache.spark.sql.DataFrame = [a: string, b: date ... 1 more field]
scala> df.show
+-----+----------+-------+
| a| b| c|
+-----+----------+-------+
|hello|1999-08-01| last|
|there| null| field|
|again|2017-11-22|in file|
+-----+----------+-------+
scala> val df = spark.read.schema("badRecord string, a string, b date, c string").
| option("columnNameOfCorruptRecord", "badRecord").
| csv("test2.csv")
df: org.apache.spark.sql.DataFrame = [badRecord: string, a: string ... 2 more fields]
scala> df.show
+--------------------+-----+----------+-------+
| badRecord| a| b| c|
+--------------------+-----+----------+-------+
| null|hello|1999-08-01| last|
|"there","bad date...|there| null| field|
| null|again|2017-11-22|in file|
+--------------------+-----+----------+-------+
scala>
</pre>
### CSVBenchmark Benchmarks:
baseline = commit before partial results change
PR = this PR
master = master branch
[baseline_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697109/baseline_CSVBenchmark-results.txt)
[pr_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697110/pr_CSVBenchmark-results.txt)
[master_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697111/master_CSVBenchmark-results.txt)
### JSONBenchmark Benchmarks:
baseline = commit before partial results change
PR = this PR
master = master branch
[baseline_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711040/baseline_JSONBenchmark-results.txt)
[pr_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711041/pr_JSONBenchmark-results.txt)
[master_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711042/master_JSONBenchmark-results.txt)
## How was this patch tested?
- All SQL unit tests.
- Added 2 CSV benchmarks
- Python core and SQL tests
Closes #23336 from bersprockets/csv-wide-row-opt2.
Authored-by: Bruce Robbins <be...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../sql/catalyst/util/FailureSafeParser.scala | 21 +++++----
sql/core/benchmarks/CSVBenchmark-results.txt | 31 +++++++------
sql/core/benchmarks/JSONBenchmark-results.txt | 54 +++++++++++++---------
.../execution/datasources/csv/CSVBenchmark.scala | 16 +++++++
.../execution/datasources/json/JsonBenchmark.scala | 48 ++++++++++++++++++-
5 files changed, 124 insertions(+), 46 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 4baf052..76745b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -33,21 +33,26 @@ class FailureSafeParser[IN](
private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
private val resultRow = new GenericInternalRow(schema.length)
+ private val nullResult = new GenericInternalRow(schema.length)
// This function takes 2 parameters: an optional partial result, and the bad record. If the given
// schema doesn't contain a field for corrupted record, we just return the partial result or a
// row with all fields null. If the given schema contains a field for corrupted record, we will
// set the bad record to this field, and set other fields according to the partial result or null.
private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
- (row, badRecord) => {
- var i = 0
- while (i < actualSchema.length) {
- val from = actualSchema(i)
- resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
- i += 1
+ if (corruptFieldIndex.isDefined) {
+ (row, badRecord) => {
+ var i = 0
+ while (i < actualSchema.length) {
+ val from = actualSchema(i)
+ resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
+ i += 1
+ }
+ resultRow(corruptFieldIndex.get) = badRecord()
+ resultRow
}
- corruptFieldIndex.foreach(index => resultRow(index) = badRecord())
- resultRow
+ } else {
+ (row, _) => row.getOrElse(nullResult)
}
}
diff --git a/sql/core/benchmarks/CSVBenchmark-results.txt b/sql/core/benchmarks/CSVBenchmark-results.txt
index 865575b..4fef15b 100644
--- a/sql/core/benchmarks/CSVBenchmark-results.txt
+++ b/sql/core/benchmarks/CSVBenchmark-results.txt
@@ -2,26 +2,29 @@
Benchmark to measure CSV read/write performance
================================================================================================
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
Parsing quoted values: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-One quoted string 64733 / 64839 0.0 1294653.1 1.0X
+One quoted string 49754 / 50158 0.0 995072.2 1.0X
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Select 1000 columns 185609 / 189735 0.0 185608.6 1.0X
-Select 100 columns 50195 / 51808 0.0 50194.8 3.7X
-Select one column 39266 / 39293 0.0 39265.6 4.7X
-count() 10959 / 11000 0.1 10958.5 16.9X
+Select 1000 columns 149402 / 151785 0.0 149401.9 1.0X
+Select 100 columns 42986 / 43985 0.0 42986.1 3.5X
+Select one column 33764 / 34057 0.0 33763.6 4.4X
+count() 9332 / 9508 0.1 9332.2 16.0X
+Select 100 columns, one bad input field 50963 / 51512 0.0 50962.5 2.9X
+Select 100 columns, corrupt record field 69627 / 71029 0.0 69627.5 2.1X
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Select 10 columns + count() 24637 / 24768 0.4 2463.7 1.0X
-Select 1 column + count() 20026 / 20076 0.5 2002.6 1.2X
-count() 3754 / 3877 2.7 375.4 6.6X
+Select 10 columns + count() 22588 / 22623 0.4 2258.8 1.0X
+Select 1 column + count() 14649 / 14690 0.7 1464.9 1.5X
+count() 3385 / 3453 3.0 338.5 6.7X
+
diff --git a/sql/core/benchmarks/JSONBenchmark-results.txt b/sql/core/benchmarks/JSONBenchmark-results.txt
index 4774294..947f57d 100644
--- a/sql/core/benchmarks/JSONBenchmark-results.txt
+++ b/sql/core/benchmarks/JSONBenchmark-results.txt
@@ -3,46 +3,54 @@ Benchmark for performance of JSON parsing
================================================================================================
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
JSON schema inferring: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-No encoding 71832 / 72149 1.4 718.3 1.0X
-UTF-8 is set 101700 / 101819 1.0 1017.0 0.7X
+No encoding 80821 / 82526 1.2 808.2 1.0X
+UTF-8 is set 129478 / 130381 0.8 1294.8 0.6X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
count a short column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-No encoding 16501 / 16519 6.1 165.0 1.0X
-UTF-8 is set 16477 / 16516 6.1 164.8 1.0X
+No encoding 16804 / 16948 6.0 168.0 1.0X
+UTF-8 is set 16648 / 16757 6.0 166.5 1.0X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
count a wide column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-No encoding 39871 / 40242 0.3 3987.1 1.0X
-UTF-8 is set 39581 / 39721 0.3 3958.1 1.0X
+No encoding 30949 / 31058 0.3 3094.9 1.0X
+UTF-8 is set 30629 / 33896 0.3 3062.9 1.0X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
+select wide row: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------
+No encoding 123050 / 124199 0.0 246099.8 1.0X
+UTF-8 is set 139306 / 142569 0.0 278612.7 0.9X
+
+Preparing data for benchmarking ...
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
Select a subset of 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Select 10 columns + count() 16011 / 16033 0.6 1601.1 1.0X
-Select 1 column + count() 14350 / 14392 0.7 1435.0 1.1X
-count() 3007 / 3034 3.3 300.7 5.3X
+Select 10 columns + count() 19539 / 19896 0.5 1953.9 1.0X
+Select 1 column + count() 16412 / 16445 0.6 1641.2 1.2X
+count() 2783 / 2801 3.6 278.3 7.0X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
creation of JSON parser per line: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
-Short column without encoding 8334 / 8453 1.2 833.4 1.0X
-Short column with UTF-8 13627 / 13784 0.7 1362.7 0.6X
-Wide column without encoding 155073 / 155351 0.1 15507.3 0.1X
-Wide column with UTF-8 212114 / 212263 0.0 21211.4 0.0X
+Short column without encoding 9576 / 9612 1.0 957.6 1.0X
+Short column with UTF-8 13555 / 13698 0.7 1355.5 0.7X
+Wide column without encoding 174761 / 175665 0.1 17476.1 0.1X
+Wide column with UTF-8 203219 / 205151 0.0 20321.9 0.0X
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
index ce38b08..6e6fc47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
@@ -92,6 +92,22 @@ object CSVBenchmark extends SqlBasedBenchmark {
ds.count()
}
+ val schemaErr1 = StructType(StructField("col0", DateType) +:
+ (1 until colsNum).map(i => StructField(s"col$i", IntegerType)))
+ val dsErr1 = spark.read.schema(schemaErr1).csv(path.getAbsolutePath)
+ benchmark.addCase(s"Select 100 columns, one bad input field", 3) { _ =>
+ dsErr1.select(cols100: _*).filter((row: Row) => true).count()
+ }
+
+ val badRecColName = "badRecord"
+ val schemaErr2 = schemaErr1.add(StructField(badRecColName, StringType))
+ val dsErr2 = spark.read.schema(schemaErr2)
+ .option("columnNameOfCorruptRecord", badRecColName)
+ .csv(path.getAbsolutePath)
+ benchmark.addCase(s"Select 100 columns, corrupt record field", 3) { _ =>
+ dsErr2.select((Column(badRecColName) +: cols100): _*).filter((row: Row) => true).count()
+ }
+
benchmark.run()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index f50c25e..27f7023 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -59,12 +59,15 @@ object JSONBenchmark extends SqlBasedBenchmark {
.json(path.getAbsolutePath)
benchmark.addCase("No encoding", numIters) { _ =>
- spark.read.json(path.getAbsolutePath)
+ spark.read
+ .option("inferTimestamp", false)
+ .json(path.getAbsolutePath)
}
benchmark.addCase("UTF-8 is set", numIters) { _ =>
spark.read
.option("encoding", "UTF-8")
+ .option("inferTimestamp", false)
.json(path.getAbsolutePath)
}
@@ -121,6 +124,18 @@ object JSONBenchmark extends SqlBasedBenchmark {
.add("z", StringType)
}
+ def writeWideRow(path: String, rowsNum: Int): StructType = {
+ val colsNum = 1000
+ val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+ val schema = StructType(fields)
+
+ spark.range(rowsNum)
+ .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+ .write.json(path)
+
+ schema
+ }
+
def countWideColumn(rowsNum: Int, numIters: Int): Unit = {
val benchmark = new Benchmark("count a wide column", rowsNum, output = output)
@@ -147,6 +162,36 @@ object JSONBenchmark extends SqlBasedBenchmark {
}
}
+ def countWideRow(rowsNum: Int, numIters: Int): Unit = {
+ val benchmark = new Benchmark("select wide row", rowsNum, output = output)
+
+ withTempPath { path =>
+ prepareDataInfo(benchmark)
+ val schema = writeWideRow(path.getAbsolutePath, rowsNum)
+
+ benchmark.addCase("No encoding", numIters) { _ =>
+ spark.read
+ .schema(schema)
+ .json(path.getAbsolutePath)
+ .select("*")
+ .filter((row: Row) => true)
+ .count()
+ }
+
+ benchmark.addCase("UTF-8 is set", numIters) { _ =>
+ spark.read
+ .option("encoding", "UTF-8")
+ .schema(schema)
+ .json(path.getAbsolutePath)
+ .select("*")
+ .filter((row: Row) => true)
+ .count()
+ }
+
+ benchmark.run()
+ }
+ }
+
def selectSubsetOfColumns(rowsNum: Int, numIters: Int): Unit = {
val colsNum = 10
val benchmark =
@@ -236,6 +281,7 @@ object JSONBenchmark extends SqlBasedBenchmark {
schemaInferring(100 * 1000 * 1000, numIters)
countShortColumn(100 * 1000 * 1000, numIters)
countWideColumn(10 * 1000 * 1000, numIters)
+ countWideRow(500 * 1000, numIters)
selectSubsetOfColumns(10 * 1000 * 1000, numIters)
jsonParserCreation(10 * 1000 * 1000, numIters)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org