Posted to dev@beam.apache.org by Roland Harangozo <ro...@gmail.com> on 2016/05/12 17:19:44 UTC

BEAM-206

Hi All,

I would like to fix this issue:
https://issues.apache.org/jira/browse/BEAM-206

Could you please revise my design proposal?

I would copy and optionally remove each temporary file as a single atomic
operation, rather than copying all of the temporary files first and then
removing them (when removal is needed). This has the following benefits:
* If the file system supports a move operation and the retention policy is
to remove the temporary files, we can use the native move (or rename)
operation. This could be significantly faster than copy-and-remove.
* By keeping the remove operation close to the copy operation, we are less
likely to copy a file again after a failure (if one of two files has been
moved but the other failed, a replay moves only the one that failed rather
than starting from scratch).

Regarding concurrency, I would use an ExecutorService to run the
aforementioned operations simultaneously. The first exception would stop
(interrupt) all remaining operations.

The level of concurrency (number of threads) would be file-system specific
and configurable. I can imagine 10+ threads giving good performance on GCS
but poor performance on a local file system.

Best regards,
Roland Harangozo

Re: BEAM-206

Posted by Dan Halperin <dh...@google.com.INVALID>.
Hi Roland,

Sorry for the delay -- I did not know how to use JIRA.

I have made you a BEAM contributor and have assigned BEAM-284 to you.

Thanks!
Dan


Re: BEAM-206

Posted by Roland Harangozo <ro...@gmail.com>.
Hi Dan,

No problem ;)

*"Would you be willing to review it when it's ready?"* - Sure.
*"I think the notion of making this a file-at-a-time operation is wrong --
we still want to preserve the ability to make a batch request for a network
file system like GCS or S3."* - Agreed; what I meant to say (but said
badly) was to combine the copy & remove into a single operation and execute
those operations concurrently, rather than copying all the files
concurrently and then removing them. The latter (the existing solution)
rules out taking advantage of the native move/rename operation.

Thank you for the review; I find your comments really useful.

*"Would you like to try implementing a FileOperationsFactory
<https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554>
for another endpoint such as AWS S3?"* - A new JIRA ticket has been created
for this: BEAM-284 <https://issues.apache.org/jira/browse/BEAM-284>. Could
you please take a look and assign it to me if that is okay?

Thanks,
Roland


Re: BEAM-206

Posted by Dan Halperin <dh...@google.com.INVALID>.
Hi Roland,

I'm a big jerk, but apparently I forgot to assign this issue to myself when
I created it to track my own work. Sorry for the inconvenience, but I am
already testing a fairly complex solution here. Would you be willing to
review it when it's ready?

Comments on the design inline.

On Fri, May 13, 2016 at 8:48 AM, Tyler Akidau <ta...@apache.org> wrote:

> +Daniel Halperin <dh...@google.com>
>
>
> On Thu, May 12, 2016 at 10:20 AM Roland Harangozo <ro...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I would like to fix this issue:
>> https://issues.apache.org/jira/browse/BEAM-206
>>
>> Could you please revise my design proposal?
>>
>> I would copy and optionally remove the temporary files one by one as an
>> atomic operation rather then copying all of the temporary files and then
>> removing them (if we need to remove). It has the following benefits:
>>
>
I think the notion of making this a file-at-a-time operation is wrong -- we
still want
to preserve the ability to make a batch request for a network file system
like GCS
or S3.

>> * If the move operation supported by the file system and the file retention
>> is remove, we can use the native file move operation (or rename). Could be
>> significantly faster than the copy and remove.
>>
>
You're right that we should change the interface from copy & remove to
rename (which can be internally implemented as copy & remove if a file
storage system requires it). This will admit faster implementations for
systems that have an atomic rename operation (like file systems ;).
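As an illustrative sketch of that interface change (the names below are
hypothetical placeholders, not the actual Beam interface):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical operations interface: rename replaces the separate copy & remove calls. */
interface FileOperations {
  void rename(Path src, Path dst) throws IOException;
}

/** File systems with an atomic rename can use it directly. */
class LocalFileOperations implements FileOperations {
  @Override
  public void rename(Path src, Path dst) throws IOException {
    Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
  }
}

/** Stores without a native rename fall back to copy followed by delete. */
class CopyDeleteFileOperations implements FileOperations {
  @Override
  public void rename(Path src, Path dst) throws IOException {
    Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
    Files.delete(src);
  }
}
```

Callers see a single rename contract either way; only the implementation
differs per storage system.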


>> * By moving the remove operation close to the copy operation, the
>> probability is lower to copy the file again because of any failure (if one
>> file of two is moved but the other one failed, when we replay, it moves
>> only the one that failed rather than starting from scratch)
>>
>
I'm not sure this part follows inside the Beam model. There is no easy way
to force
each file to be in its own bundle, so we can't really do retries (at the
model level)
independently for each file.

You can of course follow this model inside a bulk-rename step, but you'll
have to carefully consider the semantics if some rename fails and the
entire operation is retried. I'm confident that this could be made into a
good design!

I think it's not trivial to verify correctness here. If the move "source"
does not exist, can you tell a successful move from an error? (Note that
it's common for users to rerun a job without deleting the old output, so
the "destination" may already exist. *sigh ;)*.)

>> Regarding the concurrency, I would use an ExecutorService to run the
>> aforementioned operation simultaneously.
>
>
Seems right to me!


>> The first exception would stop
>> (interrupt) all operation.
>>
>
Per the above comments -- we need to design this step to be idempotent (or
as close to it as we can).
Stopping at the first exception may be a good thing to do, as long as
retrying or resuming
will result in the correct output.


>
>> The level of the concurrency (number of threads) would be file system
>> specific and configurable. I can imagine 10+ threads gives a good
>> performance on GCS but gives bad performance on local file system.
>>
>
This is true -- you will want to tune the implementation for each file
storage system.

I have done many experiments over the past week on GCS in particular -- the
conclusion was to use batches of size 100 and about 32 concurrent threads
for the best performance and robustness to failures.
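For illustration, that batching scheme could look roughly like this (the
batch size and thread count mirror the numbers above; `renameBatch` is a
hypothetical stand-in for a single bulk request to the storage service):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BatchRenamer {
  static final int BATCH_SIZE = 100;  // per-request batch size found to work well on GCS
  static final int NUM_THREADS = 32;  // concurrent batch requests
  static final AtomicInteger BATCHES = new AtomicInteger();  // counts issued batches, for illustration

  /** Splits the rename pairs into batches and issues them concurrently. */
  static void renameAll(List<String[]> pairs) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < pairs.size(); i += BATCH_SIZE) {
        final List<String[]> batch = pairs.subList(i, Math.min(i + BATCH_SIZE, pairs.size()));
        futures.add(pool.submit(() -> renameBatch(batch)));
      }
      for (Future<?> f : futures) {
        f.get();  // propagates the first failure
      }
    } finally {
      pool.shutdownNow();
    }
  }

  // Hypothetical stand-in for one bulk request covering up to 100 renames
  // (e.g. a GCS batch call); here it only records that a batch was issued.
  static void renameBatch(List<String[]> batch) {
    BATCHES.incrementAndGet();
  }
}
```

This preserves the batch-request ability Dan mentions while still bounding
concurrency at a storage-specific thread count.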


>
>> Best regards,
>> Roland Harangozo
>>
>
Thanks so much for this email and design -- it's great. Let's keep
discussing -- and would you be willing to review a pull request from me
for the GCS part of this change?

Would you like to try implementing a FileOperationsFactory
<https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554>
for
another endpoint such as AWS S3?

Thanks,
Dan

Re: BEAM-206

Posted by Tyler Akidau <ta...@apache.org>.
+Daniel Halperin <dh...@google.com>
