Posted to commits@beam.apache.org by "Eugene Kirpichov (JIRA)" <ji...@apache.org> on 2016/05/06 21:40:12 UTC

[jira] [Commented] (BEAM-217) BoundedSource.splitAtFraction should be splitAfterFraction

    [ https://issues.apache.org/jira/browse/BEAM-217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274765#comment-15274765 ] 

Eugene Kirpichov commented on BEAM-217:
---------------------------------------

This is becoming more important now that we are realizing, service-side, that the proper way to report progress is as the fraction of input _processed_ rather than _read_ (note: the current RangeTracker classes report progress _read_, but we'll need to change this). In that case, splitAtFraction breaks down in the following example.

Assume an index-based source with range [3, 6) where we've read records 3 and 4 and are currently stuck processing record 4. Progress will be ⅓ (since only record 3, i.e. ⅓ of the records, has been processed). Since we're stuck, the mirror model will predict that we're going to stay at ⅓ forever, and we'll keep generating split suggestions at ~⅓.

However, the minimum split fraction the reader would accept is ⅔ (at position 5, since position 4 has already been read). We'll never generate that fraction, and we have no way of knowing that it is the minimum the reader will accept.
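
To make the example concrete, here is a minimal Java sketch of it. This is a toy model, not Beam's RangeTracker: the class and method names (IndexRangeSplitSketch, positionFor, acceptsSplitAt, minAcceptedSplitPosition) are hypothetical, and mapping a fraction to the nearest index is an assumption made only for illustration.

```java
/** Toy model of an index-based reader over [start, end). Hypothetical, not Beam API. */
final class IndexRangeSplitSketch {
    final long start;          // inclusive start of the range
    final long end;            // exclusive end of the range
    final long lastReadIndex;  // last record handed out by the reader
    final long processedCount; // records fully processed so far

    IndexRangeSplitSketch(long start, long end, long lastReadIndex, long processedCount) {
        this.start = start;
        this.end = end;
        this.lastReadIndex = lastReadIndex;
        this.processedCount = processedCount;
    }

    /** Progress reported as the fraction of records processed (not read). */
    double fractionProcessed() {
        return (double) processedCount / (end - start);
    }

    /** Fraction of the range that lies before the given position. */
    double fractionAt(long position) {
        return (double) (position - start) / (end - start);
    }

    /** Assumption: a requested fraction maps to the nearest index in the range. */
    long positionFor(double fraction) {
        return start + Math.round(fraction * (end - start));
    }

    /** The reader refuses any split position it has already read. */
    boolean acceptsSplitAt(long position) {
        return position > lastReadIndex && position < end;
    }

    /** Smallest position the reader could accept (only valid if it is below end). */
    long minAcceptedSplitPosition() {
        return lastReadIndex + 1;
    }

    public static void main(String[] args) {
        // Range [3, 6): records 3 and 4 were read, only record 3 is processed.
        IndexRangeSplitSketch r = new IndexRangeSplitSketch(3, 6, 4, 1);
        long suggested = r.positionFor(r.fractionProcessed());
        System.out.println("suggested split position: " + suggested);
        System.out.println("accepted: " + r.acceptsSplitAt(suggested));
        long min = r.minAcceptedSplitPosition();
        System.out.println("minimum accepted position: " + min
            + " (fraction " + r.fractionAt(min) + ")");
    }
}
```

In this model the suggestion derived from the reported progress lands on position 4, which the reader refuses, while the only acceptable position (5, fraction ⅔) is never proposed.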

> BoundedSource.splitAtFraction should be splitAfterFraction
> ----------------------------------------------------------
>
>                 Key: BEAM-217
>                 URL: https://issues.apache.org/jira/browse/BEAM-217
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Minor
>
> Dynamic work rebalancing works by 1) determining how long the bundle should take in order to not be a straggler - the "deadline", 2) predicting where the bundle will be (position or fraction) by that deadline, and 3) requesting an atomic split (splitAtFraction).
> Currently all BoundedSources and (in the Dataflow runner) NativeReaderIterators refuse splits if they have already consumed the requested split position.
> Splitting a task [A, C) at position B generates [A, B) and [B, C), so if we predict that by the deadline the task will have last consumed position X, we should split not "at" X but "after" X (i.e. at next(X)): into [A, X] (because X is already consumed) and (X, C), or equivalently [A, next(X)) and [next(X), C).
> One way to fit this into the current BoundedSource API is to rename splitAtFraction to splitAfterFraction and adjust the documentation. The documentation of getFractionConsumed also needs to be clarified to emphasize that it should return what fraction of all positions in the source have already been consumed, including the position of the last consumed record. For example, for an index-range task with range [0, 5), after it has read the first record at position 0, it has consumed 20%, rather than 0% (and of course not 40%, even if an internal "next index" variable is now 1). This mistake is especially easy to make in a file-based source if you base the calculations on the file's offset *after* consuming the record; the correct way is to calculate based on the offsets of the beginnings of records.
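
As a rough illustration of the "split after" semantics and the intended getFractionConsumed arithmetic for an index-based range, here is a small Java sketch. The names SplitAfterSketch, fractionConsumed and splitAfter are hypothetical and are not part of the BoundedSource API; next(X) is assumed to be X + 1, which holds only for integer indices.

```java
/** Illustrative helpers for an index range [start, end). Hypothetical, not Beam API. */
final class SplitAfterSketch {

    /**
     * Fraction of positions consumed, counting the last consumed record itself.
     * lastConsumed is the index of the last record read, not a "next index".
     */
    static double fractionConsumed(long start, long end, long lastConsumed) {
        return (double) (lastConsumed - start + 1) / (end - start);
    }

    /**
     * Splits [start, end) after lastConsumed into [start, next) and [next, end),
     * where next = lastConsumed + 1 (assumption: integer indices).
     */
    static long[][] splitAfter(long start, long end, long lastConsumed) {
        long next = lastConsumed + 1;
        if (lastConsumed < start || next >= end) {
            // Nothing consumed in this range yet, or nothing would remain for the residual.
            throw new IllegalArgumentException("cannot split after " + lastConsumed);
        }
        return new long[][] {{start, next}, {next, end}};
    }

    public static void main(String[] args) {
        // Range [0, 5) after reading the record at position 0.
        System.out.println("consumed: " + fractionConsumed(0, 5, 0));
        // The easy mistake: passing the internal "next index" (1) instead.
        System.out.println("mistaken: " + fractionConsumed(0, 5, 1));
        long[][] parts = splitAfter(0, 5, 0);
        System.out.println("primary:  [" + parts[0][0] + ", " + parts[0][1] + ")");
        System.out.println("residual: [" + parts[1][0] + ", " + parts[1][1] + ")");
    }
}
```

The sketch does not cover file-based sources, where the calculation would use the byte offset at which the last consumed record begins rather than an index.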


