You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "igor.berman" <ig...@gmail.com> on 2016/02/22 23:18:39 UTC

DirectFileOutputCommiter

Hi,
Wanted to understand if anybody uses DirectFileOutputCommitter or alikes
especially when working with s3? 
I know that there is one impl in spark distro for parquet format, but not
for files -  why?

Imho, it can bring huge performance boost. 
Using default FileOutputCommiter with s3 has big overhead at commit stage
when all parts are copied one-by-one to destination dir from _temporary,
which is bottleneck when number of partitions is high.

Also, wanted to know if there are some problems when using
DirectFileOutputCommitter? 
If writing one partition directly will fail in the middle is spark will
notice this and will fail job(say after all retries)?

thanks in advance




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: DirectFileOutputCommiter

Posted by Igor Berman <ig...@gmail.com>.
Alexander,
implementation you've attaches supports both modes configured by property "
mapred.output.direct." + fs.getClass().getSimpleName()
as soon as you see _temporary dir probably the mode is off i.e. the default
impl is working and you experiencing some other problem.

On 26 February 2016 at 10:57, Alexander Pivovarov <ap...@gmail.com>
wrote:

> Amazon uses the following impl
> https://gist.github.com/apivovarov/bb215f08318318570567
> But for some reason Spark show error at the end of the job
>
> 16/02/26 08:16:54 INFO scheduler.DAGScheduler: ResultStage 0
> (saveAsTextFile at <console>:28) finished in 14.305 s
> 16/02/26 08:16:54 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose
> tasks have all completed, from pool
> 16/02/26 08:16:54 INFO scheduler.DAGScheduler: Job 0 finished:
> saveAsTextFile at <console>:28, took 14.467271 s
> java.io.FileNotFoundException: File
> s3n://my-backup/test/test1/_temporary/0 does not exist.
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:564)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
> at
> org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
> at
> org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:112)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1214)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>
>
> Another implementation works fine
> https://gist.github.com/aarondav/c513916e72101bbe14ec
>
> On Thu, Feb 25, 2016 at 10:24 PM, Takeshi Yamamuro <li...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Great work!
>> What is the concrete performance gain of the committer on s3?
>> I'd like to know.
>>
>> I think there is no direct committer for files because these kinds of
>> committer has risks
>> to loss data (See: SPARK-10063).
>> Until this resolved, ISTM files cannot support direct commits.
>>
>> thanks,
>>
>>
>>
>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>>
>>> yes, should be this one
>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>
>>> then need to set it in spark-defaults.conf :
>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>
>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>> > The header of DirectOutputCommitter.scala says Databricks.
>>> > Did you get it from Databricks ?
>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>>> >>
>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>> not included?
>>> >> we added it in our fork,
>>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>> insert operations in HiveContext, since the Committer is called by hive
>>> (means uses dependencies in hive package)
>>> >> we made some hack to fix this, you can take a look:
>>> >>
>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>> >>
>>> >> may bring some ideas to other spark contributors to find a better way
>>> to use s3.
>>> >>
>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>>> >>>
>>> >>> Hi,
>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>> alikes
>>> >>> especially when working with s3?
>>> >>> I know that there is one impl in spark distro for parquet format,
>>> but not
>>> >>> for files -  why?
>>> >>>
>>> >>> Imho, it can bring huge performance boost.
>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>> stage
>>> >>> when all parts are copied one-by-one to destination dir from
>>> _temporary,
>>> >>> which is bottleneck when number of partitions is high.
>>> >>>
>>> >>> Also, wanted to know if there are some problems when using
>>> >>> DirectFileOutputCommitter?
>>> >>> If writing one partition directly will fail in the middle is spark
>>> will
>>> >>> notice this and will fail job(say after all retries)?
>>> >>>
>>> >>> thanks in advance
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>> >>> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >>>
>>> >>> ---------------------------------------------------------------------
>>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >>> For additional commands, e-mail: user-help@spark.apache.org
>>> >>>
>>> >>
>>> >
>>> >
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>

Re: DirectFileOutputCommiter

