Posted to commits@spark.apache.org by zs...@apache.org on 2017/06/22 06:43:25 UTC
spark git commit: [SPARK-21167][SS] Decode the path generated by File sink to handle special characters
Repository: spark
Updated Branches:
refs/heads/master 53543374c -> d66b143ee
[SPARK-21167][SS] Decode the path generated by File sink to handle special characters
## What changes were proposed in this pull request?
Decode the path generated by the file sink so that special characters (such as spaces in partition values) are handled correctly.
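For context, a minimal sketch of the round trip this change targets, assuming the sink log stores paths in their percent-encoded URI string form (which the decode below implies); the literal paths are illustrative, not taken from the patch:

    import java.net.URI
    import org.apache.hadoop.fs.Path

    // Hypothetical committed file; the partition value "hello world" contains a space.
    val committed = new Path("/out/value=hello world/part-00000.parquet")

    // Stringifying through the URI percent-encodes the space.
    val logged = committed.toUri.toString  // ".../value=hello%20world/part-00000.parquet"

    // Re-parsing the encoded string as a plain Path keeps "%20" as literal characters,
    // so the metadata no longer points at the file that was actually written.
    val broken = new Path(logged)
    assert(broken != committed)

    // Decoding through java.net.URI first (the approach in this patch) recovers the original.
    val fixed = new Path(new URI(logged))
    assert(fixed == committed)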
## How was this patch tested?
The added unit test.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #18381 from zsxwing/SPARK-21167.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d66b143e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d66b143e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d66b143e
Branch: refs/heads/master
Commit: d66b143eec7f604595089f72d8786edbdcd74282
Parents: 5354337
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Jun 21 23:43:21 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jun 21 23:43:21 2017 -0700
----------------------------------------------------------------------
.../execution/streaming/FileStreamSinkLog.scala | 5 +++-
.../sql/streaming/FileStreamSinkSuite.scala | 29 ++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d66b143e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 8d718b2..c9939ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.streaming
+import java.net.URI
+
import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
@@ -47,7 +49,8 @@ case class SinkFileStatus(
action: String) {
def toFileStatus: FileStatus = {
- new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path))
+ new FileStatus(
+ size, isDir, blockReplication, blockSize, modificationTime, new Path(new URI(path)))
}
}
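The difference on the changed constructor call comes from Hadoop's Path API: Path(String) treats a percent sign as an ordinary character, while Path(URI) takes the already-decoded URI. A small sketch of just that distinction (file names are illustrative):

    import java.net.URI
    import org.apache.hadoop.fs.Path

    val stored = "/out/value=hello%20world"  // encoded form read back from the sink log

    new Path(stored).toUri.getPath           // "/out/value=hello%20world" (escape kept literally)
    new Path(new URI(stored)).toUri.getPath  // "/out/value=hello world"   (escape decoded)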
http://git-wip-us.apache.org/repos/asf/spark/blob/d66b143e/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 1a2d3a1..bb6a278 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -64,6 +64,35 @@ class FileStreamSinkSuite extends StreamTest {
}
}
+ test("SPARK-21167: encode and decode path correctly") {
+ val inputData = MemoryStream[String]
+ val ds = inputData.toDS()
+
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+ val query = ds.map(s => (s, s.length))
+ .toDF("value", "len")
+ .writeStream
+ .partitionBy("value")
+ .option("checkpointLocation", checkpointDir)
+ .format("parquet")
+ .start(outputDir)
+
+ try {
+ // The output is partitioned by "value", so the value will appear in the file path.
+ // This is to test if we handle spaces in the path correctly.
+ inputData.addData("hello world")
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
+ val outputDf = spark.read.parquet(outputDir)
+ checkDatasetUnorderly(outputDf.as[(Int, String)], ("hello world".length, "hello world"))
+ } finally {
+ query.stop()
+ }
+ }
+
test("partitioned writing and batch reading") {
val inputData = MemoryStream[Int]
val ds = inputData.toDS()