You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/01/30 07:15:44 UTC

[spark] branch master updated: [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7781c6f  [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables
7781c6f is described below

commit 7781c6fd7334979b6b4222d2271765219593f08e
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Wed Jan 30 15:15:29 2019 +0800

    [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables
    
    ## What changes were proposed in this pull request?
    
    After [recent changes](https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2) to CSV parsing to return partial results for bad CSV records, queries of wide CSV tables slowed considerably. That recent change resulted in every row being recreated, even when the associated input record had no parsing issues and the user specified no corrupt record field in his/her schema.
    
    The change to FailureSafeParser.scala also impacted queries against wide JSON tables as well.
    
    In this PR, I propose that a row should be recreated only if columns need to be shifted due to the existence of a corrupt column field in the user-supplied schema. Otherwise, the code should use the row as-is (For CSV input, it will have values for the columns that could be converted, and also null values for columns that could not be converted).
    
    See benchmarks below. The CSV benchmark for 1000 columns went from 120144 ms to 89069 ms, a savings of 25% (this only brings the cost down to baseline levels. Again, see benchmarks below).
    
    Similarly, the JSON benchmark for 1000 columns (added in this PR) went from 109621 ms to 80871 ms, also a savings of 25%.
    
    Still, partial results functionality is preserved:
    
    <pre>
    bash-3.2$ cat test2.csv
    "hello",1999-08-01,"last"
    "there","bad date","field"
    "again","2017-11-22","in file"
    bash-3.2$ bin/spark-shell
    ...etc...
    scala> val df = spark.read.schema("a string, b date, c string").csv("test2.csv")
    df: org.apache.spark.sql.DataFrame = [a: string, b: date ... 1 more field]
    scala> df.show
    +-----+----------+-------+
    |    a|         b|      c|
    +-----+----------+-------+
    |hello|1999-08-01|   last|
    |there|      null|  field|
    |again|2017-11-22|in file|
    +-----+----------+-------+
    scala> val df = spark.read.schema("badRecord string, a string, b date, c string").
         | option("columnNameOfCorruptRecord", "badRecord").
         | csv("test2.csv")
    df: org.apache.spark.sql.DataFrame = [badRecord: string, a: string ... 2 more fields]
    scala> df.show
    +--------------------+-----+----------+-------+
    |           badRecord|    a|         b|      c|
    +--------------------+-----+----------+-------+
    |                null|hello|1999-08-01|   last|
    |"there","bad date...|there|      null|  field|
    |                null|again|2017-11-22|in file|
    +--------------------+-----+----------+-------+
    scala>
    </pre>
    
    ### CSVBenchmark Benchmarks:
    
    baseline = commit before partial results change
    PR = this PR
    master = master branch
    
    [baseline_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697109/baseline_CSVBenchmark-results.txt)
    [pr_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697110/pr_CSVBenchmark-results.txt)
    [master_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697111/master_CSVBenchmark-results.txt)
    
    ### JSONBenchmark Benchmarks:
    
    baseline = commit before partial results change
    PR = this PR
    master = master branch
    
    [baseline_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711040/baseline_JSONBenchmark-results.txt)
    [pr_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711041/pr_JSONBenchmark-results.txt)
    [master_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711042/master_JSONBenchmark-results.txt)
    
    ## How was this patch tested?
    
    - All SQL unit tests.
    - Added 2 CSV benchmarks
    - Python core and SQL tests
    
    Closes #23336 from bersprockets/csv-wide-row-opt2.
    
    Authored-by: Bruce Robbins <be...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../sql/catalyst/util/FailureSafeParser.scala      | 21 +++++----
 sql/core/benchmarks/CSVBenchmark-results.txt       | 31 +++++++------
 sql/core/benchmarks/JSONBenchmark-results.txt      | 54 +++++++++++++---------
 .../execution/datasources/csv/CSVBenchmark.scala   | 16 +++++++
 .../execution/datasources/json/JsonBenchmark.scala | 48 ++++++++++++++++++-
 5 files changed, 124 insertions(+), 46 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 4baf052..76745b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -33,21 +33,26 @@ class FailureSafeParser[IN](
   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
   private val resultRow = new GenericInternalRow(schema.length)
+  private val nullResult = new GenericInternalRow(schema.length)
 
   // This function takes 2 parameters: an optional partial result, and the bad record. If the given
   // schema doesn't contain a field for corrupted record, we just return the partial result or a
   // row with all fields null. If the given schema contains a field for corrupted record, we will
   // set the bad record to this field, and set other fields according to the partial result or null.
   private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
-    (row, badRecord) => {
-      var i = 0
-      while (i < actualSchema.length) {
-        val from = actualSchema(i)
-        resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
-        i += 1
+    if (corruptFieldIndex.isDefined) {
+      (row, badRecord) => {
+        var i = 0
+        while (i < actualSchema.length) {
+          val from = actualSchema(i)
+          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
+          i += 1
+        }
+        resultRow(corruptFieldIndex.get) = badRecord()
+        resultRow
       }
-      corruptFieldIndex.foreach(index => resultRow(index) = badRecord())
-      resultRow
+    } else {
+      (row, _) => row.getOrElse(nullResult)
     }
   }
 
diff --git a/sql/core/benchmarks/CSVBenchmark-results.txt b/sql/core/benchmarks/CSVBenchmark-results.txt
index 865575b..4fef15b 100644
--- a/sql/core/benchmarks/CSVBenchmark-results.txt
+++ b/sql/core/benchmarks/CSVBenchmark-results.txt
@@ -2,26 +2,29 @@
 Benchmark to measure CSV read/write performance
 ================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
 Parsing quoted values:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-One quoted string                           64733 / 64839          0.0     1294653.1       1.0X
+One quoted string                           49754 / 50158          0.0      995072.2       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
 Wide rows with 1000 columns:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Select 1000 columns                       185609 / 189735          0.0      185608.6       1.0X
-Select 100 columns                          50195 / 51808          0.0       50194.8       3.7X
-Select one column                           39266 / 39293          0.0       39265.6       4.7X
-count()                                     10959 / 11000          0.1       10958.5      16.9X
+Select 1000 columns                       149402 / 151785          0.0      149401.9       1.0X
+Select 100 columns                          42986 / 43985          0.0       42986.1       3.5X
+Select one column                           33764 / 34057          0.0       33763.6       4.4X
+count()                                       9332 / 9508          0.1        9332.2      16.0X
+Select 100 columns, one bad input field     50963 / 51512          0.0       50962.5       2.9X
+Select 100 columns, corrupt record field    69627 / 71029          0.0       69627.5       2.1X
 
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
 Count a dataset with 10 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Select 10 columns + count()                 24637 / 24768          0.4        2463.7       1.0X
-Select 1 column + count()                   20026 / 20076          0.5        2002.6       1.2X
-count()                                       3754 / 3877          2.7         375.4       6.6X
+Select 10 columns + count()                 22588 / 22623          0.4        2258.8       1.0X
+Select 1 column + count()                   14649 / 14690          0.7        1464.9       1.5X
+count()                                       3385 / 3453          3.0         338.5       6.7X
+
 
diff --git a/sql/core/benchmarks/JSONBenchmark-results.txt b/sql/core/benchmarks/JSONBenchmark-results.txt
index 4774294..947f57d 100644
--- a/sql/core/benchmarks/JSONBenchmark-results.txt
+++ b/sql/core/benchmarks/JSONBenchmark-results.txt
@@ -3,46 +3,54 @@ Benchmark for performance of JSON parsing
 ================================================================================================
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
 JSON schema inferring:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 71832 / 72149          1.4         718.3       1.0X
-UTF-8 is set                              101700 / 101819          1.0        1017.0       0.7X
+No encoding                                 80821 / 82526          1.2         808.2       1.0X
+UTF-8 is set                              129478 / 130381          0.8        1294.8       0.6X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
 count a short column:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 16501 / 16519          6.1         165.0       1.0X
-UTF-8 is set                                16477 / 16516          6.1         164.8       1.0X
+No encoding                                 16804 / 16948          6.0         168.0       1.0X
+UTF-8 is set                                16648 / 16757          6.0         166.5       1.0X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
 count a wide column:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 39871 / 40242          0.3        3987.1       1.0X
-UTF-8 is set                                39581 / 39721          0.3        3958.1       1.0X
+No encoding                                 30949 / 31058          0.3        3094.9       1.0X
+UTF-8 is set                                30629 / 33896          0.3        3062.9       1.0X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
+select wide row:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+No encoding                               123050 / 124199          0.0      246099.8       1.0X
+UTF-8 is set                              139306 / 142569          0.0      278612.7       0.9X
+
+Preparing data for benchmarking ...
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
 Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Select 10 columns + count()                 16011 / 16033          0.6        1601.1       1.0X
-Select 1 column + count()                   14350 / 14392          0.7        1435.0       1.1X
-count()                                       3007 / 3034          3.3         300.7       5.3X
+Select 10 columns + count()                 19539 / 19896          0.5        1953.9       1.0X
+Select 1 column + count()                   16412 / 16445          0.6        1641.2       1.2X
+count()                                       2783 / 2801          3.6         278.3       7.0X
 
 Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
-Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
+Intel(R) Xeon(R) CPU @ 2.50GHz
 creation of JSON parser per line:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Short column without encoding                 8334 / 8453          1.2         833.4       1.0X
-Short column with UTF-8                     13627 / 13784          0.7        1362.7       0.6X
-Wide column without encoding              155073 / 155351          0.1       15507.3       0.1X
-Wide column with UTF-8                    212114 / 212263          0.0       21211.4       0.0X
+Short column without encoding                 9576 / 9612          1.0         957.6       1.0X
+Short column with UTF-8                     13555 / 13698          0.7        1355.5       0.7X
+Wide column without encoding              174761 / 175665          0.1       17476.1       0.1X
+Wide column with UTF-8                    203219 / 205151          0.0       20321.9       0.0X
 
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
index ce38b08..6e6fc47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
@@ -92,6 +92,22 @@ object CSVBenchmark extends SqlBasedBenchmark {
         ds.count()
       }
 
+      val schemaErr1 = StructType(StructField("col0", DateType) +:
+        (1 until colsNum).map(i => StructField(s"col$i", IntegerType)))
+      val dsErr1 = spark.read.schema(schemaErr1).csv(path.getAbsolutePath)
+      benchmark.addCase(s"Select 100 columns, one bad input field", 3) { _ =>
+        dsErr1.select(cols100: _*).filter((row: Row) => true).count()
+      }
+
+      val badRecColName = "badRecord"
+      val schemaErr2 = schemaErr1.add(StructField(badRecColName, StringType))
+      val dsErr2 = spark.read.schema(schemaErr2)
+        .option("columnNameOfCorruptRecord", badRecColName)
+        .csv(path.getAbsolutePath)
+      benchmark.addCase(s"Select 100 columns, corrupt record field", 3) { _ =>
+        dsErr2.select((Column(badRecColName) +: cols100): _*).filter((row: Row) => true).count()
+      }
+
       benchmark.run()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index f50c25e..27f7023 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -59,12 +59,15 @@ object JSONBenchmark extends SqlBasedBenchmark {
         .json(path.getAbsolutePath)
 
       benchmark.addCase("No encoding", numIters) { _ =>
-        spark.read.json(path.getAbsolutePath)
+        spark.read
+          .option("inferTimestamp", false)
+          .json(path.getAbsolutePath)
       }
 
       benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
+          .option("inferTimestamp", false)
           .json(path.getAbsolutePath)
       }
 
@@ -121,6 +124,18 @@ object JSONBenchmark extends SqlBasedBenchmark {
       .add("z", StringType)
   }
 
+  def writeWideRow(path: String, rowsNum: Int): StructType = {
+    val colsNum = 1000
+    val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+    val schema = StructType(fields)
+
+    spark.range(rowsNum)
+      .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+      .write.json(path)
+
+    schema
+  }
+
   def countWideColumn(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark("count a wide column", rowsNum, output = output)
 
@@ -147,6 +162,36 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  def countWideRow(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("select wide row", rowsNum, output = output)
+
+    withTempPath { path =>
+      prepareDataInfo(benchmark)
+      val schema = writeWideRow(path.getAbsolutePath, rowsNum)
+
+      benchmark.addCase("No encoding", numIters) { _ =>
+        spark.read
+          .schema(schema)
+          .json(path.getAbsolutePath)
+          .select("*")
+          .filter((row: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
+        spark.read
+          .option("encoding", "UTF-8")
+          .schema(schema)
+          .json(path.getAbsolutePath)
+          .select("*")
+          .filter((row: Row) => true)
+          .count()
+      }
+
+      benchmark.run()
+    }
+  }
+
   def selectSubsetOfColumns(rowsNum: Int, numIters: Int): Unit = {
     val colsNum = 10
     val benchmark =
@@ -236,6 +281,7 @@ object JSONBenchmark extends SqlBasedBenchmark {
       schemaInferring(100 * 1000 * 1000, numIters)
       countShortColumn(100 * 1000 * 1000, numIters)
       countWideColumn(10 * 1000 * 1000, numIters)
+      countWideRow(500 * 1000, numIters)
       selectSubsetOfColumns(10 * 1000 * 1000, numIters)
       jsonParserCreation(10 * 1000 * 1000, numIters)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org