Posted by Alexander Pivovarov <ap...@gmail.com>.
Amazon uses the following impl
https://gist.github.com/apivovarov/bb215f08318318570567
But for some reason Spark show error at the end of the job

16/02/26 08:16:54 INFO scheduler.DAGScheduler: ResultStage 0
(saveAsTextFile at <console>:28) finished in 14.305 s
16/02/26 08:16:54 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose
tasks have all completed, from pool
16/02/26 08:16:54 INFO scheduler.DAGScheduler: Job 0 finished:
saveAsTextFile at <console>:28, took 14.467271 s
java.io.FileNotFoundException: File s3n://my-backup/test/test1/_temporary/0
does not exist.
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:564)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
at
org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:112)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1214)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)


Another implementation works fine
https://gist.github.com/aarondav/c513916e72101bbe14ec

On Thu, Feb 25, 2016 at 10:24 PM, Takeshi Yamamuro <li...@gmail.com>
wrote:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>

Re: DirectFileOutputCommiter

Posted by Takeshi Yamamuro <li...@gmail.com>.
Hi,

I think the essential culprit is that these committers are not idempotent;
retry attempts will fail.
See codes below for details;
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala#L130

On Sat, Feb 27, 2016 at 7:38 PM, Igor Berman <ig...@gmail.com> wrote:

> Hi Reynold,
> thanks for the response
> Yes, speculation mode needs some coordination.
> Regarding job failure :
> correct me if I wrong - if one of jobs fails - client code will be sort of
> "notified" by exception or something similar, so the client can decide to
> re-submit action(job), i.e. it won't be "silent" failure.
>
>
> On 26 February 2016 at 11:50, Reynold Xin <rx...@databricks.com> wrote:
>
>> It could lose data in speculation mode, or if any job fails.
>>
>> On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman <ig...@gmail.com>
>> wrote:
>>
>>> Takeshi, do you know the reason why they wanted to remove this commiter
>>> in SPARK-10063?
>>> the jira has no info inside
>>> as far as I understand the direct committer can't be used when either of
>>> two is true
>>> 1. speculation mode
>>> 2. append mode(ie. not creating new version of data but appending to
>>> existing data)
>>>
>>> On 26 February 2016 at 08:24, Takeshi Yamamuro <li...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Great work!
>>>> What is the concrete performance gain of the committer on s3?
>>>> I'd like to know.
>>>>
>>>> I think there is no direct committer for files because these kinds of
>>>> committer has risks
>>>> to loss data (See: SPARK-10063).
>>>> Until this resolved, ISTM files cannot support direct commits.
>>>>
>>>> thanks,
>>>>
>>>>
>>>>
>>>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>>>>
>>>>> yes, should be this one
>>>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>>>
>>>>> then need to set it in spark-defaults.conf :
>>>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>>>
>>>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>>>> > The header of DirectOutputCommitter.scala says Databricks.
>>>>> > Did you get it from Databricks ?
>>>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>>>>> >>
>>>>> >> interesting in this topic as well, why
>>>>> the DirectFileOutputCommitter not included?
>>>>> >> we added it in our fork,
>>>>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>>>> insert operations in HiveContext, since the Committer is called by hive
>>>>> (means uses dependencies in hive package)
>>>>> >> we made some hack to fix this, you can take a look:
>>>>> >>
>>>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>>>> >>
>>>>> >> may bring some ideas to other spark contributors to find a better
>>>>> way to use s3.
>>>>> >>
>>>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>>>>> >>>
>>>>> >>> Hi,
>>>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>>>> alikes
>>>>> >>> especially when working with s3?
>>>>> >>> I know that there is one impl in spark distro for parquet format,
>>>>> but not
>>>>> >>> for files -  why?
>>>>> >>>
>>>>> >>> Imho, it can bring huge performance boost.
>>>>> >>> Using default FileOutputCommiter with s3 has big overhead at
>>>>> commit stage
>>>>> >>> when all parts are copied one-by-one to destination dir from
>>>>> _temporary,
>>>>> >>> which is bottleneck when number of partitions is high.
>>>>> >>>
>>>>> >>> Also, wanted to know if there are some problems when using
>>>>> >>> DirectFileOutputCommitter?
>>>>> >>> If writing one partition directly will fail in the middle is spark
>>>>> will
>>>>> >>> notice this and will fail job(say after all retries)?
>>>>> >>>
>>>>> >>> thanks in advance
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> --
>>>>> >>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>>>> >>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>> >>>
>>>>> >>>
>>>>> ---------------------------------------------------------------------
>>>>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>> >>> For additional commands, e-mail: user-help@spark.apache.org
>>>>> >>>
>>>>> >>
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>


