You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/01/24 12:19:13 UTC

spark git commit: [SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text

Repository: spark
Updated Branches:
  refs/heads/master 7af1a325d -> de36f65d3


[SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text

…JSON / text

## What changes were proposed in this pull request?

Fix the JSON and CSV data sources so that, in multiLine mode, they can read
files whose names include characters that would be changed by URL encoding.

## How was this patch tested?

New unit tests for JSON, CSV and text suites

Author: Henry Robinson <he...@cloudera.com>

Closes #20355 from henryr/spark-23148.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de36f65d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de36f65d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de36f65d

Branch: refs/heads/master
Commit: de36f65d3a819c00d6bf6979deef46c824203669
Parents: 7af1a32
Author: Henry Robinson <he...@cloudera.com>
Authored: Wed Jan 24 21:19:09 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Wed Jan 24 21:19:09 2018 +0900

----------------------------------------------------------------------
 .../sql/execution/datasources/CodecStreams.scala  |  6 +++---
 .../execution/datasources/csv/CSVDataSource.scala | 11 ++++++-----
 .../datasources/json/JsonDataSource.scala         | 10 ++++++----
 .../spark/sql/FileBasedDataSourceSuite.scala      | 18 ++++++++++++++++--
 4 files changed, 31 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
index 54549f6..c0df6c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
@@ -45,11 +45,11 @@ object CodecStreams {
   }
 
   /**
-   * Creates an input stream from the string path and add a closure for the input stream to be
+   * Creates an input stream from the given path and add a closure for the input stream to be
    * closed on task completion.
    */
-  def createInputStreamWithCloseResource(config: Configuration, path: String): InputStream = {
-    val inputStream = createInputStream(config, new Path(path))
+  def createInputStreamWithCloseResource(config: Configuration, path: Path): InputStream = {
+    val inputStream = createInputStream(config, path)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => inputStream.close()))
     inputStream
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 2031381..4870d75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.net.URI
 import java.nio.charset.{Charset, StandardCharsets}
 
 import com.univocity.parsers.csv.CsvParser
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.Job
@@ -32,7 +33,6 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
@@ -206,7 +206,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
       parser: UnivocityParser,
       schema: StructType): Iterator[InternalRow] = {
     UnivocityParser.parseStream(
-      CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
+      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))),
       parser.options.headerFlag,
       parser,
       schema)
@@ -218,8 +218,9 @@ object MultiLineCSVDataSource extends CSVDataSource {
       parsedOptions: CSVOptions): StructType = {
     val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
     csv.flatMap { lines =>
+      val path = new Path(lines.getPath())
       UnivocityParser.tokenizeStream(
-        CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, lines.getPath()),
+        CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
         shouldDropHeader = false,
         new CsvParser(parsedOptions.asParserSettings))
     }.take(1).headOption match {
@@ -230,7 +231,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
           UnivocityParser.tokenizeStream(
             CodecStreams.createInputStreamWithCloseResource(
               lines.getConfiguration,
-              lines.getPath()),
+              new Path(lines.getPath())),
             parsedOptions.headerFlag,
             new CsvParser(parsedOptions.asParserSettings))
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 8b7c270..77e7edc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import java.io.InputStream
+import java.net.URI
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -168,9 +169,10 @@ object MultiLineJsonDataSource extends JsonDataSource {
   }
 
   private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
+    val path = new Path(record.getPath())
     CreateJacksonParser.inputStream(
       jsonFactory,
-      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, record.getPath()))
+      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
   }
 
   override def readFile(
@@ -180,7 +182,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       schema: StructType): Iterator[InternalRow] = {
     def partitionedFileString(ignored: Any): UTF8String = {
       Utils.tryWithResource {
-        CodecStreams.createInputStreamWithCloseResource(conf, file.filePath)
+        CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
       } { inputStream =>
         UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
       }
@@ -193,6 +195,6 @@ object MultiLineJsonDataSource extends JsonDataSource {
       parser.options.columnNameOfCorruptRecord)
 
     safeParser.parse(
-      CodecStreams.createInputStreamWithCloseResource(conf, file.filePath))
+      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 22fb496..c272c99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
+  private val nameWithSpecialChars = "sp&cial%c hars"
 
   allFileBasedDataSources.foreach { format =>
     test(s"Writing empty datasets should not fail - $format") {
@@ -54,7 +55,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
   // Only ORC/Parquet support this. `CSV` and `JSON` returns an empty schema.
   // `TEXT` data source always has a single column whose name is `value`.
   Seq("orc", "parquet").foreach { format =>
-    test(s"SPARK-15474 Write and read back non-emtpy schema with empty dataframe - $format") {
+    test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
       withTempPath { file =>
         val path = file.getCanonicalPath
         val emptyDf = Seq((true, 1, "str")).toDF().limit(0)
@@ -69,7 +70,6 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
 
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-22146 read files containing special characters using $format") {
-      val nameWithSpecialChars = s"sp&cial%chars"
       withTempDir { dir =>
         val tmpFile = s"$dir/$nameWithSpecialChars"
         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
@@ -78,4 +78,18 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  // Separate test case for formats that support multiLine as an option.
+  Seq("json", "csv").foreach { format =>
+    test("SPARK-23148 read files containing special characters " +
+      s"using $format with multiline enabled") {
+      withTempDir { dir =>
+        val tmpFile = s"$dir/$nameWithSpecialChars"
+        spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
+        val reader = spark.read.format(format).option("multiLine", true)
+        val fileContent = reader.load(tmpFile)
+        checkAnswer(fileContent, Seq(Row("a"), Row("b")))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org