Posted to reviews@spark.apache.org by "jaceklaskowski (via GitHub)" <gi...@apache.org> on 2023/03/27 11:13:07 UTC

[GitHub] [spark] jaceklaskowski commented on a diff in pull request #39907: [SPARK-42359][SQL] Support row skipping when reading CSV files

jaceklaskowski commented on code in PR #39907:
URL: https://github.com/apache/spark/pull/39907#discussion_r1149152549


##########
docs/sql-data-sources-csv.md:
##########
@@ -102,6 +102,12 @@ Data source options of CSV can be set via:
     <td>For reading, uses the first line as names of columns. For writing, writes the names of columns as the first line. Note that if the given path is a RDD of Strings, this header option will remove all lines same with the header if exists. CSV built-in functions ignore this option.</td>
     <td>read/write</td>
   </tr>
+  <tr>
+    <td><code>skipLines</code></td>
+    <td>0</td>
+    <td>Sets a number of non-empty, uncommented lines to skip before parsing each of the CSV files. If the <code>header</code> option is set to <code>true</code>, the first line after the number of <code>skipLines</code> will be taken as the header.</td>

Review Comment:
   nit: s/a number/the number/, and "before parsing each of the CSV files" would read better as just "before parsing CSV files"
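
For context, a minimal sketch of what a read could look like if the option lands as documented in this hunk; the option names come from the diff above, while the path and the skip count are made up for illustration:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Skip the first two non-empty, uncommented lines of every file, then
// (per the diff above) treat the next line as the header.
val df = spark.read
  .option("skipLines", "2")     // proposed by this PR, not in released Spark
  .option("header", "true")
  .csv("/path/to/files/*.csv")  // hypothetical path
```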



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala:
##########
@@ -25,21 +25,33 @@ import org.apache.spark.sql.functions._
 
 object CSVUtils {
   /**
-   * Filter ignorable rows for CSV dataset (lines empty and starting with `comment`).
-   * This is currently being used in CSV schema inference.
+   * Filter blank lines, remove comments, and skip specified rows from a CSV iterator. Then blank
+   * entries (and comments if set) are removed. This is currently being used in CSV schema
+   * inference.
    */
-  def filterCommentAndEmpty(lines: Dataset[String], options: CSVOptions): Dataset[String] = {
+  def filterUnwantedLines(lines: Dataset[String], options: CSVOptions): Dataset[String] = {
     // Note that this was separately made by SPARK-18362. Logically, this should be the same
-    // with the one below, `filterCommentAndEmpty` but execution path is different. One of them
+    // with the one below, `filterUnwantedLines` but execution path is different. One of them
     // might have to be removed in the near future if possible.
     import lines.sqlContext.implicits._
     val aliased = lines.toDF("value")
     val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
-    if (options.isCommentSet) {
-      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
-    } else {
-      nonEmptyLines.as[String]
+    val commentFilteredLines = {
+      if (options.isCommentSet) {
+        nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
+      } else {
+        nonEmptyLines.as[String]
+      }
     }
+    // Note that unlike actual CSV reading path, it simply filters the given skipped lines.
+    // Therefore, this skips the line same with the skipped lines if exists.
+    val linesToSkip = commentFilteredLines.head(options.skipLines)
+    commentFilteredLines.rdd

Review Comment:
   Is `.rdd` required here? `Dataset.mapPartitions` should give us what we want, shouldn't it?
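
For illustration, a sketch of how the tail of `filterUnwantedLines` could stay on the Dataset API, assuming the implicit `Encoder[String]` brought in by `lines.sqlContext.implicits._` earlier in the method is in scope (all names taken from the diff above):

```
// Sketch only: same skipping logic as the diff, without dropping to the RDD API.
val linesToSkip = commentFilteredLines.head(options.skipLines)
commentFilteredLines
  .mapPartitions(_.filterNot(linesToSkip.contains))
```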



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala:
##########
@@ -25,21 +25,33 @@ import org.apache.spark.sql.functions._
 
 object CSVUtils {
   /**
-   * Filter ignorable rows for CSV dataset (lines empty and starting with `comment`).
-   * This is currently being used in CSV schema inference.
+   * Filter blank lines, remove comments, and skip specified rows from a CSV iterator. Then blank
+   * entries (and comments if set) are removed. This is currently being used in CSV schema
+   * inference.
    */
-  def filterCommentAndEmpty(lines: Dataset[String], options: CSVOptions): Dataset[String] = {
+  def filterUnwantedLines(lines: Dataset[String], options: CSVOptions): Dataset[String] = {
     // Note that this was separately made by SPARK-18362. Logically, this should be the same
-    // with the one below, `filterCommentAndEmpty` but execution path is different. One of them
+    // with the one below, `filterUnwantedLines` but execution path is different. One of them
     // might have to be removed in the near future if possible.
     import lines.sqlContext.implicits._
     val aliased = lines.toDF("value")
     val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
-    if (options.isCommentSet) {
-      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
-    } else {
-      nonEmptyLines.as[String]
+    val commentFilteredLines = {
+      if (options.isCommentSet) {
+        nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
+      } else {
+        nonEmptyLines.as[String]
+      }
     }
+    // Note that unlike actual CSV reading path, it simply filters the given skipped lines.
+    // Therefore, this skips the line same with the skipped lines if exists.
+    val linesToSkip = commentFilteredLines.head(options.skipLines)
+    commentFilteredLines.rdd
+      .mapPartitions { iter =>
+        iter.filterNot(linesToSkip.contains(_))

Review Comment:
   `contains` should be enough. Moreover, the following should work too:
   
   ```
   .mapPartitions(_.filterNot(linesToSkip.contains))
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -401,10 +404,17 @@ private[sql] object UnivocityParser {
       schema,
       parser.options.columnNameOfCorruptRecord)
 
+    val handleSkipLines: () => Unit =
+      () => 1.to(parser.options.skipLines).foreach(_ => tokenizer.parseNext())
     val handleHeader: () => Unit =
       () => headerChecker.checkHeaderColumnNames(tokenizer)
 
-    convertStream(inputStream, tokenizer, handleHeader, parser.options.charset) { tokens =>
+    convertStream(
+      inputStream,
+      tokenizer,
+      handleHeader,
+      handleSkipLines,
+      parser.options.charset) { tokens =>
       safeParser.parse(tokens)

Review Comment:
   `safeParser.parse` should be enough. If so, replace `{`s with `(`s, too.
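
As a sketch of that suggestion (assuming, as the original `{ tokens => ... }` block implies, that `convertStream`'s second argument list takes the conversion function), the call could become:

```
// Sketch of the suggestion: pass the method value directly via eta expansion
// instead of wrapping it in an explicit lambda.
convertStream(
  inputStream,
  tokenizer,
  handleHeader,
  handleSkipLines,
  parser.options.charset)(safeParser.parse)
```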



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