-- 
---
Takeshi Yamamuro

Re: DirectFileOutputCommiter

Posted by Igor Berman <ig...@gmail.com>.
Hi Reynold,
thanks for the response
Yes, speculation mode needs some coordination.
Regarding job failure :
correct me if I wrong - if one of jobs fails - client code will be sort of
"notified" by exception or something similar, so the client can decide to
re-submit action(job), i.e. it won't be "silent" failure.


On 26 February 2016 at 11:50, Reynold Xin <rx...@databricks.com> wrote:

> It could lose data in speculation mode, or if any job fails.
>
> On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman <ig...@gmail.com>
> wrote:
>
>> Takeshi, do you know the reason why they wanted to remove this commiter
>> in SPARK-10063?
>> the jira has no info inside
>> as far as I understand the direct committer can't be used when either of
>> two is true
>> 1. speculation mode
>> 2. append mode(ie. not creating new version of data but appending to
>> existing data)
>>
>> On 26 February 2016 at 08:24, Takeshi Yamamuro <li...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Great work!
>>> What is the concrete performance gain of the committer on s3?
>>> I'd like to know.
>>>
>>> I think there is no direct committer for files because these kinds of
>>> committer has risks
>>> to loss data (See: SPARK-10063).
>>> Until this resolved, ISTM files cannot support direct commits.
>>>
>>> thanks,
>>>
>>>
>>>
>>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>>>
>>>> yes, should be this one
>>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>>
>>>> then need to set it in spark-defaults.conf :
>>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>>
>>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>>> > The header of DirectOutputCommitter.scala says Databricks.
>>>> > Did you get it from Databricks ?
>>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>>>> >>
>>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>>> not included?
>>>> >> we added it in our fork,
>>>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>>> insert operations in HiveContext, since the Committer is called by hive
>>>> (means uses dependencies in hive package)
>>>> >> we made some hack to fix this, you can take a look:
>>>> >>
>>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>>> >>
>>>> >> may bring some ideas to other spark contributors to find a better
>>>> way to use s3.
>>>> >>
>>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>>>> >>>
>>>> >>> Hi,
>>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>>> alikes
>>>> >>> especially when working with s3?
>>>> >>> I know that there is one impl in spark distro for parquet format,
>>>> but not
>>>> >>> for files -  why?
>>>> >>>
>>>> >>> Imho, it can bring huge performance boost.
>>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>>> stage
>>>> >>> when all parts are copied one-by-one to destination dir from
>>>> _temporary,
>>>> >>> which is bottleneck when number of partitions is high.
>>>> >>>
>>>> >>> Also, wanted to know if there are some problems when using
>>>> >>> DirectFileOutputCommitter?
>>>> >>> If writing one partition directly will fail in the middle is spark
>>>> will
>>>> >>> notice this and will fail job(say after all retries)?
>>>> >>>
>>>> >>> thanks in advance
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>>> >>> Sent from the Apache Spark User List mailing list archive at
>>>> Nabble.com.
>>>> >>>
>>>> >>>
>>>> ---------------------------------------------------------------------
>>>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> >>> For additional commands, e-mail: user-help@spark.apache.org
>>>> >>>
>>>> >>
>>>> >
>>>> >
>>>>
>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>

Re: DirectFileOutputCommiter

Posted by Alexander Pivovarov <ap...@gmail.com>.
DirectOutputCommitter doc says:
The FileOutputCommitter is required for HDFS + speculation, which allows
only one writer at
 a time for a file (so two people racing to write the same file would not
work). However, S3
 supports multiple writers outputting to the same file, where visibility is
