You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/06/22 06:43:25 UTC

spark git commit: [SPARK-21167][SS] Decode the path generated by File sink to handle special characters

Repository: spark
Updated Branches:
  refs/heads/master 53543374c -> d66b143ee


[SPARK-21167][SS] Decode the path generated by File sink to handle special characters

## What changes were proposed in this pull request?

Decode the path generated by File sink to handle special characters.

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #18381 from zsxwing/SPARK-21167.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d66b143e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d66b143e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d66b143e

Branch: refs/heads/master
Commit: d66b143eec7f604595089f72d8786edbdcd74282
Parents: 5354337
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Jun 21 23:43:21 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jun 21 23:43:21 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSinkLog.scala |  5 +++-
 .../sql/streaming/FileStreamSinkSuite.scala     | 29 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d66b143e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 8d718b2..c9939ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.net.URI
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
@@ -47,7 +49,8 @@ case class SinkFileStatus(
     action: String) {
 
   def toFileStatus: FileStatus = {
-    new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path))
+    new FileStatus(
+      size, isDir, blockReplication, blockSize, modificationTime, new Path(new URI(path)))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d66b143e/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 1a2d3a1..bb6a278 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -64,6 +64,35 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  test("SPARK-21167: encode and decode path correctly") {
+    val inputData = MemoryStream[String]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    val query = ds.map(s => (s, s.length))
+      .toDF("value", "len")
+      .writeStream
+      .partitionBy("value")
+      .option("checkpointLocation", checkpointDir)
+      .format("parquet")
+      .start(outputDir)
+
+    try {
+      // The output is partitoned by "value", so the value will appear in the file path.
+      // This is to test if we handle spaces in the path correctly.
+      inputData.addData("hello world")
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
+      val outputDf = spark.read.parquet(outputDir)
+      checkDatasetUnorderly(outputDf.as[(Int, String)], ("hello world".length, "hello world"))
+    } finally {
+      query.stop()
+    }
+  }
+
   test("partitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val ds = inputData.toDS()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org