Posted to issues@spark.apache.org by "Wing Yew Poon (JIRA)" <ji...@apache.org> on 2017/10/31 16:05:00 UTC

[jira] [Updated] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode

     [ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wing Yew Poon updated SPARK-22403:
----------------------------------
    Description: 
When I run the StructuredKafkaWordCount example in YARN client mode, it runs fine. However, when I run it in YARN cluster mode, the application errors during initialization, and dies after the default number of YARN application attempts. In the AM log, I see
{noformat}
17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_000001/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
...
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
{noformat}
Looking at StreamingQueryManager#createQuery, we have
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198
{code}
    val checkpointLocation = userSpecifiedCheckpointLocation.map { ...
      ...
    }.orElse {
      ...
    }.getOrElse {
      if (useTempCheckpointLocation) {
        // Delete the temp checkpoint when a query is being stopped without errors.
        deleteCheckpointOnStop = true
        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
      } else {
        ...
      }
    }
{code}
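For context, here is a condensed sketch of the query setup in StructuredKafkaWordCount (hedged; the broker address and topic name below are placeholders, not taken from the example source). Because the console sink is used and no "checkpointLocation" option is supplied, createQuery falls into the useTempCheckpointLocation branch shown above:
{code}
// Condensed sketch of StructuredKafkaWordCount's query setup (broker/topic names are
// placeholders). No "checkpointLocation" option is set, and the console sink allows a
// temporary checkpoint location, so the getOrElse branch above is taken.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()
import spark.implicits._

val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()   // no checkpointLocation => Utils.createTempDir(namePrefix = "temporary")

query.awaitTermination()
{code}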
And Utils.createTempDir has
{code}
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    val dir = createDirectory(root, namePrefix)
    ShutdownHookManager.registerShutdownDeleteDir(dir)
    dir
  }
{code}
In client mode, java.io.tmpdir is set to "/tmp", which also exists in HDFS and has permissions 1777. In cluster mode, java.io.tmpdir is set in the YARN AM to "$PWD/tmp", where PWD is "${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}".
The problem is that Spark is using java.io.tmpdir, which is a path in the local filesystem, as a path in HDFS. When that path is "/tmp", which happens to exist in HDFS, no problem arises, but that is just by coincidence.
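To illustrate the diagnosis, here is a small standalone sketch (not code from Spark; the object and variable names are mine): a path string without a scheme, such as the java.io.tmpdir value above, is resolved against fs.defaultFS, so the metadata write ends up on HDFS rather than the local disk.
{code}
// Standalone illustration (hypothetical, not from the Spark source): a scheme-less
// path string is resolved against fs.defaultFS, so a local java.io.tmpdir value such
// as "$PWD/tmp" in a YARN container is interpreted as an HDFS path.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object TmpDirResolution {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()  // picks up fs.defaultFS, e.g. hdfs://namenode:8020
    val localTmp = System.getProperty("java.io.tmpdir")  // local dir, e.g. .../appcache/.../tmp
    val metadataPath = new Path(new Path(localTmp, "temporary-example"), "metadata")
    val fs = metadataPath.getFileSystem(hadoopConf)
    // On a cluster whose default filesystem is HDFS this prints an hdfs:// URI, so any
    // create() against metadataPath is attempted on HDFS, which is what fails above.
    println(s"$metadataPath resolves against filesystem ${fs.getUri}")
  }
}
{code}
Explicitly setting the "checkpointLocation" option to a writable, fully qualified path avoids the temp-directory branch entirely, but the default behavior in cluster mode remains broken.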

  was:
When I run the StructuredKafkaWordCount example in YARN client mode, it runs fine. However, when I run it in YARN cluster mode, the application errors during initialization, and dies after the default number of YARN application attempts. In the AM log, I see
{noformat}
17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_000001/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
...
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
{noformat}
Looking at StreamingQueryManager#createQuery, we have
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198
{code}
    val checkpointLocation = userSpecifiedCheckpointLocation.map { ...
      ...
    }.orElse {
      ...
    }.getOrElse {
      if (useTempCheckpointLocation) {
        // Delete the temp checkpoint when a query is being stopped without errors.
        deleteCheckpointOnStop = true
        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
      } else {
        ...
      }
    }
{code}
And Utils.createTempDir has
{code}
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    val dir = createDirectory(root, namePrefix)
    ShutdownHookManager.registerShutdownDeleteDir(dir)
    dir
  }
{code}
In client mode, java.io.tmpdir is set to "/tmp", which also exists in HDFS and has permissions 1777. In cluster mode, java.io.tmpdir is set in the YARN AM to "$PWD/tmp", where PWD is "/yarn/nm/usercache/<user>/appcache/<app id>/<container id>".
The problem is that Spark is using java.io.tmpdir, which is a path in the local filesystem, as a path in HDFS. When that path is "/tmp", which happens to exist in HDFS, no problem arises, but that is just by coincidence.


> StructuredKafkaWordCount example fails in YARN cluster mode
> -----------------------------------------------------------
>
>                 Key: SPARK-22403
>                 URL: https://issues.apache.org/jira/browse/SPARK-22403
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Wing Yew Poon
>
> When I run the StructuredKafkaWordCount example in YARN client mode, it runs fine. However, when I run it in YARN cluster mode, the application errors during initialization, and dies after the default number of YARN application attempts. In the AM log, I see
> {noformat}
> 17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
> 17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_000001/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
> org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
> ...
>         at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
> 	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
> 	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
> 	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
> 	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
> {noformat}
> Looking at StreamingQueryManager#createQuery, we have
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198
> {code}
>     val checkpointLocation = userSpecifiedCheckpointLocation.map { ...
>       ...
>     }.orElse {
>       ...
>     }.getOrElse {
>       if (useTempCheckpointLocation) {
>         // Delete the temp checkpoint when a query is being stopped without errors.
>         deleteCheckpointOnStop = true
>         Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
>       } else {
>         ...
>       }
>     }
> {code}
> And Utils.createTempDir has
> {code}
>   def createTempDir(
>       root: String = System.getProperty("java.io.tmpdir"),
>       namePrefix: String = "spark"): File = {
>     val dir = createDirectory(root, namePrefix)
>     ShutdownHookManager.registerShutdownDeleteDir(dir)
>     dir
>   }
> {code}
> In client mode, java.io.tmpdir is set to "/tmp", which also exists in HDFS and has permissions 1777. In cluster mode, java.io.tmpdir is set in the YARN AM to "$PWD/tmp", where PWD is "${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}".
> The problem is that Spark is using java.io.tmpdir, which is a path in the local filesystem, as a path in HDFS. When that path is "/tmp", which happens to exist in HDFS, no problem arises, but that is just by coincidence.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org