guaranteed to be
 atomic. This is a monotonic operation: all writers should be writing the
same data, so which
 one wins is immaterial.

aws impl is better because it uses DirectFileOutputCommitter only for
s3n:// files
https://gist.github.com/apivovarov/bb215f08318318570567

But for some reason it does not work for me.

On Fri, Feb 26, 2016 at 11:50 AM, Reynold Xin <rx...@databricks.com> wrote:

> It could lose data in speculation mode, or if any job fails.
>
> On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman <ig...@gmail.com>
> wrote:
>
>> Takeshi, do you know the reason why they wanted to remove this commiter
>> in SPARK-10063?
>> the jira has no info inside
>> as far as I understand the direct committer can't be used when either of
>> two is true
>> 1. speculation mode
>> 2. append mode(ie. not creating new version of data but appending to
>> existing data)
>>
>> On 26 February 2016 at 08:24, Takeshi Yamamuro <li...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Great work!
>>> What is the concrete performance gain of the committer on s3?
>>> I'd like to know.
>>>
>>> I think there is no direct committer for files because these kinds of
>>> committer has risks
>>> to loss data (See: SPARK-10063).
>>> Until this resolved, ISTM files cannot support direct commits.
>>>
>>> thanks,
>>>
>>>
>>>
>>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>>>
>>>> yes, should be this one
>>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>>
>>>> then need to set it in spark-defaults.conf :
>>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>>
>>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>>> > The header of DirectOutputCommitter.scala says Databricks.
>>>> > Did you get it from Databricks ?
>>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>>>> >>
>>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>>> not included?
>>>> >> we added it in our fork,
>>>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>>> insert operations in HiveContext, since the Committer is called by hive
>>>> (means uses dependencies in hive package)
>>>> >> we made some hack to fix this, you can take a look:
>>>> >>
>>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>>> >>
>>>> >> may bring some ideas to other spark contributors to find a better
>>>> way to use s3.
>>>> >>
>>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>>>> >>>
>>>> >>> Hi,
>>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>>> alikes
>>>> >>> especially when working with s3?
>>>> >>> I know that there is one impl in spark distro for parquet format,
>>>> but not
>>>> >>> for files -  why?
>>>> >>>
>>>> >>> Imho, it can bring huge performance boost.
>>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>>> stage
>>>> >>> when all parts are copied one-by-one to destination dir from
>>>> _temporary,
>>>> >>> which is bottleneck when number of partitions is high.
>>>> >>>
>>>> >>> Also, wanted to know if there are some problems when using
>>>> >>> DirectFileOutputCommitter?
>>>> >>> If writing one partition directly will fail in the middle is spark
>>>> will
>>>> >>> notice this and will fail job(say after all retries)?
>>>> >>>
>>>> >>> thanks in advance
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>>> >>> Sent from the Apache Spark User List mailing list archive at
>>>> Nabble.com.
>>>> >>>
>>>> >>>
>>>> ---------------------------------------------------------------------
>>>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> >>> For additional commands, e-mail: user-help@spark.apache.org
>>>> >>>
>>>> >>
>>>> >
>>>> >
>>>>
>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>

Re: DirectFileOutputCommiter

Posted by Reynold Xin <rx...@databricks.com>.
It could lose data in speculation mode, or if any job fails.

On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman <ig...@gmail.com> wrote:

