Posted to dev@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2018/11/30 10:45:07 UTC

[SDF] Why do we need markDone (or an equivalent claim)?

In looking at the SDF examples, it seems error-prone to have to
remember to write

    tryClaim([fake-end-position])

to indicate that a restriction is finished. IIRC, this was done to
decide whether the entire restriction had been processed on return in
the case that tryClaim never returned false. It seems preferable to
encode this into the return value (with a void return meaning iff
tryClaim returned false, and a non-void return being able to indicate
any hints as to when, if ever, process should be called again).

Can someone jog my memory as to whether there was a case in which this wouldn't work?

Re: [SDF] Why do we need markDone (or an equivalent claim)?

Posted by Lukasz Cwik <lc...@google.com>.
On Fri, Nov 30, 2018 at 4:43 PM Robert Bradshaw <ro...@google.com> wrote:

> On Fri, Nov 30, 2018 at 10:28 PM Lukasz Cwik <lc...@google.com> wrote:
> >
> > On Fri, Nov 30, 2018 at 12:47 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik <lc...@google.com> wrote:
> >> >
> >> > Uh, I'm not sure what you're asking.
> >>
> >> I'm asking why we wanted a markDone in the first place.
> >
> > When looking at the byte key restriction tracker code, I found a couple
> of bugs around how ranges were being compared and how the byte key range
> was being claimed (we weren't computing the next key correctly). The usage
> of markDone seemed to be a crutch when attempting to correctly implement
> the tryClaim code. Also, all the framework code that powered SDF wasn't
> aware of markDone so it couldn't validate that the last claim failed. So I
> fixed the tryClaim code and then didn't need markDone and removed it since
> this was the only restriction tracker that had it.
> >
> >> > In the SDF API, a void return from processElement already means that a
> call to tryClaim must have returned false
> >>
> >> We could widen this to "or finished the restriction."
> >
> > Yes, having markDone could be added to the API. Is it a crutch for
> subtle bugs in tryClaim though?
>
> I'm proposing removing the requirement of having either a markDone or
> a tryClaim(EndKey).
>

Yes, I could see that.


> >> > while a non-void return allows the caller to either return STOP
> (tryClaim must have returned false) or return RESUME (with a time of when
> to resume).
> >>
> >> We could also return STOP if tryClaim never returned false but the
> >> restriction was finished.
> >>
> >> > This allows the framework code to prevent user errors by ensuring the
> restriction has been completed.
> >>
> >> I don't think the framework can ensure this. (It can enforce the above
> >> constraints that on a STOP tryClaim did indeed return false on the
> >> last call, but I'm fuzzy on the value this actually provides when it
> >> just means the user must artificially force it to return a false value.
> >> It also means we can't make it an error to try claiming values outside
> >> the initial restriction. If we want to make things more explicit, we
> >> could require a STOP or RESUME return rather than allow a void
> >> return.)
> >
> > I don't think we want SDF authors to ensure that their values are in the
> initial range first before attempting to claim them as this is the purpose
> of tryClaim. The SDF code would then be checking that the range is valid
> twice.
> >
> > processElement() {
> >   readElement
> >   isElementInRange?
> >   if (!tryClaim) {
> >     return
> >   }
> > }
> > (both isElementInRange and tryClaim are now doing the same bounds
> checking which can lead to subtle bounds checking errors).
>
> Generally code would be iterating over the range, and it would likely
> be a bug to check past it, but if we want to support code that ignores
> range.getEndPosition() and lets tryClaim do all the work I buy that as
> a good argument to allow arbitrary claim attempts.
>
> >> Maybe I'm just not clever enough to come up with a kind of source
> >> where this could be good at catching errors?
> >
> > I think the value is that we expect to implement a few restriction
> tracker classes which will be re-used across many SDF implementations. In
> this case, we could point out to the SDF author that they haven't claimed
> all that they said they would process. This would be true whether markDone
> existed or not.
>
> The general pattern is
>
> processRestriction() {
>   for (element : source[restriction]) {
>     if (!tryClaim(element)) {
>       return STOP
>     } else {
>       emit(element)
>     }
>   }
>   tryClaim(everything)
>   return STOP  // or if CONTINUE is returned, omit the above line
> }
>
> and I'm having a hard time coming up with any bugs that would be
> caught if we didn't require the (seemingly boilerplate)
> tryClaim(everything) line. Maybe I'm not thinking of the right source?


I was looking at the RestrictionTracker
(https://github.com/apache/beam/blob/176851192bba449dba0b2bc7cc45a2342b587dbd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L869)
that is part of the Watch PTransform. It is significantly more complicated
than the others, since the RestrictionTracker is responsible for "polling"
for new elements to process and expects the caller to process everything
that is part of the set. This may not be the best way this
RestrictionTracker could have been implemented, since it seemed like we were
misusing several of the RestrictionTracker concepts and tying the tracker
tightly to the implementation of Watch, but this could be the example worth
studying.

The other case I could see that is worth investigating is whether it helps
ensure restrictions are completed in the case of poor/incorrect exception
handling on the part of the SDF implementation.

>> > Also, "" is for the byte key range; the code could have just passed
> range.getEndPosition() into the final tryClaim. It's just that "" is
> shorthand and would be similar to passing in Long.MAX_VALUE for the file
> offset range.
> >>
> >> Having to choose a value to pass depending on the restriction tracker
> >> type is something that could simply be eliminated.
> >>
> >> > On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >>
> >> >> In looking at the SDF examples, it seems error-prone to have to
> >> >> remember to write
> >> >>
> >> >>     tryClaim([fake-end-position])
> >> >>
> >> >> to indicate that a restriction is finished. IIRC, this was done to
> >> >> decide whether the entire restriction had been processed on return in
> >> >> the case that tryClaim never returned false. It seems preferable to
> >> >> encode this into the return value (with a void return meaning iff
> >> >> tryClaim returned false, and a non-void return being able to indicate
> >> >> any hints as to when, if ever, process should be called again).
> >> >>
> >> >> Can someone jog my memory as to whether there was a case in which this
> wouldn't work?
>

Re: [SDF] Why do we need markDone (or an equivalent claim)?

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Nov 30, 2018 at 10:28 PM Lukasz Cwik <lc...@google.com> wrote:
>
> On Fri, Nov 30, 2018 at 12:47 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > Uh, I'm not sure what you're asking.
>>
>> I'm asking why we wanted a markDone in the first place.
>
> When looking at the byte key restriction tracker code, I found a couple of bugs around how ranges were being compared and how the byte key range was being claimed (we weren't computing the next key correctly). The usage of markDone seemed to be a crutch when attempting to correctly implement the tryClaim code. Also, all the framework code that powered SDF wasn't aware of markDone so it couldn't validate that the last claim failed. So I fixed the tryClaim code and then didn't need markDone and removed it since this was the only restriction tracker that had it.
>
>> > In the SDF API, a void return from processElement already means that a call to tryClaim must have returned false
>>
>> We could widen this to "or finished the restriction."
>
> Yes, having markDone could be added to the API. Is it a crutch for subtle bugs in tryClaim though?

I'm proposing removing the requirement of having either a markDone or
a tryClaim(EndKey).

>> > while a non-void return allows the caller to either return STOP (tryClaim must have returned false) or return RESUME (with a time of when to resume).
>>
>> We could also return STOP if tryClaim never returned false but the
>> restriction was finished.
>>
>> > This allows the framework code to prevent user errors by ensuring the restriction has been completed.
>>
>> I don't think the framework can ensure this. (It can enforce the above
>> constraints that on a STOP tryClaim did indeed return false on the
>> last call, but I'm fuzzy on the value this actually provides when it
>> just means the user must artificially force it to return a false value.
>> It also means we can't make it an error to try claiming values outside
>> the initial restriction. If we want to make things more explicit, we
>> could require a STOP or RESUME return rather than allow a void
>> return.)
>
> I don't think we want SDF authors to ensure that their values are in the initial range first before attempting to claim them as this is the purpose of tryClaim. The SDF code would then be checking that the range is valid twice.
>
> processElement() {
>   readElement
>   isElementInRange?
>   if (!tryClaim) {
>     return
>   }
> }
> (both isElementInRange and tryClaim are now doing the same bounds checking which can lead to subtle bounds checking errors).

Generally code would be iterating over the range, and it would likely
be a bug to check past it, but if we want to support code that ignores
range.getEndPosition() and lets tryClaim do all the work I buy that as
a good argument to allow arbitrary claim attempts.

>> Maybe I'm just not clever enough to come up with a kind of source
>> where this could be good at catching errors?
>
> I think the value is that we expect to implement a few restriction tracker classes which will be re-used across many SDF implementations. In this case, we could point out to the SDF author that they haven't claimed all that they said they would process. This would be true whether markDone existed or not.

The general pattern is

processRestriction() {
  for (element : source[restriction]) {
    if (!tryClaim(element)) {
      return STOP
    } else {
      emit(element)
    }
  }
  tryClaim(everything)
  return STOP  // or if CONTINUE is returned, omit the above line
}

and I'm having a hard time coming up with any bugs that would be
caught if we didn't require the (seemingly boilerplate)
tryClaim(everything) line. Maybe I'm not thinking of the right source?
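
For concreteness, here is a minimal Java sketch of the pattern above. It is an illustration only: ToyOffsetTracker, GeneralPattern, Continuation, and lastClaimFailed() are hypothetical names, not Beam classes or methods, and the tracker only models a half-open offset range. Long.MAX_VALUE stands in for "everything".

```java
import java.util.ArrayList;
import java.util.List;

class GeneralPattern {
  enum Continuation { STOP, RESUME }

  // Mirrors the pseudocode: claim each element, emit it, then make the
  // boilerplate final claim so that the last tryClaim returns false.
  static Continuation processRestriction(
      long[] elementsInRestriction, ToyOffsetTracker tracker, List<Long> emitted) {
    for (long element : elementsInRestriction) {
      if (!tracker.tryClaim(element)) {
        return Continuation.STOP;
      }
      emitted.add(element);
    }
    tracker.tryClaim(Long.MAX_VALUE); // the "tryClaim(everything)" line
    return Continuation.STOP;
  }

  public static void main(String[] args) {
    ToyOffsetTracker tracker = new ToyOffsetTracker(0, 3);
    List<Long> emitted = new ArrayList<>();
    Continuation result = processRestriction(new long[] {0, 1, 2}, tracker, emitted);
    System.out.println(result + " " + emitted + " lastClaimFailed=" + tracker.lastClaimFailed());
  }
}

// Hypothetical stand-in for a restriction tracker over the offsets [from, to).
class ToyOffsetTracker {
  private final long from;
  private final long to;
  private boolean lastClaimFailed = false;

  ToyOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  // Returns true iff position lies inside [from, to) and records whether
  // this most recent attempt failed.
  boolean tryClaim(long position) {
    lastClaimFailed = position < from || position >= to;
    return !lastClaimFailed;
  }

  boolean lastClaimFailed() {
    return lastClaimFailed;
  }
}
```

In this sketch the loop never sees a failed claim, so without the final line lastClaimFailed() would still be false when STOP is returned.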

>> > Also, "" is for the byte key range; the code could have just passed range.getEndPosition() into the final tryClaim. It's just that "" is shorthand and would be similar to passing in Long.MAX_VALUE for the file offset range.
>>
>> Having to choose a value to pass depending on the restriction tracker
>> type is something that could simply be eliminated.
>>
>> > On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw <ro...@google.com> wrote:
>> >>
>> >> In looking at the SDF examples, it seems error-prone to have to
>> >> remember to write
>> >>
>> >>     tryClaim([fake-end-position])
>> >>
>> >> to indicate that a restriction is finished. IIRC, this was done to
>> >> decide whether the entire restriction had been processed on return in
>> >> the case that tryClaim never returned false. It seems preferable to
>> >> encode this into the return value (with a void return meaning iff
>> >> tryClaim returned false, and a non-void return being able to indicate
>> >> any hints as to when, if ever, process should be called again).
>> >>
>> >> Can someone jog my memory as to whether there was a case in which this wouldn't work?

Re: [SDF] Why do we need markDone (or an equivalent claim)?

Posted by Lukasz Cwik <lc...@google.com>.
On Fri, Nov 30, 2018 at 12:47 PM Robert Bradshaw <ro...@google.com>
wrote:

> On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik <lc...@google.com> wrote:
> >
> > Uh, I'm not sure what you're asking.
>
> I'm asking why we wanted a markDone in the first place.
>

When looking at the byte key restriction tracker code, I found a couple of
bugs around how ranges were being compared and how the byte key range was
being claimed (we weren't computing the next key correctly). The usage of
markDone seemed to be a crutch when attempting to correctly implement the
tryClaim code. Also, all the framework code that powered SDF wasn't aware
of markDone so it couldn't validate that the last claim failed. So I fixed
the tryClaim code and then didn't need markDone and removed it since this
was the only restriction tracker that had it.


> > In the SDF API, a void return from processElement already means that a
> call to tryClaim must have returned false
>
> We could widen this to "or finished the restriction."
>

Yes, having markDone could be added to the API. Is it a crutch for subtle
bugs in tryClaim though?

> while a non-void return allows the caller to either return STOP (tryClaim
> must have returned false) or return RESUME (with a time of when to resume).
>
> We could also return STOP if tryClaim never returned false but the
> restriction was finished.
>
> > This allows the framework code to prevent user errors by ensuring the
> restriction has been completed.
>
> I don't think the framework can ensure this. (It can enforce the above
> constraints that on a STOP tryClaim did indeed return false on the
> last call, but I'm fuzzy on the value this actually provides when it
> just means the user must artificially force it to return a false value.
> It also means we can't make it an error to try claiming values outside
> the initial restriction. If we want to make things more explicit, we
> could require a STOP or RESUME return rather than allow a void
> return.)


I don't think we want SDF authors to ensure that their values are in the
initial range first before attempting to claim them as this is the purpose
of tryClaim. The SDF code would then be checking that the range is valid
twice.

processElement() {
  readElement
  isElementInRange?
  if (!tryClaim) {
    return
  }
}
(both isElementInRange and tryClaim are now doing the same bounds checking
which can lead to subtle bounds checking errors).
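
A minimal Java sketch of the alternative, where tryClaim is the only bounds check and the SDF body never compares against the range itself. SingleBoundsCheck and RangeTracker are hypothetical names used for illustration, not Beam classes, and the "source" is simplified to consecutive offsets.

```java
import java.util.ArrayList;
import java.util.List;

class SingleBoundsCheck {
  // Hypothetical tracker for the offsets [from, to); only it knows the bounds.
  static final class RangeTracker {
    private final long from;
    private final long to;

    RangeTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    boolean tryClaim(long position) {
      return position >= from && position < to;
    }
  }

  // Reads consecutive offsets and stops on the first failed claim.
  // There is no separate isElementInRange check to keep in sync.
  static List<Long> process(long firstOffset, RangeTracker tracker) {
    List<Long> emitted = new ArrayList<>();
    for (long offset = firstOffset; tracker.tryClaim(offset); offset++) {
      emitted.add(offset);
    }
    return emitted;
  }

  public static void main(String[] args) {
    System.out.println(process(5, new RangeTracker(5, 8))); // [5, 6, 7]
  }
}
```

Here the loop always ends with a failed claim, so a void return is consistent with the rule that tryClaim must have returned false.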


> Maybe I'm just not clever enough to come up with a kind of source
> where this could be good at catching errors?
>

I think the value is that we expect to implement a few restriction tracker
classes which will be re-used across many SDF implementations. In this
case, we could point out to the SDF author that they haven't claimed all
that they said they would process. This would be true whether markDone
existed or not.


> > Also, "" is for the byte key range; the code could have just passed
> range.getEndPosition() into the final tryClaim. It's just that "" is
> shorthand and would be similar to passing in Long.MAX_VALUE for the file
> offset range.
>
> Having to choose a value to pass depending on the restriction tracker
> type is something that could simply be eliminated.

> On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> In looking at the SDF examples, it seems error-prone to have to
> >> remember to write
> >>
> >>     tryClaim([fake-end-position])
> >>
> >> to indicate that a restriction is finished. IIRC, this was done to
> >> decide whether the entire restriction had been processed on return in
> >> the case that tryClaim never returned false. It seems preferable to
> >> encode this into the return value (with a void return meaning iff
> >> tryClaim returned false, and a non-void return being able to indicate
> >> any hints as to when, if ever, process should be called again).
> >>
> >> Can someone jog my memory as to whether there was a case in which this
> wouldn't work?
>

Re: [SDF] Why do we need markDone (or an equivalent claim)?

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik <lc...@google.com> wrote:
>
> Uh, I'm not sure what you're asking.

I'm asking why we wanted a markDone in the first place.

> In the SDF API, a void return from processElement already means that a call to tryClaim must have returned false

We could widen this to "or finished the restriction."

> while a non-void return allows the caller to either return STOP (tryClaim must have returned false) or return RESUME (with a time of when to resume).

We could also return STOP if tryClaim never returned false but the
restriction was finished.

> This allows the framework code to prevent user errors by ensuring the restriction has been completed.

I don't think the framework can ensure this. (It can enforce the above
constraints that on a STOP tryClaim did indeed return false on the
last call, but I'm fuzzy on the value this actually provides when it
just means the user must artificially force it to return a false value.
It also means we can't make it an error to try claiming values outside
the initial restriction. If we want to make things more explicit, we
could require a STOP or RESUME return rather than allow a void
return.)
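
To illustrate the constraint being discussed, here is a rough Java sketch of a framework-side check. ReturnValidation, Continuation, and accepts are hypothetical names, not Beam APIs, and it is an assumption of this sketch that RESUME is accepted regardless of the last claim.

```java
class ReturnValidation {
  enum Continuation { STOP, RESUME }

  // Hypothetical check: STOP is accepted only if the last tryClaim returned
  // false. The check sees nothing about which elements were actually processed.
  static boolean accepts(Continuation result, boolean lastClaimFailed) {
    return result == Continuation.RESUME || lastClaimFailed;
  }

  public static void main(String[] args) {
    // Processed everything but skipped the artificial final claim: rejected.
    System.out.println(accepts(Continuation.STOP, false));
    // Made a failing final claim, whatever happened before it: accepted.
    System.out.println(accepts(Continuation.STOP, true));
    // Asked to be resumed later: accepted under this sketch's assumption.
    System.out.println(accepts(Continuation.RESUME, false));
  }
}
```

The second case passes whether or not the elements before the final claim were processed, which is why this check alone cannot establish that the restriction was completed.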

Maybe I'm just not clever enough to come up with a kind of source
where this could be good at catching errors?

> Also, "" is for the byte key range; the code could have just passed range.getEndPosition() into the final tryClaim. It's just that "" is shorthand and would be similar to passing in Long.MAX_VALUE for the file offset range.

Having to choose a value to pass depending on the restriction tracker
type is something that could simply be eliminated.

> On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> In looking at the SDF examples, it seems error-prone to have to
>> remember to write
>>
>>     tryClaim([fake-end-position])
>>
>> to indicate that a restriction is finished. IIRC, this was done to
>> decide whether the entire restriction had been processed on return in
>> the case that tryClaim never returned false. It seems preferable to
>> encode this into the return value (with a void return meaning iff
>> tryClaim returned false, and a non-void return being able to indicate
>> any hints as to when, if ever, process should be called again).
>>
>> Can someone jog my memory as to whether there was a case in which this wouldn't work?

Re: [SDF] Why do we need markDone (or an equivalent claim)?

Posted by Lukasz Cwik <lc...@google.com>.
Uh, I'm not sure what you're asking. In the SDF API, a void return from
processElement already means that a call to tryClaim must have returned
false, while a non-void return allows the caller to either return STOP
(tryClaim must have returned false) or return RESUME (with a time of when
to resume). This allows the framework code to prevent user errors by
ensuring the restriction has been completed.

Also, "" is for the byte key range; the code could have just passed
range.getEndPosition() into the final tryClaim. It's just that "" is
shorthand and would be similar to passing in Long.MAX_VALUE for the file
offset range.
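
As a small illustration for the offset case, the Java sketch below shows that claiming the end position and claiming Long.MAX_VALUE both fail. EndSentinel and ToyTracker are hypothetical names, not the Beam implementation, and getEndPosition() is placed on the toy tracker purely for brevity.

```java
class EndSentinel {
  // Hypothetical offset-range tracker for [from, to).
  static final class ToyTracker {
    private final long from;
    private final long to;

    ToyTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    long getEndPosition() {
      return to;
    }

    boolean tryClaim(long position) {
      return position >= from && position < to;
    }
  }

  public static void main(String[] args) {
    ToyTracker tracker = new ToyTracker(10, 20);
    System.out.println(tracker.tryClaim(tracker.getEndPosition())); // false
    System.out.println(tracker.tryClaim(Long.MAX_VALUE)); // false
  }
}
```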



On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw <ro...@google.com> wrote:

> In looking at the SDF examples, it seems error-prone to have to
> remember to write
>
>     tryClaim([fake-end-position])
>
> to indicate that a restriction is finished. IIRC, this was done to
> decide whether the entire restriction had been processed on return in
> the case that tryClaim never returned false. It seems preferable to
> encode this into the return value (with a void return meaning iff
> tryClaim returned false, and a non-void return being able to indicate
> any hints as to when, if ever, process should be called again).
>
> Can someone jog my memory as to whether there was a case in which this wouldn't
> work?
>