You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Felix Cheung <fe...@hotmail.com> on 2017/01/07 08:29:01 UTC

Spark checkpointing

Thanks Steve.

As you have pointed out, we have seen some issues related to cloud storage as "file system". I'm looking at checkpointing recently. What do you think would be the improvement we could make for "non local" (== reliable?) checkpointing?


________________________________
From: Steve Loughran <st...@hortonworks.com>
Sent: Friday, January 6, 2017 9:57:05 AM
To: Ankur Srivastava
Cc: Felix Cheung; user@spark.apache.org
Subject: Re: Spark GraphFrame ConnectedComponents


On 5 Jan 2017, at 21:10, Ankur Srivastava <an...@gmail.com>> wrote:

Yes I did try it out and it choses the local file system as my checkpoint location starts with s3n://

I am not sure how can I make it load the S3FileSystem.

set fs.default.name to s3n://whatever , or, in spark context, spark.hadoop.fs.default.name

However

1. you should really use s3a, if you have the hadoop 2.7 JARs on your classpath.
2. neither s3n or s3a are real filesystems, and certain assumptions that checkpointing code tends to make "renames being O(1) atomic calls" do not hold. It may be that checkpointing to s3 isn't as robust as you'd like




On Thu, Jan 5, 2017 at 12:12 PM, Felix Cheung <fe...@hotmail.com>> wrote:
Right, I'd agree, it seems to be only with delete.

Could you by chance run just the delete to see if it fails

FileSystem.get(sc.hadoopConfiguration)
.delete(new Path(somepath), true)
________________________________
From: Ankur Srivastava <an...@gmail.com>>
Sent: Thursday, January 5, 2017 10:05:03 AM
To: Felix Cheung
Cc: user@spark.apache.org<ma...@spark.apache.org>

Subject: Re: Spark GraphFrame ConnectedComponents

Yes it works to read the vertices and edges data from S3 location and is also able to write the checkpoint files to S3. It only fails when deleting the data and that is because it tries to use the default file system. I tried looking up how to update the default file system but could not find anything in that regard.

Thanks
Ankur

On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung <fe...@hotmail.com>> wrote:
From the stack it looks to be an error from the explicit call to hadoop.fs.FileSystem.

Is the URL scheme for s3n registered?
Does it work when you try to read from s3 from Spark?

_____________________________
From: Ankur Srivastava <an...@gmail.com>>
Sent: Wednesday, January 4, 2017 9:23 PM
Subject: Re: Spark GraphFrame ConnectedComponents
To: Felix Cheung <fe...@hotmail.com>>
Cc: <us...@spark.apache.org>>



This is the exact trace from the driver logs

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3n://<checkpoint-folder>/8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
at org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
at GraphTest.main(GraphTest.java:31) ----------- Application Class
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And I am running spark v 1.6.2 and graphframes v 0.3.0-spark1.6-s_2.10

Thanks
Ankur

On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava <an...@gmail.com>> wrote:
Hi

I am rerunning the pipeline to generate the exact trace, I have below part of trace from last run:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3n://<folder-path>, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:516)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:528)

Also I think the error is happening in this part of the code "ConnectedComponents.scala:339" I am referring the code @https://github.com/graphframes/graphframes/blob/master/src/main/scala/org/graphframes/lib/ConnectedComponents.scala

      if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
        // TODO: remove this after DataFrame.checkpoint is implemented
        val out = s"${checkpointDir.get}/$iteration"
        ee.write.parquet(out)
        // may hit S3 eventually consistent issue
        ee = sqlContext.read.parquet(out)

        // remove previous checkpoint
        if (iteration > checkpointInterval) {
          FileSystem.get(sc.hadoopConfiguration)
            .delete(new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}"), true)
        }

        System.gc() // hint Spark to clean shuffle directories
      }


Thanks
Ankur

On Wed, Jan 4, 2017 at 5:19 PM, Felix Cheung <fe...@hotmail.com>> wrote:
Do you have more of the exception stack?


________________________________
From: Ankur Srivastava <an...@gmail.com>>
Sent: Wednesday, January 4, 2017 4:40:02 PM
To: user@spark.apache.org<ma...@spark.apache.org>
Subject: Spark GraphFrame ConnectedComponents

Hi,

I am trying to use the ConnectedComponent algorithm of GraphFrames but by default it needs a checkpoint directory. As I am running my spark cluster with S3 as the DFS and do not have access to HDFS file system I tried using a s3 directory as checkpoint directory but I run into below exception:


Exception in thread "main"java.lang.IllegalArgumentException: Wrong FS: s3n://<folder-path>, expected: file:///

at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)

at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)

If I set checkpoint interval to -1 to avoid checkpointing the driver just hangs after 3 or 4 iterations.

Is there some way I can set the default FileSystem to S3 for Spark or any other option?

Thanks
Ankur









Re: Spark checkpointing

Posted by Steve Loughran <st...@hortonworks.com>.
On 7 Jan 2017, at 08:29, Felix Cheung <fe...@hotmail.com>> wrote:

Thanks Steve.

As you have pointed out, we have seen some issues related to cloud storage as "file system". I'm looking at checkpointing recently. What do you think would be the improvement we could make for "non local" (== reliable?) checkpointing?


right now? I wouldn't checkpoint to S3. Azure WASB works, S3: not reliably. Checkpoint to HDFS, use distCp to back up S3



________________________________
From: Steve Loughran <st...@hortonworks.com>>
Sent: Friday, January 6, 2017 9:57:05 AM
To: Ankur Srivastava
Cc: Felix Cheung; user@spark.apache.org<ma...@spark.apache.org>
Subject: Re: Spark GraphFrame ConnectedComponents


On 5 Jan 2017, at 21:10, Ankur Srivastava <an...@gmail.com>> wrote:

Yes I did try it out and it choses the local file system as my checkpoint location starts with s3n://

I am not sure how can I make it load the S3FileSystem.

set fs.default.name to s3n://whatever , or, in spark context, spark.hadoop.fs.default.name

However

1. you should really use s3a, if you have the hadoop 2.7 JARs on your classpath.
2. neither s3n or s3a are real filesystems, and certain assumptions that checkpointing code tends to make "renames being O(1) atomic calls" do not hold. It may be that checkpointing to s3 isn't as robust as you'd like




On Thu, Jan 5, 2017 at 12:12 PM, Felix Cheung <fe...@hotmail.com>> wrote:
Right, I'd agree, it seems to be only with delete.

Could you by chance run just the delete to see if it fails

FileSystem.get(sc.hadoopConfiguration)
.delete(new Path(somepath), true)
________________________________
From: Ankur Srivastava <an...@gmail.com>>
Sent: Thursday, January 5, 2017 10:05:03 AM
To: Felix Cheung
Cc: user@spark.apache.org<ma...@spark.apache.org>

Subject: Re: Spark GraphFrame ConnectedComponents

Yes it works to read the vertices and edges data from S3 location and is also able to write the checkpoint files to S3. It only fails when deleting the data and that is because it tries to use the default file system. I tried looking up how to update the default file system but could not find anything in that regard.

Thanks
Ankur

On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung <fe...@hotmail.com>> wrote:
From the stack it looks to be an error from the explicit call to hadoop.fs.FileSystem.

Is the URL scheme for s3n registered?
Does it work when you try to read from s3 from Spark?

_____________________________
From: Ankur Srivastava <an...@gmail.com>>
Sent: Wednesday, January 4, 2017 9:23 PM
Subject: Re: Spark GraphFrame ConnectedComponents
To: Felix Cheung <fe...@hotmail.com>>
Cc: <us...@spark.apache.org>>



This is the exact trace from the driver logs

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3n://<checkpoint-folder>/8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
at org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
at GraphTest.main(GraphTest.java:31) ----------- Application Class
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And I am running spark v 1.6.2 and graphframes v 0.3.0-spark1.6-s_2.10

Thanks
Ankur

On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava <an...@gmail.com>> wrote:
Hi

I am rerunning the pipeline to generate the exact trace, I have below part of trace from last run:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3n://<folder-path>, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:516)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:528)

Also I think the error is happening in this part of the code "ConnectedComponents.scala:339" I am referring the code @https://github.com/graphframes/graphframes/blob/master/src/main/scala/org/graphframes/lib/ConnectedComponents.scala

      if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
        // TODO: remove this after DataFrame.checkpoint is implemented
        val out = s"${checkpointDir.get}/$iteration"
        ee.write.parquet(out)
        // may hit S3 eventually consistent issue
        ee = sqlContext.read.parquet(out)

        // remove previous checkpoint
        if (iteration > checkpointInterval) {
          FileSystem.get(sc.hadoopConfiguration)
            .delete(new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}"), true)
        }

        System.gc() // hint Spark to clean shuffle directories
      }


Thanks
Ankur

On Wed, Jan 4, 2017 at 5:19 PM, Felix Cheung <fe...@hotmail.com>> wrote:
Do you have more of the exception stack?


________________________________
From: Ankur Srivastava <an...@gmail.com>>
Sent: Wednesday, January 4, 2017 4:40:02 PM
To: user@spark.apache.org<ma...@spark.apache.org>
Subject: Spark GraphFrame ConnectedComponents

Hi,

I am trying to use the ConnectedComponent algorithm of GraphFrames but by default it needs a checkpoint directory. As I am running my spark cluster with S3 as the DFS and do not have access to HDFS file system I tried using a s3 directory as checkpoint directory but I run into below exception:


Exception in thread "main"java.lang.IllegalArgumentException: Wrong FS: s3n://<folder-path>, expected: file:///

at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)

at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)

If I set checkpoint interval to -1 to avoid checkpointing the driver just hangs after 3 or 4 iterations.

Is there some way I can set the default FileSystem to S3 for Spark or any other option?

Thanks
Ankur