> Takeshi, do you know the reason why they wanted to remove this commiter in
> SPARK-10063?
> the jira has no info inside
> as far as I understand the direct committer can't be used when either of
> two is true
> 1. speculation mode
> 2. append mode(ie. not creating new version of data but appending to
> existing data)
>
> On 26 February 2016 at 08:24, Takeshi Yamamuro <li...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Great work!
>> What is the concrete performance gain of the committer on s3?
>> I'd like to know.
>>
>> I think there is no direct committer for files because these kinds of
>> committer has risks
>> to loss data (See: SPARK-10063).
>> Until this resolved, ISTM files cannot support direct commits.
>>
>> thanks,
>>
>>
>>
>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>>
>>> yes, should be this one
>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>
>>> then need to set it in spark-defaults.conf :
>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>
>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>> > The header of DirectOutputCommitter.scala says Databricks.
>>> > Did you get it from Databricks ?
>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>>> >>
>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>> not included?
>>> >> we added it in our fork,
>>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>> insert operations in HiveContext, since the Committer is called by hive
>>> (means uses dependencies in hive package)
>>> >> we made some hack to fix this, you can take a look:
>>> >>
>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>> >>
>>> >> may bring some ideas to other spark contributors to find a better way
>>> to use s3.
>>> >>
>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>>> >>>
>>> >>> Hi,
>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>> alikes
>>> >>> especially when working with s3?
>>> >>> I know that there is one impl in spark distro for parquet format,
>>> but not
>>> >>> for files -  why?
>>> >>>
>>> >>> Imho, it can bring huge performance boost.
>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>> stage
>>> >>> when all parts are copied one-by-one to destination dir from
>>> _temporary,
>>> >>> which is bottleneck when number of partitions is high.
>>> >>>
>>> >>> Also, wanted to know if there are some problems when using
>>> >>> DirectFileOutputCommitter?
>>> >>> If writing one partition directly will fail in the middle is spark
>>> will
>>> >>> notice this and will fail job(say after all retries)?
>>> >>>
>>> >>> thanks in advance
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>> >>> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >>>
>>> >>> ---------------------------------------------------------------------
>>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >>> For additional commands, e-mail: user-help@spark.apache.org
>>> >>>
>>> >>
>>> >
>>> >
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>

Re: DirectFileOutputCommiter

Posted by Igor Berman <ig...@gmail.com>.
Takeshi, do you know the reason why they wanted to remove this commiter in
SPARK-10063?
the jira has no info inside
as far as I understand the direct committer can't be used when either of
two is true
1. speculation mode
2. append mode(ie. not creating new version of data but appending to
existing data)

On 26 February 2016 at 08:24, Takeshi Yamamuro <li...@gmail.com>
wrote:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>

Re: DirectFileOutputCommiter

Posted by Teng Qiu <te...@gmail.com>.
Hi, thanks :) performance gain is huge, we have a INSERT INTO query, ca.
30GB in JSON format will be written to s3 at the end, without
DirectOutputCommitter and our hack in hive and InsertIntoHiveTable.scala,
it took more than 40min, with our changes, only 15min then.

DirectOutputCommitter works for SparkContext and SqlContext, but for
HiveContext, it only solved the problem with "staging folder" in target
table, problem for HiveContext is here:
https://github.com/apache/spark/blob/v1.6.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L132

besides staging folder created by Committer, Hive will use a temp location
as well... so we made some hack on this:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando#diff-d579db9a8f27e0bbef37720ab14ec3f6R134

mainly idea is, we added an internal var runID, and use HiveConf
spark.hive.insert.skip.temp to disable Hive to use temp location, but with
this hack, we need to change Hive's implementation... we put our Hive.java
file under
sql/hive/src/main/java/org/apache/hadoop/hive/ql/metadata/Hive.java

you can find the full change using this link:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando#diff-d579db9a8f27e0bbef37720ab14ec3f6R134


i would like to forward this discuss to spark-dev, hope spark team can
think about it, and hope there will be a better solution for this, like
some more official hack :D


2016-02-26 7:24 GMT+01:00 Takeshi Yamamuro <li...@gmail.com>:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>

Re: DirectFileOutputCommiter

Posted by Igor Berman <ig...@gmail.com>.
the performance gain is for commit stage when data is moved from _temporary
directory to distination directory
since s3 is key-value really the move operation is like copy operation


On 26 February 2016 at 08:24, Takeshi Yamamuro <li...@gmail.com>
wrote:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>

Re: DirectFileOutputCommiter

Posted by Steve Loughran <st...@hortonworks.com>.
> On 26 Feb 2016, at 06:24, Takeshi Yamamuro <li...@gmail.com> wrote:
> 
> Hi,
> 
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
> 
> I think there is no direct committer for files because these kinds of committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
> 

that's speculative output via any committer; you cannot use s3 as a speculative destination for spark, MR, hive, etc.

