You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/01/24 12:19:13 UTC
spark git commit: [SPARK-23148][SQL] Allow pathnames with special
characters for CSV / JSON / text
Repository: spark
Updated Branches:
refs/heads/master 7af1a325d -> de36f65d3
[SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text
## What changes were proposed in this pull request?
Fix the JSON and CSV data sources (when the multiLine option is enabled) so that
they can read files whose names include characters that would be changed by URL
encoding, such as `%` or a space.
## How was this patch tested?
New unit tests for JSON, CSV and text suites
Author: Henry Robinson <he...@cloudera.com>
Closes #20355 from henryr/spark-23148.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de36f65d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de36f65d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de36f65d
Branch: refs/heads/master
Commit: de36f65d3a819c00d6bf6979deef46c824203669
Parents: 7af1a32
Author: Henry Robinson <he...@cloudera.com>
Authored: Wed Jan 24 21:19:09 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Wed Jan 24 21:19:09 2018 +0900
----------------------------------------------------------------------
.../sql/execution/datasources/CodecStreams.scala | 6 +++---
.../execution/datasources/csv/CSVDataSource.scala | 11 ++++++-----
.../datasources/json/JsonDataSource.scala | 10 ++++++----
.../spark/sql/FileBasedDataSourceSuite.scala | 18 ++++++++++++++++--
4 files changed, 31 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
index 54549f6..c0df6c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
@@ -45,11 +45,11 @@ object CodecStreams {
}
/**
- * Creates an input stream from the string path and add a closure for the input stream to be
+ * Creates an input stream from the given path and add a closure for the input stream to be
* closed on task completion.
*/
- def createInputStreamWithCloseResource(config: Configuration, path: String): InputStream = {
- val inputStream = createInputStream(config, new Path(path))
+ def createInputStreamWithCloseResource(config: Configuration, path: Path): InputStream = {
+ val inputStream = createInputStream(config, path)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => inputStream.close()))
inputStream
}
http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 2031381..4870d75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -17,11 +17,12 @@
package org.apache.spark.sql.execution.datasources.csv
+import java.net.URI
import java.nio.charset.{Charset, StandardCharsets}
import com.univocity.parsers.csv.CsvParser
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.Job
@@ -32,7 +33,6 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.types.StructType
@@ -206,7 +206,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow] = {
UnivocityParser.parseStream(
- CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
+ CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))),
parser.options.headerFlag,
parser,
schema)
@@ -218,8 +218,9 @@ object MultiLineCSVDataSource extends CSVDataSource {
parsedOptions: CSVOptions): StructType = {
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
csv.flatMap { lines =>
+ val path = new Path(lines.getPath())
UnivocityParser.tokenizeStream(
- CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, lines.getPath()),
+ CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
shouldDropHeader = false,
new CsvParser(parsedOptions.asParserSettings))
}.take(1).headOption match {
@@ -230,7 +231,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
UnivocityParser.tokenizeStream(
CodecStreams.createInputStreamWithCloseResource(
lines.getConfiguration,
- lines.getPath()),
+ new Path(lines.getPath())),
parsedOptions.headerFlag,
new CsvParser(parsedOptions.asParserSettings))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 8b7c270..77e7edc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -18,11 +18,12 @@
package org.apache.spark.sql.execution.datasources.json
import java.io.InputStream
+import java.net.URI
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -168,9 +169,10 @@ object MultiLineJsonDataSource extends JsonDataSource {
}
private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
+ val path = new Path(record.getPath())
CreateJacksonParser.inputStream(
jsonFactory,
- CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, record.getPath()))
+ CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
}
override def readFile(
@@ -180,7 +182,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
schema: StructType): Iterator[InternalRow] = {
def partitionedFileString(ignored: Any): UTF8String = {
Utils.tryWithResource {
- CodecStreams.createInputStreamWithCloseResource(conf, file.filePath)
+ CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
} { inputStream =>
UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
}
@@ -193,6 +195,6 @@ object MultiLineJsonDataSource extends JsonDataSource {
parser.options.columnNameOfCorruptRecord)
safeParser.parse(
- CodecStreams.createInputStreamWithCloseResource(conf, file.filePath))
+ CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/de36f65d/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 22fb496..c272c99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
import testImplicits._
private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
+ private val nameWithSpecialChars = "sp&cial%c hars"
allFileBasedDataSources.foreach { format =>
test(s"Writing empty datasets should not fail - $format") {
@@ -54,7 +55,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
// Only ORC/Parquet support this. `CSV` and `JSON` returns an empty schema.
// `TEXT` data source always has a single column whose name is `value`.
Seq("orc", "parquet").foreach { format =>
- test(s"SPARK-15474 Write and read back non-emtpy schema with empty dataframe - $format") {
+ test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
withTempPath { file =>
val path = file.getCanonicalPath
val emptyDf = Seq((true, 1, "str")).toDF().limit(0)
@@ -69,7 +70,6 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
allFileBasedDataSources.foreach { format =>
test(s"SPARK-22146 read files containing special characters using $format") {
- val nameWithSpecialChars = s"sp&cial%chars"
withTempDir { dir =>
val tmpFile = s"$dir/$nameWithSpecialChars"
spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
@@ -78,4 +78,18 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
}
}
}
+
+ // Separate test case for formats that support multiLine as an option.
+ Seq("json", "csv").foreach { format =>
+ test("SPARK-23148 read files containing special characters " +
+ s"using $format with multiline enabled") {
+ withTempDir { dir =>
+ val tmpFile = s"$dir/$nameWithSpecialChars"
+ spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
+ val reader = spark.read.format(format).option("multiLine", true)
+ val fileContent = reader.load(tmpFile)
+ checkAnswer(fileContent, Seq(Row("a"), Row("b")))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org