You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/02/07 05:03:26 UTC

spark git commit: [SPARK-19407][SS] defaultFS is used FileSystem.get instead of getting it from uri scheme

Repository: spark
Updated Branches:
  refs/heads/master fab0d62a7 -> 7a0a630e0


[SPARK-19407][SS] defaultFS is used FileSystem.get instead of getting it from uri scheme

## What changes were proposed in this pull request?

```
Caused by: java.lang.IllegalArgumentException: Wrong FS: s3a://**************/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata, expected: file:///
	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
	at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
```

Can easily replicate on spark standalone cluster by providing checkpoint location uri scheme anything other than "file://" and not overriding in config.

WorkAround  --conf spark.hadoop.fs.defaultFS=s3a://somebucket or set it in sparkConf or spark-default.conf

## How was this patch tested?

existing ut

Author: uncleGen <hu...@gmail.com>

Closes #16815 from uncleGen/SPARK-19407.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a0a630e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a0a630e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a0a630e

Branch: refs/heads/master
Commit: 7a0a630e0f699017c7d0214923cd4aa0227e62ff
Parents: fab0d62
Author: uncleGen <hu...@gmail.com>
Authored: Mon Feb 6 21:03:20 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Feb 6 21:03:20 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/streaming/StreamMetadata.scala    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a0a630e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 7807c9f..0bc54ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -47,7 +47,7 @@ object StreamMetadata extends Logging {
 
   /** Read the metadata from file if it exists */
   def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
-    val fs = FileSystem.get(hadoopConf)
+    val fs = metadataFile.getFileSystem(hadoopConf)
     if (fs.exists(metadataFile)) {
       var input: FSDataInputStream = null
       try {
@@ -72,7 +72,7 @@ object StreamMetadata extends Logging {
       hadoopConf: Configuration): Unit = {
     var output: FSDataOutputStream = null
     try {
-      val fs = FileSystem.get(hadoopConf)
+      val fs = metadataFile.getFileSystem(hadoopConf)
       output = fs.create(metadataFile)
       val writer = new OutputStreamWriter(output)
       Serialization.write(metadata, writer)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org