Speculative output relies on being able to commit a file operation (create with overwrite==false) file rename or directory rename being atomic with respect to the check for the destination existing and the operation of creating or renaming. There's also a tendency to assume that file directory/rename operations are O(1)

S3 (and openstack swift) don't offer those semantics. The check for existence is done client-side before the operation, against a remote store whose metadata may not be consistent anyway (i.e it says the blob isn't there when it is, and vice versa). With rename() and delete() being done client-side, they are O(files * len(files), and can fail partway through.

what the direct committer does is bypass any attempt to write then commit by renaming, which is the performance killer.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: DirectFileOutputCommiter

Posted by Teng Qiu <te...@gmail.com>.
Hi, thanks :) performance gain is huge, we have a INSERT INTO query, ca.
30GB in JSON format will be written to s3 at the end, without
DirectOutputCommitter and our hack in hive and InsertIntoHiveTable.scala,
it took more than 40min, with our changes, only 15min then.

DirectOutputCommitter works for SparkContext and SqlContext, but for
HiveContext, it only solved the problem with "staging folder" in target
table, problem for HiveContext is here:
https://github.com/apache/spark/blob/v1.6.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L132

besides staging folder created by Committer, Hive will use a temp location
as well... so we made some hack on this:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando#diff-d579db9a8f27e0bbef37720ab14ec3f6R134

mainly idea is, we added an internal var runID, and use HiveConf
spark.hive.insert.skip.temp to disable Hive to use temp location, but with
this hack, we need to change Hive's implementation... we put our Hive.java
file under
sql/hive/src/main/java/org/apache/hadoop/hive/ql/metadata/Hive.java

you can find the full change using this link:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando#diff-d579db9a8f27e0bbef37720ab14ec3f6R134


i would like to forward this discuss to spark-dev, hope spark team can
think about it, and hope there will be a better solution for this, like
some more official hack :D


2016-02-26 7:24 GMT+01:00 Takeshi Yamamuro <li...@gmail.com>:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>

Re: DirectFileOutputCommiter

Posted by Takeshi Yamamuro <li...@gmail.com>.
Hi,

Great work!
What is the concrete performance gain of the committer on s3?
I'd like to know.

I think there is no direct committer for files because these kinds of
committer has risks
to loss data (See: SPARK-10063).
Until this resolved, ISTM files cannot support direct commits.

thanks,



On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <te...@gmail.com> wrote:

> yes, should be this one
> https://gist.github.com/aarondav/c513916e72101bbe14ec
>
> then need to set it in spark-defaults.conf :
> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>
> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
> > The header of DirectOutputCommitter.scala says Databricks.
> > Did you get it from Databricks ?
> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
> >>
> >> interesting in this topic as well, why the DirectFileOutputCommitter
> not included?
> >> we added it in our fork,
> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
> >> moreover, this DirectFileOutputCommitter is not working for the insert
> operations in HiveContext, since the Committer is called by hive (means
> uses dependencies in hive package)
> >> we made some hack to fix this, you can take a look:
> >>
> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
> >>
> >> may bring some ideas to other spark contributors to find a better way
> to use s3.
> >>
> >> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
> >>>
> >>> Hi,
> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
> alikes
> >>> especially when working with s3?
> >>> I know that there is one impl in spark distro for parquet format, but
> not
> >>> for files -  why?
> >>>
> >>> Imho, it can bring huge performance boost.
> >>> Using default FileOutputCommiter with s3 has big overhead at commit
> stage
> >>> when all parts are copied one-by-one to destination dir from
> _temporary,
> >>> which is bottleneck when number of partitions is high.
> >>>
> >>> Also, wanted to know if there are some problems when using
> >>> DirectFileOutputCommitter?
> >>> If writing one partition directly will fail in the middle is spark will
> >>> notice this and will fail job(say after all retries)?
> >>>
> >>> thanks in advance
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
> >>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>> ---------------------------------------------------------------------
> >>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >>> For additional commands, e-mail: user-help@spark.apache.org
> >>>
> >>
> >
> >
>



-- 
---
Takeshi Yamamuro

Re: DirectFileOutputCommiter

