You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by js...@apache.org on 2018/01/11 03:51:22 UTC

spark git commit: [SPARK-22587] Spark job fails if fs.defaultFS and application jar are different url

Repository: spark
Updated Branches:
  refs/heads/master 9b33dfc40 -> a6647ffbf


[SPARK-22587] Spark job fails if fs.defaultFS and application jar are different url

## What changes were proposed in this pull request?

The comparison of two filesystems does not consider the authority of the URI. This matters for the
WASB file storage system, where userInfo is honored to differentiate filesystems.
For example: wasbs://user1xyz.net, wasbs://user2xyz.net would be considered two different filesystems.
Therefore, we have to include the authority when comparing two filesystems; two filesystems with different authorities cannot be the same FS.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Mingjie Tang <mt...@hortonworks.com>

Closes #19885 from merlintang/EAR-7377.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6647ffb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6647ffb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6647ffb

Branch: refs/heads/master
Commit: a6647ffbf7a312a3e119a9beef90880cc915aa60
Parents: 9b33dfc
Author: Mingjie Tang <mt...@hortonworks.com>
Authored: Thu Jan 11 11:51:03 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Jan 11 11:51:03 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 24 +++++++++++---
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 33 ++++++++++++++++++++
 2 files changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6647ffb/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 15328d0..8cd3cd9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1421,15 +1421,20 @@ private object Client extends Logging {
   }
 
   /**
-   * Return whether the two file systems are the same.
+   * Return whether the two URIs represent the same file system.
    */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
+  private[spark] def compareUri(srcUri: URI, dstUri: URI): Boolean = {
+
     if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
       return false
     }
 
+    val srcAuthority = srcUri.getAuthority()
+    val dstAuthority = dstUri.getAuthority()
+    if (srcAuthority != null && !srcAuthority.equalsIgnoreCase(dstAuthority)) {
+      return false
+    }
+
     var srcHost = srcUri.getHost()
     var dstHost = dstUri.getHost()
 
@@ -1447,6 +1452,17 @@ private object Client extends Logging {
     }
 
     Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
+
+  }
+
+  /**
+   * Return whether the two file systems are the same.
+   */
+  protected def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+
+    compareUri(srcUri, dstUri)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a6647ffb/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9d5f5eb..7fa5971 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -357,6 +357,39 @@ class ClientSuite extends SparkFunSuite with Matchers {
     sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
   }
 
+  private val matching = Seq(
+    ("files URI match test1", "file:///file1", "file:///file2"),
+    ("files URI match test2", "file:///c:file1", "file://c:file2"),
+    ("files URI match test3", "file://host/file1", "file://host/file2"),
+    ("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"),
+    ("hdfs URI match test", "hdfs:/path1", "hdfs:/path1")
+  )
+
+  matching.foreach { t =>
+      test(t._1) {
+        assert(Client.compareUri(new URI(t._2), new URI(t._3)),
+          s"No match between ${t._2} and ${t._3}")
+      }
+  }
+
+  private val unmatching = Seq(
+    ("files URI unmatch test1", "file:///file1", "file://host/file2"),
+    ("files URI unmatch test2", "file://host/file1", "file:///file2"),
+    ("files URI unmatch test3", "file://host/file1", "file://host2/file2"),
+    ("wasb URI unmatch test1", "wasb://bucket1@user", "wasb://bucket2@user/"),
+    ("wasb URI unmatch test2", "wasb://bucket1@user", "wasb://bucket1@user2/"),
+    ("s3 URI unmatch test", "s3a://user@pass:bucket1/", "s3a://user2@pass2:bucket1/"),
+    ("hdfs URI unmatch test1", "hdfs://namenode1/path1", "hdfs://namenode1:8080/path2"),
+    ("hdfs URI unmatch test2", "hdfs://namenode1:8020/path1", "hdfs://namenode1:8080/path2")
+  )
+
+  unmatching.foreach { t =>
+      test(t._1) {
+        assert(!Client.compareUri(new URI(t._2), new URI(t._3)),
+          s"match between ${t._2} and ${t._3}")
+      }
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org