Posted to issues@spark.apache.org by "Shyam (Jira)" <ji...@apache.org> on 2020/02/24 15:10:02 UTC

[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

    [ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17043593#comment-17043593 ] 

Shyam commented on SPARK-26825:
-------------------------------

[~asdaraujo] [~gsomogyi] I am facing the same issue with spark-sql 2.4.1. How can I replace or override the path returned by this Utils.createTempDir call?

Any further clues would be appreciated.

> Spark Structure Streaming job failing when submitted in cluster mode
> --------------------------------------------------------------------
>
>                 Key: SPARK-26825
>                 URL: https://issues.apache.org/jira/browse/SPARK-26825
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Andre Araujo
>            Priority: Major
>
> I have a structured streaming job that runs successfully when launched in "client" mode. However, when launched in "cluster" mode it fails with the following messages in the error log. Note that the path in the error message is actually a local filesystem path that has been mistakenly prefixed with a {{hdfs://}} scheme.
> {code}
> 19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_000001/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadata
> org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
> {code}
> I dug a little bit into this and here's what I think is going on:
> # When a new streaming query is created, the {{StreamingQueryManager}} determines the checkpoint location [here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216]. If neither the user nor the Spark conf specifies a checkpoint location, the location is returned by a call to {{Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath}}.
>    Here, I see two issues:
> #* The canonical path returned by {{Utils.createTempDir}} does *not* have a scheme ({{hdfs://}} or {{file://}}), so, it's ambiguous as to what type of file system the path belongs to.
> #* Also note that the path returned by the {{Utils.createTempDir}} call is a local path, not an HDFS path, unlike the paths returned by the other two conditions. I executed {{Utils.createTempDir}} in a test job, both in cluster and client modes, and the results are these:
> {code}
> *Client mode:*
> java.io.tmpdir=/tmp
> createTempDir(namePrefix = s"temporary") => /tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
> *Cluster mode:*
> java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/
> createTempDir(namePrefix = s"temporary") => /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # This temporary checkpoint location is then [passed to the constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276] of the {{MicroBatchExecution}} instance
> # This is the point where [{{resolvedCheckpointRoot}} is calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89]. This is where things start to break: since the path returned by {{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default filesystem, the code resolves the path as an HDFS path, rather than a local one, as shown below:
> {code}
> scala> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.Path
> scala> // value returned by the Utils.createTempDir method
> scala> val checkpointRoot = "/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
> checkpointRoot: String = /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> scala> val checkpointPath = new Path(checkpointRoot)
> checkpointPath: org.apache.hadoop.fs.Path = /yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> scala> // The resolved FS is a DFS, rather than a local filesystem, due to the lack of a scheme in the path
> scala> val fs = checkpointPath.getFileSystem(spark.sessionState.newHadoopConf())
> fs: org.apache.hadoop.fs.FileSystem = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_632752661_1, ugi=root (auth:SIMPLE)]]
> scala> // The generated path is invalid: it's a local path prefixed with a hdfs: scheme
> scala> checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
> res1: String = hdfs://ns1/yarn/nm/usercache/root/appcache/application_1549064555573_0029/container_1549064555573_0029_01_000001/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # Then, when the job tries to save the metadata to the path above, it fails because the path doesn't exist (the actual message is a "permission denied" message because the user doesn't have permission to create the "/data" directory in the HDFS root)
> I believe this could be fixed by simply:
> * Replacing the call to {{Utils.createTempDir}} with something that creates a temp dir on HDFS, rather than on the local filesystem
> * Ensuring this method returns a path qualified with a scheme (hdfs:), to avoid later fs resolution mistakes.
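The scheme problem described in the quoted report can be reproduced without a cluster. The sketch below only approximates the behaviour using java.net.URI; it is not Hadoop's actual implementation. The helper name qualify, the shortened path, and the hdfs://ns1 default filesystem (taken from the log above) are assumptions made for illustration.

```scala
import java.net.URI

object SchemeResolutionSketch {
  // Hypothetical helper (not a Spark or Hadoop API): qualify a path string
  // against a default filesystem URI, keeping any scheme the path already has.
  def qualify(path: String, defaultFs: URI): URI = {
    val uri = new URI(path)
    if (uri.getScheme != null) uri
    else new URI(defaultFs.getScheme, defaultFs.getAuthority, uri.getPath, null, null)
  }

  def main(args: Array[String]): Unit = {
    // Assumed default filesystem, matching the "hdfs://ns1" prefix in the log.
    val defaultFs = new URI("hdfs://ns1")
    // Shortened, made-up stand-in for the container-local temp directory.
    val localTmp = "/yarn/nm/tmp/temporary-1234"

    // No scheme: the local path silently picks up the default filesystem.
    println(qualify(localTmp, defaultFs))
    // Explicit file: scheme: the path stays on the local filesystem.
    println(qualify("file://" + localTmp, defaultFs))
  }
}
```

The second call shows why the report suggests returning a scheme-qualified path: once the scheme is explicit, the default filesystem is no longer consulted.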

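Since the report says the temp dir is used only when neither the user nor the Spark conf specifies a checkpoint location, one possible workaround is to set the location explicitly so that code path is never reached. The following is a minimal sketch and has not been verified against this bug on 2.4.1. The object and method names, the console sink, and the checkpointDir value are placeholders; the "checkpointLocation" writer option is the standard Structured Streaming one.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object ExplicitCheckpointSketch {
  // Hypothetical wrapper: start a streaming query with an explicit checkpoint
  // location. checkpointDir should be a fully qualified path the job user can
  // write to (for example an hdfs:// directory; the exact value is site-specific).
  def startWithCheckpoint(df: DataFrame, checkpointDir: String): StreamingQuery =
    df.writeStream
      .format("console") // placeholder sink, chosen only for illustration
      .option("checkpointLocation", checkpointDir)
      .start()
}
```

The same location can also be supplied through the spark.sql.streaming.checkpointLocation configuration instead of the per-query option.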

