Posted to dev@spark.apache.org by Matthew Schauer <ma...@ibm.com> on 2017/02/20 18:14:06 UTC

Output Committers for S3

I'm using Spark 1.5.2 and trying to append a data frame to a partitioned
Parquet directory in S3.  It is known that the default
`ParquetOutputCommitter` performs poorly in S3 because move is implemented
as copy/delete, but the `DirectParquetOutputCommitter` is not safe to use
for append operations in case of failure.  I'm not very familiar with the
intricacies of job/task committing/aborting, but I've written a rough
replacement output committer that seems to work.  It writes the results
directly to their final locations and uses the write UUID to determine which
files to remove in the case of a job/task abort.  It seems to be a workable
concept in the simple tests that I've tried.  However, I can't make Spark
use this alternate output committer because the changes in SPARK-8578
categorically prohibit any custom output committer from being used, even if
it's safe for appending.  I have two questions: 1) Does anyone more familiar
with output committing have any feedback on my proposed "safe" append
strategy, and 2) is there any way to circumvent the restriction on append
committers without editing and recompiling Spark?  Discussion of solutions
in Spark 2.1 is also welcome. 
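
A minimal sketch of the idea described above, assuming the write UUID is embedded in every output file name and handed to the committer; the class name and wiring are illustrative guesses, not Matthew's actual code:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.{JobContext, JobStatus, OutputCommitter, TaskAttemptContext}

    // Tasks write straight to the final directory; nothing is staged, so commit is a no-op
    // and abort deletes whatever carries this job's write UUID.
    class UuidCleanupCommitter(outputPath: Path, writeUuid: String) extends OutputCommitter {
      override def setupJob(context: JobContext): Unit = ()
      override def setupTask(context: TaskAttemptContext): Unit = ()
      override def needsTaskCommit(context: TaskAttemptContext): Boolean = false // data already in place
      override def commitTask(context: TaskAttemptContext): Unit = ()
      override def abortTask(context: TaskAttemptContext): Unit = deleteByUuid(context)
      override def abortJob(context: JobContext, state: JobStatus.State): Unit = deleteByUuid(context)

      // Remove every file under the destination whose name contains the write UUID.
      private def deleteByUuid(context: JobContext): Unit = {
        val fs = outputPath.getFileSystem(context.getConfiguration)
        val files = fs.listFiles(outputPath, true) // recursive listing
        while (files.hasNext) {
          val status = files.next()
          if (status.getPath.getName.contains(writeUuid)) fs.delete(status.getPath, false)
        }
      }
    }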





Re: Output Committers for S3

Posted by Steve Loughran <st...@hortonworks.com>.
On 20 Feb 2017, at 18:14, Matthew Schauer <ma...@ibm.com> wrote:

I'm using Spark 1.5.2 and trying to append a data frame to partitioned
Parquet directory in S3.  It is known that the default
`ParquetOutputCommitter` performs poorly in S3 because move is implemented
as copy/delete, but the `DirectParquetOutputCommitter` is not safe to use
for append operations in case of failure.  I'm not very familiar with the
intricacies of job/task committing/aborting, but I've written a rough
replacement output committer that seems to work.  It writes the results
directly to their final locations and uses the write UUID to determine which
files to remove in the case of a job/task abort.  It seems to be a workable
concept in the simple tests that I've tried.  However, I can't make Spark
use this alternate output committer because the changes in SPARK-8578
categorically prohibit any custom output committer from being used, even if
it's safe for appending.  I have two questions: 1) Does anyone more familiar
with output committing have any feedback on my proposed "safe" append
strategy, and 2) is there any way to circumvent the restriction on append
committers without editing and recompiling Spark?  Discussion of solutions
in Spark 2.1 is also welcome.




Matthew, as part of the S3guard committer I'm doing in the Hadoop codebase (which requires a consistent object store implemented natively or via a dynamo db database), I'm modifying FileOutputFormat to take alternate committers underneath.

Algorithm
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

Code:

https://github.com/steveloughran/hadoop/tree/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit


Modified FOF: https://github.com/steveloughran/hadoop/tree/s3guard/HADOOP-13786-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output

Current status: getting the low level tests at the MR layer working. Spark committer exists to the point of compiling, but not yet tested. If you do want to get involved; the JIRA is: https://issues.apache.org/jira/browse/HADOOP-13786







Re: Output Committers for S3

Posted by Steve Loughran <st...@hortonworks.com>.
> On 20 Jun 2017, at 07:49, sririshindra <sr...@gmail.com> wrote:
> 
> Is there anything similar to s3 connector for Google cloud storage?
> Since Google cloud Storage is also an object store rather than a file
> system, I imagine the same problem that the s3 connector is trying to solve
> arises with google cloud storage as well.
> 
> Thanks,
> rishi
> 

That's Google's problem for now.

S3 has some specific issues:

1. There's no rename. The FileSystem.rename() call is mocked up with LIST, COPY & DELETE, so it takes O(data); a sketch follows just below this list. This is the slow bit people complain about.
2. It has list inconsistency, so a LIST may actually miss data. This is the bit people should be worrying about.
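
To make point 1 concrete, here is a simplified illustration, not the S3A implementation, of what a "rename" amounts to against the raw SDK (bucket and prefix names are made up, and real code would also handle paged listings):

    import com.amazonaws.services.s3.AmazonS3Client
    import scala.collection.JavaConverters._

    object EmulatedRename {
      // "Renaming" a prefix means listing it, copying every object server-side,
      // then deleting the originals: O(data), and dependent on a consistent LIST.
      def rename(s3: AmazonS3Client, bucket: String, srcPrefix: String, dstPrefix: String): Unit = {
        val listing = s3.listObjects(bucket, srcPrefix)              // LIST (may be inconsistent)
        for (obj <- listing.getObjectSummaries.asScala) {
          val dstKey = dstPrefix + obj.getKey.stripPrefix(srcPrefix)
          s3.copyObject(bucket, obj.getKey, bucket, dstKey)          // COPY: S3 re-writes the bytes
          s3.deleteObject(bucket, obj.getKey)                        // DELETE the source
        }
      }
    }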

The now-deleted DirectOutputCommitter didn't use rename, so it avoided both issues; it just didn't handle failures or speculation. In the absence of failures, all the data did end up in the right place, whereas list inconsistency in rename() means that you may have unobserved data loss. That's the big problem.

Azure WASB has fast atomic rename, so doesn't have the specific S3 problem.

The work I'm doing in Hadoop on committers (HADOOP-13786) is designed to make it possible to put different committers under the FileOutputFormat data writer, without the layers above needing to worry about it; if the other blobstores need a new commit algorithm, it will be more straightforward. And the tests I'm writing are mostly designed to be retargeted from the outset.

Finally, here's a quick talk from last week about filesystems, POSIX, object stores and NVM, and how the commit problems get pulled both ways. If your RAM persists, you'd better be doing atomic record updates and making sure that anything you write back and want persisted isn't left sitting in the CPU cache:

https://www.youtube.com/watch?v=UOE2m_XUr3U&feature=youtu.be&list=PLq-odUc2x7i-9Nijx-WfoRMoAfHC9XzTt

-Steve




Re: Output Committers for S3

Posted by sririshindra <sr...@gmail.com>.
Is there anything similar to s3 connector for Google cloud storage?
Since Google cloud Storage is also an object store rather than a file
system, I imagine the same problem that the s3 connector is trying to solve
arises with google cloud storage as well.

Thanks,
rishi





Re: Output Committers for S3

Posted by Steve Loughran <st...@hortonworks.com>.
On 19 Jun 2017, at 16:55, Ryan Blue <rb...@netflix.com.INVALID> wrote:

> I agree, the problem is that Spark is trying to be safe and avoid the direct committer. We also modify Spark to avoid its logic. We added a property that causes Spark to always use the output committer if the destination is in S3.

I've changed Hadoop's FileOutputFormat to take a factory defining committers: any object store gets to write its own, etc. The FileOutputCommitter is now just one example, a rename()-based algorithm (two, really).
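
Purely as an illustration of that factory idea (the names below are hypothetical, not the classes that ended up in HADOOP-13786): FileOutputFormat asks a pluggable factory for its committer instead of hard-coding FileOutputCommitter.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

    trait CommitterFactory {
      def createCommitter(outputPath: Path, context: TaskAttemptContext): OutputCommitter
    }

    // The classic rename()-based algorithm becomes just one pluggable choice among several.
    class RenameCommitterFactory extends CommitterFactory {
      def createCommitter(outputPath: Path, context: TaskAttemptContext): OutputCommitter =
        new FileOutputCommitter(outputPath, context)
    }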


> Our committers are also slightly different and will get an AmazonS3 client from the destination file system using reflection. That way, it's always configured with the right credentials. The solution to set the credentials provider is another good one, thanks for sharing that. I think in the S3A version, the client is accessed by the committer using a package-private accessor.

pretty much: it extends the helper class used in the output streams to get at the whole set of low level ops, while keeping the client hidden. And we have a special inconsistent client for testing now too.

https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Auth setup comes from one or more of: per-bucket config, the base fs.s3a config, env vars, and IAM metadata (which apparently gets throttled if you try hard enough).


> rb

> On Sat, Jun 17, 2017 at 10:04 AM, sririshindra <sr...@gmail.com> wrote:
> Hi,
>
> as @Venkata krishnan pointed out spark does not allow DFOC when append mode
> is enabled.
>
> in the following class in spark, there is a small check
>
> org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
>
>     if (isAppend) {
>       // If we are appending data to an existing dir, we will only use the output committer
>       // associated with the file output format since it is not safe to use a custom
>       // committer for appending. For example, in S3, direct parquet output committer may
>       // leave partial data in the destination dir when the appending job fails.
>       // See SPARK-8578 for more details.

There's a fair amount of Parquet-specific bits in the Spark output code chain; I've been avoiding it while I get CSV & ORC working, so I'll be better placed to insert stuff.

As it stands, the new Hadoop committer plugin mechanism is working with the imported/migrated staging committer of Ryan's, even when I turn on list inconsistency, which I consider a success. I do want more tests now: currently I'm picking up some of the subclasses of QueryTest & making them generic for any FileSystem rather than the local FS, then running with the new committers.

Things work, but I need to think of more ways to really stress the commit protocol in ways where failures can be observed.


> However, the reasoning mentioned in the above comments is probably (maybe
> Ryan or Steve can confirm this assumption) not applicable to the Netflix
> committer uploaded by Ryan Blue, because Ryan's committer uses multipart
> upload. So either the whole file is live or nothing is; partial data will
> not be available for read. Whatever partial data might have been
> uploaded to S3 by a failed job will be removed after 1 day (I think this is the
> default in Ryan's code; it can be changed with the config
> fs.s3a.multipart.purge.age -- 86400).

That's pretty much it: the files don't materialize until the final commit, so you can write them to their destinations, as long as you remember what you have outstanding and either commit or abort it. Commits of more than one file aren't atomic, but the time to commit is minimal, and indeed, the Hadoop MR protocol isn't quite as atomic as you might think there.




> So I simply changed the code to
>
>      if (true) {
>
> and rebuilt Spark from scratch. Everything is working well for me in my
> initial tests.
>
> There is one more problem I wanted to mention. For some reason, I am getting
> an authentication issue while using Ryan's code. I made the following change
> inside Ryan's code.
>
> I changed the findClient method in S3MultipartOutputCommitter.java (Ryan's
> repo) to the following:
>
>   protected Object findClient(Path path, Configuration conf) {
>       System.out.println("findClient in S3MultipartOutputCommitter");
>       // use environment-variable credentials instead of the default chain;
>       // alternative: new ProfileCredentialsProvider("/home/user/.aws/credentials", "default")
>       AmazonS3Client cli = new AmazonS3Client(
>           new com.amazonaws.auth.EnvironmentVariableCredentialsProvider());
>       System.out.println(cli);
>       return cli;
>   }
>
> We just have to set the S3 credentials (AWS_ACCESS_KEY_ID and
> AWS_SECRET_ACCESS_KEY) in the ~/.bashrc file.



Take a look at S3AUtils.createAWSCredentialProviderSet to see what goes on there; be aware that it has been, and still is, undergoing some changes, so you can't expect stability.
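
As a hedged example, the S3A credential chain can be pinned explicitly from Spark; the fs.s3a.aws.credentials.provider key below is the Hadoop 2.8+ name, so treat it as an assumption for older versions:

    import org.apache.spark.sql.SparkSession

    // spark.hadoop.* settings are copied into the Hadoop Configuration used by
    // the filesystem and the committer.
    val spark = SparkSession.builder()
      .appName("s3a-credentials-example")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider",
              "com.amazonaws.auth.EnvironmentVariableCredentialsProvider")
      .getOrCreate()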




Re: Output Committers for S3

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I agree, the problem is that Spark is trying to be safe and avoid the
direct committer. We also modify Spark to avoid its logic. We added a
property that causes Spark to always use the output committer if the
destination is in S3.

Our committers are also slightly different and will get an AmazonS3 client
from the destination file system using reflection. That way, it's always
configured with the right credentials. The solution to set the credentials
provider is another good one, thanks for sharing that. I think in the S3A
version, the client is accessed by the committer using a package-private
accessor.

rb

On Sat, Jun 17, 2017 at 10:04 AM, sririshindra <sr...@gmail.com>
wrote:

> Hi,
>
> as @Venkata krishnan pointed out spark does not allow DFOC when append mode
> is enabled.
>
> in the following class in spark, there is a small check
>
> org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
>
>
>     if (isAppend) {
>       // If we are appending data to an existing dir, we will only use the
> output committer
>       // associated with the file output format since it is not safe to use
> a custom
>       // committer for appending. For example, in S3, direct parquet output
> committer may
>       // leave partial data in the destination dir when the appending job
> fails.
>       // See SPARK-8578 for more details.
>
> However, the reasoning mentioned in the above comments is probably (maybe
> ryan or steve can confirm this assumption) not applicable to the Netflix
> commiter uploaded by Ryan blue. Because Ryan's commiter uses multipart
> upload. So either the whole file is live or nothing is. partial data will
> not be available for read. Whatever partial data that might have been
> uploaded to s3 by a failed job will be removed after 1 day (I think this
> the
> default in ryan's code. This can be modified using the following config
> (fs.s3a.multipart.purge.age -- 86400))
>
>
> So I simply changed the code to
>      if (true) {
>
> and rebuilt spark from scratch. everything is working well for me in my
> initial tests.
>
>
> There is one more problem I wanted to mention. For some reason, I am
> getting
> an authentication issue while using ryan's code. I made the following
> change
> inside ryan's code.
>
> I changed the findClinet method in S3MultiPartOutputCommiter.java (Ryan's
> repo) to the following
>
>   protected Object findClient(Path path, Configuration conf) {
>       System.out.println("findinClinet in S3MultipartOutPutCommiter");
>       //AWSCredentials
>       //AmazonS3Client cli = new AmazonS3Client(new
> ProfileCredentialsProvider("/home/user/.aws/credentials", "default"));
>       AmazonS3Client cli = new AmazonS3Client(new
> com.amazonaws.auth.EnvironmentVariableCredentialsProvider()); //new
> AmazonS3Client();
>       System.out.println(cli);
>       return cli;
>     //return new AmazonS3Client(new
> ProfileCredentialsProvider("/home/user/.aws/credentials", "default"));
>   }
>
>
> We just have to set the s3 credentials in the ~/.bashrc file.
>
> Please add anything that I might have missed.
>
> Also please look at ryan's talk at spark summit a few days ago
> ( Imporoving Apache spark with s3 by ryan blue
> <https://www.youtube.com/watch?v=BgHrff5yAQo>  )
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Output Committers for S3

Posted by sririshindra <sr...@gmail.com>.
Hi,

As @Venkata krishnan pointed out, Spark does not allow DFOC when append mode
is enabled.

In the following class in Spark, there is a small check:

org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol


    if (isAppend) {
      // If we are appending data to an existing dir, we will only use the output committer
      // associated with the file output format since it is not safe to use a custom
      // committer for appending. For example, in S3, direct parquet output committer may
      // leave partial data in the destination dir when the appending job fails.
      // See SPARK-8578 for more details.

However, the reasoning mentioned in the above comments is probably (maybe
Ryan or Steve can confirm this assumption) not applicable to the Netflix
committer uploaded by Ryan Blue, because Ryan's committer uses multipart
upload. So either the whole file is live or nothing is; partial data will
not be available for read. Whatever partial data might have been
uploaded to S3 by a failed job will be removed after 1 day (I think this is the
default in Ryan's code; it can be changed with the config
fs.s3a.multipart.purge.age -- 86400).
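
For reference, a hedged example of passing those S3A purge settings through Spark (the spark.hadoop.* prefix copies them into the Hadoop Configuration; fs.s3a.multipart.purge must be enabled for the age to have any effect):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("s3a-multipart-purge-example")
      .config("spark.hadoop.fs.s3a.multipart.purge", "true")      // clean up old pending uploads on FS init
      .config("spark.hadoop.fs.s3a.multipart.purge.age", "86400") // age in seconds (one day)
      .getOrCreate()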


So I simply changed the code to

     if (true) {

and rebuilt Spark from scratch. Everything is working well for me in my
initial tests.


There is one more problem I wanted to mention. For some reason, I am getting
an authentication issue while using Ryan's code. I made the following change
inside Ryan's code.

I changed the findClient method in S3MultipartOutputCommitter.java (Ryan's
repo) to the following:

  protected Object findClient(Path path, Configuration conf) {
      System.out.println("findClient in S3MultipartOutputCommitter");
      // use environment-variable credentials instead of the default chain;
      // alternative: new ProfileCredentialsProvider("/home/user/.aws/credentials", "default")
      AmazonS3Client cli = new AmazonS3Client(
          new com.amazonaws.auth.EnvironmentVariableCredentialsProvider());
      System.out.println(cli);
      return cli;
  }


We just have to set the S3 credentials (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) in the ~/.bashrc file.

Please add anything that I might have missed.

Also please look at Ryan's talk at Spark Summit a few days ago
("Improving Apache Spark with S3" by Ryan Blue:
<https://www.youtube.com/watch?v=BgHrff5yAQo>)


















Re: Output Committers for S3

Posted by Venkatakrishnan Sowrirajan <vs...@asu.edu>.
I think Spark in itself doesn't allow DFOC when append mode is enabled, so
DFOC works only for insert overwrite queries/overwrite mode, not for append
mode.

Regards
Venkata krishnan

On Fri, Jun 16, 2017 at 9:35 PM, sririshindra <sr...@gmail.com>
wrote:

> Hi Ryan and Steve,
>
> Thanks very much for your reply.
>
> I was finally able to get Ryan's repo work for me by changing the output
> committer to FileOutputFormat instead of ParquetOutputCommitter in spark as
> Steve suggested.
>
> However, It is not working for append mode while saving the data frame.
>
>     val hf =
> spark.read.parquet("/home/user/softwares/spark-2.1.0-
> bin-hadoop2.7/examples/src/main/resources/users.parquet")
>
>     hf.persist(StorageLevel.DISK_ONLY)
>     hf.show()
>     hf.write
>       .partitionBy("name").mode("append")
>       .save(S3Location + "data" + ".parquet")
>
>
>
> The above code is successfully saving the parquet file when I am running it
> for the first time. But When I rerun the code again the new parquet files
> are not getting added to s3
>
> I have put a print statement in the constructors of
> PartitionedOutputCommiter in Ryan's repo and realized that the partitioned
> output committer is not even getting called the second time I ran the code.
> It is being called only for the first time. Is there anything that I can do
> to make spark call the PartitionedOutputCommiter even when the file already
> exists in s3?
>
>
>
>
>
>

Re: Output Committers for S3

Posted by sririshindra <sr...@gmail.com>.
Hi Ryan and Steve,

Thanks very much for your reply.

I was finally able to get Ryan's repo to work for me by changing the expected output
committer class to FileOutputCommitter instead of ParquetOutputCommitter in Spark, as
Steve suggested.

However, it is not working for append mode while saving the data frame.

    val hf =
spark.read.parquet("/home/user/softwares/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/users.parquet")

    hf.persist(StorageLevel.DISK_ONLY)
    hf.show()
    hf.write
      .partitionBy("name").mode("append")
      .save(S3Location + "data" + ".parquet")



The above code successfully saves the parquet file when I run it
for the first time. But when I rerun the code, the new parquet files
are not getting added to S3.

I have put a print statement in the constructors of
PartitionedOutputCommitter in Ryan's repo and realized that the partitioned
output committer is not even getting called the second time I run the code.
It is being called only the first time. Is there anything that I can do
to make Spark call the PartitionedOutputCommitter even when the file already
exists in S3?








Re: Output Committers for S3

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Steve is right that the S3 committer isn't a ParquetOutputCommitter. I
think that the reason that check exists is to make sure Parquet writes
_metadata summary files to an output directory. But, I think the **summary
files are a bad idea**, so we bypass that logic and use the committer
directly if the output path is in S3.

Why are summary files a bad idea? Because they can easily get out of sync
with the real data files and cause correctness problems. There are two
reasons for using them, both optimizations to avoid reading all of the file
footers in a table. First, _metadata can be used to plan a job because it
has the row group offsets. But planning no longer reads all of the footers;
it uses regular Hadoop file splits instead. The second use is to get the
schema of a table more quickly, but this should be handled by a metastore
that tracks the latest schema. A metastore provides even faster access, a
more reliable schema, and can support schema evolution.

Even with the _metadata files, Spark has had to parallelize building a
table from Parquet files in S3 without a metastore, so I think this
requirement should be removed. In the mean time, you can probably just
build a version of the S3 committer that inherits from
ParquetOutputCommitter instead of FileOutputCommitter. That's probably the
easiest solution. Be sure you run the tests!
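
A hedged sketch of that route: a thin subclass that passes Spark's classOf[ParquetOutputCommitter] check while delegating the real work to the Netflix committer. The wrapper's name and the delegate's constructor signature are assumptions, not code from either repository:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
    import org.apache.parquet.hadoop.ParquetOutputCommitter
    import com.netflix.bdp.s3.S3PartitionedOutputCommitter

    class S3ParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
        extends ParquetOutputCommitter(outputPath, context) {

      // Assumed constructor; check the s3committer sources for the real signature.
      private val delegate = new S3PartitionedOutputCommitter(outputPath, context)

      // Tasks must write into the delegate's work path, not FileOutputCommitter's temp dir.
      override def getWorkPath: Path = delegate.getWorkPath

      override def setupJob(jobContext: JobContext): Unit = delegate.setupJob(jobContext)
      override def commitJob(jobContext: JobContext): Unit = delegate.commitJob(jobContext)
      override def abortJob(jobContext: JobContext, state: JobStatus.State): Unit =
        delegate.abortJob(jobContext, state)
      override def setupTask(taskContext: TaskAttemptContext): Unit = delegate.setupTask(taskContext)
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean =
        delegate.needsTaskCommit(taskContext)
      override def commitTask(taskContext: TaskAttemptContext): Unit = delegate.commitTask(taskContext)
      override def abortTask(taskContext: TaskAttemptContext): Unit = delegate.abortTask(taskContext)
    }

Note that delegating commitJob also skips ParquetOutputCommitter's _metadata summary write, which is the behaviour described above.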

rb

On Tue, Mar 28, 2017 at 3:17 AM, Steve Loughran <st...@hortonworks.com>
wrote:

>
> > On 28 Mar 2017, at 05:20, sririshindra <sr...@gmail.com> wrote:
> >
> > Hi
> >
> > I have a job which saves a dataframe as parquet file to s3.
> >
> > The built a jar using your repository https://github.com/rdblue/
> s3committer.
> >
> > I added the following config in the to the Spark Session
> > config("spark.hadoop.spark.sql.parquet.output.committer.class",
> > "com.netflix.bdp.s3.S3PartitionedOutputCommitter")
> >
> >
> > I submitted the job to spark 2.0.2 as follows
> >
> > ./bin/spark-submit --master local[*] --driver-memory 4G --jars
> > /home/rishi/Downloads/hadoop-aws-2.7.3.jar,/home/rishi/
> Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/
> s3committer/build/libs/s3committer-0.5.5.jar
> > --driver-library-path
> > /home/user/Downloads/hadoop-aws-2.7.3.jar,/home/user/
> Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/
> s3committer/build/libs/s3committer-0.5.5.jar
> > --class main.streaming.scala.backupdatatos3.backupdatatos3Processorr
> > --packages
> > joda-time:joda-time:2.9.7,org.mongodb.mongo-hadoop:mongo-
> hadoop-core:1.5.2,org.mongodb:mongo-java-driver:3.3.0
> > /home/user/projects/backupjob/target/Backup-1.0-SNAPSHOT.jar
>
>
> The miracle of OSS is that you have the right to fix things, the curse,
> only you get to fix your problems on a timescale that suits
>
>
> >
> >
> > I am gettig the following runtime exception.
> > xception in thread "main" java.lang.RuntimeException:
> > java.lang.RuntimeException: class
> > com.netflix.bdp.s3.S3PartitionedOutputCommitter not
> > org.apache.parquet.hadoop.ParquetOutputCommitter
> >        at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> >        at
> > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.
> prepareWrite(ParquetFileFormat.scala:81)
> >        at
>
>
> here:
>     val committerClass =
>       conf.getClass(
>         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
>         classOf[ParquetOutputCommitter],
>         classOf[ParquetOutputCommitter])
>
>
> At a guess, Ryan's committer isn't a ParquetOutputCommitter.
>
> workarounds
>
> 1. Subclass ParquetOutputCommitter
> 2. Modify ParquetFileFormat to only look for a classOf[FileOutputFormat];
> the ParquetOutputCommitter doesn't do anything other than optionally add a
> metadata file. As that is a performance killer on S3, you should have
> disabled that option already.
>
> #2 is easiest., time to rebuild spark being the only overhead.
>
> HADOOP-13786  is sneaking in Ryan's work underneath things, but even there
> the ParquetFileFormat is going to have trouble. Which is odd, given my
> integration tests did appear to be writing things. I'll take that as a sign
> of coverage problems
>
>
>
>
> > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(
> FileFormatWriter.scala:108)
> >        at
> > org.apache.spark.sql.execution.datasources.
> InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationComm
> and.scala:101)
> >        at
> > org.apache.spark.sql.execution.command.ExecutedCommandExec.
> sideEffectResult$lzycompute(commands.scala:58)
> >        at
> > org.apache.spark.sql.execution.command.ExecutedCommandExec.
> sideEffectResult(commands.scala:56)
> >        at
> > org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(
> commands.scala:74)
> >        at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:114)
> >        at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:114)
> >        at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(
> SparkPlan.scala:135)
> >        at
> > org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> >        at
> > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:
> 132)
> >        at
> > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
> >        at
> > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(
> QueryExecution.scala:87)
> >        at
> > org.apache.spark.sql.execution.QueryExecution.
> toRdd(QueryExecution.scala:87)
> >        at
> > org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.
> scala:492)
> >        at
> > org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
> >        at
> > org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
> >        at
> > main.streaming.scala.backupdatatos3.backupdatatos3Processorr$.
> main(backupdatatos3Processorr.scala:229)
> >        at
> > main.streaming.scala.backupdatatos3.backupdatatos3Processorr.main(
> backupdatatos3Processorr.scala)
> >        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >        at
> > sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> >        at
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> >        at java.lang.reflect.Method.invoke(Method.java:498)
> >        at
> > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
> >        at
> > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
> >        at
> > org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
> >        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.
> scala:126)
> >        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> > Caused by: java.lang.RuntimeException: class
> > com.netflix.bdp.s3.S3PartitionedOutputCommitter not
> > org.apache.parquet.hadoop.ParquetOutputCommitter
> >        at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2221)
> >        ... 28 more
> >
> > can you please point out my mistake.
> >
> > If possible can you give a working example of saving a dataframe as a
> > parquet file in s3.
> >
> >
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/Output-Committers-
> for-S3-tp21033p21246.html
> > Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
> >
> >
>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Output Committers for S3

Posted by Steve Loughran <st...@hortonworks.com>.
> On 28 Mar 2017, at 05:20, sririshindra <sr...@gmail.com> wrote:
> 
> Hi 
> 
> I have a job which saves a dataframe as parquet file to s3.
> 
> The built a jar using your repository https://github.com/rdblue/s3committer.
> 
> I added the following config in the to the Spark Session 
> config("spark.hadoop.spark.sql.parquet.output.committer.class",
> "com.netflix.bdp.s3.S3PartitionedOutputCommitter")
> 
> 
> I submitted the job to spark 2.0.2 as follows 
> 
> ./bin/spark-submit --master local[*] --driver-memory 4G --jars
> /home/rishi/Downloads/hadoop-aws-2.7.3.jar,/home/rishi/Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/s3committer/build/libs/s3committer-0.5.5.jar
> --driver-library-path
> /home/user/Downloads/hadoop-aws-2.7.3.jar,/home/user/Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/s3committer/build/libs/s3committer-0.5.5.jar 
> --class main.streaming.scala.backupdatatos3.backupdatatos3Processorr
> --packages
> joda-time:joda-time:2.9.7,org.mongodb.mongo-hadoop:mongo-hadoop-core:1.5.2,org.mongodb:mongo-java-driver:3.3.0
> /home/user/projects/backupjob/target/Backup-1.0-SNAPSHOT.jar


The miracle of OSS is that you have the right to fix things; the curse, that only you get to fix your problems, on a timescale that suits you.


> 
> 
> I am gettig the following runtime exception.
> xception in thread "main" java.lang.RuntimeException:
> java.lang.RuntimeException: class
> com.netflix.bdp.s3.S3PartitionedOutputCommitter not
> org.apache.parquet.hadoop.ParquetOutputCommitter
>        at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>        at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.prepareWrite(ParquetFileFormat.scala:81)
>        at


here:
    val committerClass =
      conf.getClass(
        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
        classOf[ParquetOutputCommitter],
        classOf[ParquetOutputCommitter])


At a guess, Ryan's committer isn't a ParquetOutputCommitter.

Workarounds:

1. Subclass ParquetOutputCommitter.
2. Modify ParquetFileFormat to only look for a classOf[FileOutputCommitter]; the ParquetOutputCommitter doesn't do anything other than optionally add a metadata file. As that is a performance killer on S3, you should have disabled that option already.

#2 is the easiest; the time to rebuild Spark is the only overhead.

HADOOP-13786 is sneaking in Ryan's work underneath things, but even there the ParquetFileFormat is going to have trouble. Which is odd, given that my integration tests did appear to be writing things. I'll take that as a sign of coverage problems.




> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:108)
>        at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
>        at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>        at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>        at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>        at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>        at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>        at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>        at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>        at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>        at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>        at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>        at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>        at
> org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>        at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>        at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>        at
> main.streaming.scala.backupdatatos3.backupdatatos3Processorr$.main(backupdatatos3Processorr.scala:229)
>        at
> main.streaming.scala.backupdatatos3.backupdatatos3Processorr.main(backupdatatos3Processorr.scala)
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>        at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>        at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:498)
>        at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
>        at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
>        at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
>        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
>        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.RuntimeException: class
> com.netflix.bdp.s3.S3PartitionedOutputCommitter not
> org.apache.parquet.hadoop.ParquetOutputCommitter
>        at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2221)
>        ... 28 more
> 
> can you please point out my mistake.
> 
> If possible can you give a working example of saving a dataframe as a
> parquet file in s3.
> 
> 
> 
> 
> 
> 
> 




Re: Output Committers for S3

Posted by sririshindra <sr...@gmail.com>.
Hi,

I have a job which saves a dataframe as a parquet file to S3.

I built a jar using your repository https://github.com/rdblue/s3committer.

I added the following config to the Spark session:

config("spark.hadoop.spark.sql.parquet.output.committer.class",
"com.netflix.bdp.s3.S3PartitionedOutputCommitter")


I submitted the job to Spark 2.0.2 as follows:

./bin/spark-submit --master local[*] --driver-memory 4G --jars
/home/rishi/Downloads/hadoop-aws-2.7.3.jar,/home/rishi/Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/s3committer/build/libs/s3committer-0.5.5.jar
--driver-library-path
/home/user/Downloads/hadoop-aws-2.7.3.jar,/home/user/Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/s3committer/build/libs/s3committer-0.5.5.jar 
--class main.streaming.scala.backupdatatos3.backupdatatos3Processorr
--packages
joda-time:joda-time:2.9.7,org.mongodb.mongo-hadoop:mongo-hadoop-core:1.5.2,org.mongodb:mongo-java-driver:3.3.0
/home/user/projects/backupjob/target/Backup-1.0-SNAPSHOT.jar


I am getting the following runtime exception:

Exception in thread "main" java.lang.RuntimeException:
java.lang.RuntimeException: class
com.netflix.bdp.s3.S3PartitionedOutputCommitter not
org.apache.parquet.hadoop.ParquetOutputCommitter
        at
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
        at
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.prepareWrite(ParquetFileFormat.scala:81)
        at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:108)
        at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
        at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
        at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
        at
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
        at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
        at
org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
        at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
        at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
        at
main.streaming.scala.backupdatatos3.backupdatatos3Processorr$.main(backupdatatos3Processorr.scala:229)
        at
main.streaming.scala.backupdatatos3.backupdatatos3Processorr.main(backupdatatos3Processorr.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: class
com.netflix.bdp.s3.S3PartitionedOutputCommitter not
org.apache.parquet.hadoop.ParquetOutputCommitter
        at
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2221)
        ... 28 more

Can you please point out my mistake?

If possible, can you give a working example of saving a dataframe as a
parquet file in S3?









Re: Output Committers for S3

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
On Tue, Feb 21, 2017 at 6:15 AM, Steve Loughran <st...@hortonworks.com> wrote:
> On 21 Feb 2017, at 01:00, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> > You'd have to encode the task ID in the output file name to identify files to roll back in the event you need to revert a task, but if you have partitioned output, you have to do a lot of directory listing to find all the files that need to be removed. That, or you could risk duplicate data by not rolling back tasks.
>
> Bear in mind that recursive directory listing isn't so expensive once you have the O(1)-ish listFiles(files, recursive) operation of HADOOP-13208.

This doesn't work so well for us because we have an additional layer
to provide atomic commits by swapping locations in the metastore. That
causes areas in the prefix listing where we have old data mixed with
new. You make a good point that it won't be quite so expensive to list
an entire repository in the future, but I'd still rather avoid it.

> . . . I'm choreographing the task and committer via data serialized to S3 itself. On a task failure that will allow us to roll back all completely written files, without the need for any task-job communications. I'm still thinking about having an optional+async scan for pending commits to the dest path, to identify problems and keep bills down.

What do you mean by avoiding task-job communications? Don't you still
need to communicate to the job commit what files the tasks produced?
It sounds like you're using S3 for that instead of HDFS with the
FileOutputCommitter like this does. Long-term, I'd probably agree that
we shouldn't rely on HDFS, but it seems like a good way to get
something working right now.

> > The flaw in this approach is that you can still get partial writes if the driver fails while running the job committer, but it covers the other cases.
>
> There's a bit of that in both the FileOutputFormat and indeed, in HadoopMapReduceCommitProtocol. It's just a small window, especially if you do those final PUTs in parallel

Yeah, it is unavoidable right now with the current table format.

> > We're working on getting users moved over to the new committers, so now seems like a good time to get a copy out to the community. Please let me know what you think.
>
> I'll have a look at your code, see how it compares to mine. I'm able to take advantage of the fact that we can tune the S3A FS, for example, by modifying the block output stream to *not* commit its work in the final close()
>
> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
>
> This means that provided the output writer doesn't attempt to read the file it's just written, we can do a write straight to the final destination

We're also considering an implementation that does exactly what we use
the local FS for today, but keeps data in memory. When you close the
stream, you'd get the same PendingUpload object we build from the file
system. This is something we put off to get something out more
quickly.

Thanks for taking a look!



Re: Output Committers for S3

Posted by Matthew Schauer <ma...@ibm.com>.
Well, the issue I'm trying to solve is slow writing due to S3's
implementation of move as copy/delete.  It seems like your S3 committers and
S3Guard both ameliorate that somewhat by parallelizing the copy.  I assume
there's no better way to solve this issue without sacrificing safety.  Even
if there were, I couldn't use it, because I'm stuck on Spark 1.5 and there
doesn't seem to be a way to force the use of a given output committer.





Re: Output Committers for S3

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Does S3Guard help with this? I thought it was like S3mper and could
help detect eventual consistency problems, but wouldn't help with the
committer problem.

rb

On Tue, Feb 21, 2017 at 12:39 PM, Matthew Schauer
<ma...@ibm.com> wrote:
> Thanks for the repo, Ryan!  I had heard that Netflix had a committer that
> used the local filesystem as a temporary store, but I wasn't able to find
> that anywhere until now.  I implemented something similar that writes to
> HDFS and then copies to S3, but it doesn't use the multipart upload API, so
> I'm sure yours will be faster.  I think this is the best thing until S3Guard
> comes out.
>
> As far as my UUID-tracking approach goes, I was under the impression that a
> given task would write the same set of files on each attempt.  Thus, if the
> task fails, either the whole job is aborted and the files are removed, or
> the task is retried and the files are overwritten.  On the other and, I can
> see how having partially-written data visible to readers immediately could
> cause problems, and that is a good reason to avoid my approach.
>
> Steve -- that design document was a very enlightening read.  I will be
> interested in following and possibly contributing to S3Guard in the future.
>
>
>



-- 
Ryan Blue
Software Engineer
Netflix



Re: Output Committers for S3

Posted by Matthew Schauer <ma...@ibm.com>.
Thanks for the repo, Ryan!  I had heard that Netflix had a committer that
used the local filesystem as a temporary store, but I wasn't able to find
that anywhere until now.  I implemented something similar that writes to
HDFS and then copies to S3, but it doesn't use the multipart upload API, so
I'm sure yours will be faster.  I think this is the best thing until S3Guard
comes out.

As far as my UUID-tracking approach goes, I was under the impression that a
given task would write the same set of files on each attempt.  Thus, if the
task fails, either the whole job is aborted and the files are removed, or
the task is retried and the files are overwritten.  On the other hand, I can
see how having partially-written data visible to readers immediately could
cause problems, and that is a good reason to avoid my approach.

Steve -- that design document was a very enlightening read.  I will be
interested in following and possibly contributing to S3Guard in the future.





Re: Output Committers for S3

Posted by Steve Loughran <st...@hortonworks.com>.
On 21 Feb 2017, at 14:15, Steve Loughran <st...@hortonworks.com> wrote:

What your patch has made me realise is that I could also do a delayed-commit copy by reading in a file, doing a multipart put to its final destination, and again, postponing the final commit. this is something which tasks could do in their commit rather than a normal COPY+DELETE  rename, passing the final pending commit information to the job committer. This'd make the rename() slower as it will read and write the data again, rather than the 6-10 MB/s of in-S3 copies, but as these happen in-task-commit, rather than in-job-commit, they slow down the overall job less. That could be used for the absolute path commit phase.


Though as you can specify a copy range in a multipart upload, you could do parallelized copies of parts of a file within the S3 store itself and leave the result pending, reducing the copy time in seconds to ~ filesize / (parts * 6e6), the same as you get from a parallel copy in S3 today. That is: the same time as a rename, merely not visible until the final job chooses to materialize the object.
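
A hedged sketch of that copy-range trick with the v1 AWS SDK (all names here are made up and the real committer wiring is more involved): initiate a multipart upload at the destination, copy byte ranges of the source as parts, and hold the complete call back until job commit.

    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.{CopyPartRequest, InitiateMultipartUploadRequest, PartETag}
    import scala.collection.JavaConverters._

    object DeferredServerSideCopy {
      // Returns the upload id and part etags; nothing is visible at dstKey until
      // completeMultipartUpload is called with them (or the upload is aborted).
      def copyParts(s3: AmazonS3Client, bucket: String, srcKey: String, dstKey: String,
                    size: Long, partSize: Long = 64L * 1024 * 1024): (String, java.util.List[PartETag]) = {
        val uploadId =
          s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, dstKey)).getUploadId

        val etags = (0L until size by partSize).zipWithIndex.map { case (start, i) =>
          s3.copyPart(new CopyPartRequest()
              .withSourceBucketName(bucket).withSourceKey(srcKey)
              .withDestinationBucketName(bucket).withDestinationKey(dstKey)
              .withUploadId(uploadId)
              .withPartNumber(i + 1)                              // part numbers start at 1
              .withFirstByte(start)
              .withLastByte(math.min(start + partSize, size) - 1) // inclusive byte range
            ).getPartETag
        }.asJava

        (uploadId, etags)
      }
    }

The job committer would then call completeMultipartUpload with the recorded parts, or abortMultipartUpload to discard them.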

Re: Output Committers for S3

Posted by Steve Loughran <st...@hortonworks.com>.
On 21 Feb 2017, at 01:00, Ryan Blue <rb...@netflix.com.INVALID> wrote:

> We just wrote a couple new committers for S3 that we're beginning to roll out to our Spark users. I've uploaded a repo with it if you'd like to take a look:
>
>   https://github.com/rdblue/s3committer
>
> The main problem with the UUID approach is that data is live as soon as the S3 upload completes. That means that readers can get partial results while a job is running that may not be eventually committed (since you will remove the UUID later). You may also have a problem with partitioned task outputs.
>
> You'd have to encode the task ID in the output file name to identify files to roll back in the event you need to revert a task, but if you have partitioned output, you have to do a lot of directory listing to find all the files that need to be removed. That, or you could risk duplicate data by not rolling back tasks.


Bear in mind that recursive directory listing isn't so expensive once you have the O(1)-ish listFiles(files, recursive) operation of HADOOP-13208.



> The approach we took is to use the multi-part upload API to stage data from tasks without issuing the final call to complete the upload and make the data live in S3. That way, we get distributed uploads without any visible data until the job committer runs. The job committer reads all of the pending uploads and commits them. If the job has failed, then it can roll back the known uploads by aborting them instead, with the data never visible to readers.


Yes, that's what I've been doing too. I'm choreographing the task and committer via data serialized to S3 itself. On a task failure that will allow us to roll back all completely written files, without the need for any task-job communications. I'm still thinking about having an optional+async scan for pending commits to the dest path, to identify problems and keep bills down.



> The flaw in this approach is that you can still get partial writes if the driver fails while running the job committer, but it covers the other cases.

There's a bit of that in both the FileOutputFormat and indeed, in HadoopMapReduceCommitProtocol. It's just a small window, especially if you do those final PUTs in parallel


> We're working on getting users moved over to the new committers, so now seems like a good time to get a copy out to the community. Please let me know what you think.

I'll have a look at your code, see how it compares to mine. I'm able to take advantage of the fact that we can tune the S3A FS, for example, by modifying the block output stream to *not* commit its work in the final close()

https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

This means that provided the output writer doesn't attempt to read the file it's just written, we can do a write straight to the final destination


What your patch has made me realise is that I could also do a delayed-commit copy by reading in a file, doing a multipart put to its final destination, and again, postponing the final commit. this is something which tasks could do in their commit rather than a normal COPY+DELETE  rename, passing the final pending commit information to the job committer. This'd make the rename() slower as it will read and write the data again, rather than the 6-10 MB/s of in-S3 copies, but as these happen in-task-commit, rather than in-job-commit, they slow down the overall job less. That could be used for the absolute path commit phase.


-Steve

rb

On Mon, Feb 20, 2017 at 10:14 AM, Matthew Schauer <ma...@ibm.com> wrote:
I'm using Spark 1.5.2 and trying to append a data frame to partitioned
Parquet directory in S3.  It is known that the default
`ParquetOutputCommitter` performs poorly in S3 because move is implemented
as copy/delete, but the `DirectParquetOutputCommitter` is not safe to use
for append operations in case of failure.  I'm not very familiar with the
intricacies of job/task committing/aborting, but I've written a rough
replacement output committer that seems to work.  It writes the results
directly to their final locations and uses the write UUID to determine which
files to remove in the case of a job/task abort.  It seems to be a workable
concept in the simple tests that I've tried.  However, I can't make Spark
use this alternate output committer because the changes in SPARK-8578
categorically prohibit any custom output committer from being used, even if
it's safe for appending.  I have two questions: 1) Does anyone more familiar
with output committing have any feedback on my proposed "safe" append
strategy, and 2) is there any way to circumvent the restriction on append
committers without editing and recompiling Spark?  Discussion of solutions
in Spark 2.1 is also welcome.







--
Ryan Blue
Software Engineer
Netflix


Re: Output Committers for S3

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
We just wrote a couple new committers for S3 that we're beginning to roll
out to our Spark users. I've uploaded a repo with it if you'd like to take
a look:

  https://github.com/rdblue/s3committer

The main problem with the UUID approach is that data is live as soon as the
S3 upload completes. That means that readers can get partial results while
a job is running that may not be eventually committed (since you will
remove the UUID later). You may also have a problem with partitioned task
outputs. You'd have to encode the task ID in the output file name to
identify files to roll back in the event you need to revert a task, but if
you have partitioned output, you have to do a lot of directory listing to
find all the files that need to be removed. That, or you could risk
duplicate data by not rolling back tasks.

The approach we took is to use the multi-part upload API to stage data from
tasks without issuing the final call to complete the upload and make the
data live in S3. That way, we get distributed uploads without any visible
data until the job committer runs. The job committer reads all of the
pending uploads and commits them. If the job has failed, then it can roll
back the known uploads by aborting them instead, with the data never
visible to readers.
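
A hedged sketch of the job-commit half of that scheme (the PendingUpload record and the names here are assumptions for illustration, not Ryan's classes):

    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.{AbortMultipartUploadRequest, CompleteMultipartUploadRequest, PartETag}

    // One record per staged file: enough to finish or discard the multipart upload later.
    case class PendingUpload(bucket: String, key: String, uploadId: String,
                             parts: java.util.List[PartETag])

    object JobCommit {
      // Job commit: make everything visible. Until this runs, readers see none of the data.
      def commitAll(s3: AmazonS3Client, pending: Seq[PendingUpload]): Unit =
        pending.foreach { p =>
          s3.completeMultipartUpload(
            new CompleteMultipartUploadRequest(p.bucket, p.key, p.uploadId, p.parts))
        }

      // Job abort: discard every staged upload; nothing was ever visible to readers.
      def abortAll(s3: AmazonS3Client, pending: Seq[PendingUpload]): Unit =
        pending.foreach { p =>
          s3.abortMultipartUpload(new AbortMultipartUploadRequest(p.bucket, p.key, p.uploadId))
        }
    }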

The flaw in this approach is that you can still get partial writes if the
driver fails while running the job committer, but it covers the other cases.

We're working on getting users moved over to the new committers, so now
seems like a good time to get a copy out to the community. Please let me
know what you think.

rb

On Mon, Feb 20, 2017 at 10:14 AM, Matthew Schauer <ma...@ibm.com>
wrote:

> I'm using Spark 1.5.2 and trying to append a data frame to partitioned
> Parquet directory in S3.  It is known that the default
> `ParquetOutputCommitter` performs poorly in S3 because move is implemented
> as copy/delete, but the `DirectParquetOutputCommitter` is not safe to use
> for append operations in case of failure.  I'm not very familiar with the
> intricacies of job/task committing/aborting, but I've written a rough
> replacement output committer that seems to work.  It writes the results
> directly to their final locations and uses the write UUID to determine
> which
> files to remove in the case of a job/task abort.  It seems to be a workable
> concept in the simple tests that I've tried.  However, I can't make Spark
> use this alternate output committer because the changes in SPARK-8578
> categorically prohibit any custom output committer from being used, even if
> it's safe for appending.  I have two questions: 1) Does anyone more
> familiar
> with output committing have any feedback on my proposed "safe" append
> strategy, and 2) is there any way to circumvent the restriction on append
> committers without editing and recompiling Spark?  Discussion of solutions
> in Spark 2.1 is also welcome.
>
>
>
>


-- 
Ryan Blue
Software Engineer
Netflix