Posted by Teng Qiu <te...@gmail.com>.
yes, should be this one
https://gist.github.com/aarondav/c513916e72101bbe14ec

then need to set it in spark-defaults.conf :
https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13

Am Freitag, 26. Februar 2016 schrieb Yin Yang :
> The header of DirectOutputCommitter.scala says Databricks.
> Did you get it from Databricks ?
> On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:
>>
>> interesting in this topic as well, why the DirectFileOutputCommitter not
included?
>> we added it in our fork,
under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> moreover, this DirectFileOutputCommitter is not working for the insert
operations in HiveContext, since the Committer is called by hive (means
uses dependencies in hive package)
>> we made some hack to fix this, you can take a look:
>>
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>
>> may bring some ideas to other spark contributors to find a better way to
use s3.
>>
>> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>>>
>>> Hi,
>>> Wanted to understand if anybody uses DirectFileOutputCommitter or alikes
>>> especially when working with s3?
>>> I know that there is one impl in spark distro for parquet format, but
not
>>> for files -  why?
>>>
>>> Imho, it can bring huge performance boost.
>>> Using default FileOutputCommiter with s3 has big overhead at commit
stage
>>> when all parts are copied one-by-one to destination dir from _temporary,
>>> which is bottleneck when number of partitions is high.
>>>
>>> Also, wanted to know if there are some problems when using
>>> DirectFileOutputCommitter?
>>> If writing one partition directly will fail in the middle is spark will
>>> notice this and will fail job(say after all retries)?
>>>
>>> thanks in advance
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>
>
>

Re: DirectFileOutputCommiter

Posted by Yin Yang <yy...@gmail.com>.
The header of DirectOutputCommitter.scala says Databricks.

Did you get it from Databricks ?

On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <te...@gmail.com> wrote:

> interesting in this topic as well, why the DirectFileOutputCommitter not
> included?
>
> we added it in our fork, under
> core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>
> moreover, this DirectFileOutputCommitter is not working for the insert
> operations in HiveContext, since the Committer is called by hive (means
> uses dependencies in hive package)
>
> we made some hack to fix this, you can take a look:
>
> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>
> may bring some ideas to other spark contributors to find a better way to
> use s3.
>
>
> 2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:
>
>> Hi,
>> Wanted to understand if anybody uses DirectFileOutputCommitter or alikes
>> especially when working with s3?
>> I know that there is one impl in spark distro for parquet format, but not
>> for files -  why?
>>
>> Imho, it can bring huge performance boost.
>> Using default FileOutputCommiter with s3 has big overhead at commit stage
>> when all parts are copied one-by-one to destination dir from _temporary,
>> which is bottleneck when number of partitions is high.
>>
>> Also, wanted to know if there are some problems when using
>> DirectFileOutputCommitter?
>> If writing one partition directly will fail in the middle is spark will
>> notice this and will fail job(say after all retries)?
>>
>> thanks in advance
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Re: DirectFileOutputCommiter

Posted by Teng Qiu <te...@gmail.com>.
interesting in this topic as well, why the DirectFileOutputCommitter not
included?

we added it in our fork, under
core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala

moreover, this DirectFileOutputCommitter is not working for the insert
operations in HiveContext, since the Committer is called by hive (means
uses dependencies in hive package)

we made some hack to fix this, you can take a look:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando

may bring some ideas to other spark contributors to find a better way to
use s3.


2016-02-22 23:18 GMT+01:00 igor.berman <ig...@gmail.com>:

> Hi,
> Wanted to understand if anybody uses DirectFileOutputCommitter or alikes
> especially when working with s3?
> I know that there is one impl in spark distro for parquet format, but not
> for files -  why?
>
> Imho, it can bring huge performance boost.
> Using default FileOutputCommiter with s3 has big overhead at commit stage
> when all parts are copied one-by-one to destination dir from _temporary,
> which is bottleneck when number of partitions is high.
>
> Also, wanted to know if there are some problems when using
> DirectFileOutputCommitter?
> If writing one partition directly will fail in the middle is spark will
> notice this and will fail job(say after all retries)?
>
> thanks in advance
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>