Posted to user@spark.apache.org by Tao Xiao <xi...@gmail.com> on 2014/09/03 04:28:23 UTC

What are the appropriate privileges needed for writing files into the checkpoint directory?

I tried to run KafkaWordCount in a Spark standalone cluster. In this
application, the checkpoint directory was set as follows:

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")


After submitting my application to the cluster, I could see the correct
counts on the console, but the running application kept logging the
following warning:

14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-00000-attempt-171 (Permission denied)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
  at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
  at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
  at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
  at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
  at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
  at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
  at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
  at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
  at org.apache.spark.scheduler.Task.run(Task.scala:53)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
  at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
  at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:396)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
  at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
  at java.lang.Thread.run(Thread.java:662)


On the node where I submitted the application, the checkpoint directory
(/usr/games/SparkStreaming/checkpoint) was created and some files were
written there, but no such directory existed on the other nodes of the
Spark cluster.

I guess that was because processes on other nodes of the cluster didn't
have appropriate privileges to create the checkpoint directory. So I
created that directory on each node manually and changed its mode to 777,
which means any user can write to that directory. But the SparkStreaming
application still kept throwing that exception.

So what is the real reason?  Thanks.

Re: What are the appropriate privileges needed for writing files into the checkpoint directory?

Posted by Tao Xiao <xi...@gmail.com>.
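I found the answer. The checkpoint directory must be on a fault-tolerant
file system such as HDFS, so it should be set to an HDFS path, not a
local file system path. The stack trace shows the write going through
RawLocalFileSystem, i.e. each executor was trying to write to its own
local disk.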
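
To make the fix concrete, here is a minimal sketch of a guard that rejects
checkpoint directories that would end up on a node's local disk. The object
name CheckpointDirCheck, the function requireSharedDir, and the NameNode
address hdfs://namenode:8020 are all hypothetical and not part of Spark or
of the original application. The sketch also assumes that a path without a
scheme resolves to the local file system, as it did in the setup above; on
a cluster whose Hadoop default file system is HDFS, a bare path may be
acceptable, so this check is stricter than strictly necessary.

```scala
import java.net.URI

// Hypothetical helper, not part of Spark: validates a checkpoint directory
// before it is handed to StreamingContext.checkpoint.
object CheckpointDirCheck {
  // Schemes that refer to a single machine's disk rather than shared storage.
  private val localSchemes = Set("file")

  /** Returns `dir` unchanged if it is a fully qualified, non-local URI;
    * throws IllegalArgumentException otherwise. */
  def requireSharedDir(dir: String): String = {
    // getScheme returns null for paths such as "checkpoint" or "/tmp/checkpoint".
    val scheme = Option(new URI(dir).getScheme).map(_.toLowerCase)
    scheme match {
      case None =>
        throw new IllegalArgumentException(
          s"Checkpoint directory '$dir' has no scheme; use e.g. an hdfs:// path")
      case Some(s) if localSchemes.contains(s) =>
        throw new IllegalArgumentException(
          s"Checkpoint directory '$dir' is on the local file system")
      case Some(_) => dir
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed NameNode host and port; replace with the cluster's own address.
    val dir = requireSharedDir("hdfs://namenode:8020/user/spark/checkpoint")
    println(s"Checkpoint directory accepted: $dir")
  }
}
```

In the application above, the validated value would then be passed along as
ssc.checkpoint(CheckpointDirCheck.requireSharedDir(dir)) in place of the
relative "checkpoint" path.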

