Posted to dev@beam.apache.org by Etienne Chauchot <ec...@apache.org> on 2018/09/10 09:06:57 UTC

Re: [portablility] metrics interrogations

Hi all,

@Luke, @Alex I have a general question about metrics in the Fn API. Communication between the runner harness and the
SDK harness is done on a per-bundle basis. When the runner harness sends data to the SDK harness to execute a
transform that contains metrics, does it:

   1. send the metric values (for the metrics defined in the transform) along with the data, and receive updated
   values from the SDK harness when the bundle has finished processing?
   2. or send only the data, with the SDK harness responding with a diff of the metrics so that the runner can
   update them on its side?

My bet is option 2. But can you confirm?

Thanks
Etienne
On Thursday, July 19, 2018 at 15:10 +0200, Etienne Chauchot wrote:
> Thanks for the confirmations Luke.
> On Wednesday, July 18, 2018 at 07:56 -0700, Lukasz Cwik wrote:
> > On Wed, Jul 18, 2018 at 7:01 AM Etienne Chauchot <ec...@apache.org> wrote:
> > > Hi,
> > > Luke, Alex, I have some questions about portable metrics. Can you confirm the following?
> > > 
> > > 1 - Since it is the SDK harness that runs the code of the UDFs, if a UDF defines a metric, the SDK harness
> > > will send updates to the runner through gRPC calls so that the runner can update its metrics cells, right?
> > 
> > Yes. 
> > > 2 - Alex, you mentioned in the proto and the design doc that there will be no aggregation of metrics. But some
> > > runners (Spark/Flink) rely on accumulators, and when these are merged, it triggers the merging of the whole chain
> > > down to the metric cells. I know that Dataflow does not do the same: it uses non-aggregated metrics and sends
> > > them to an aggregation service. Will there be a change of paradigm with portability for runners that merge
> > > themselves?
> > 
> > There will be local aggregation of metrics scoped to a bundle; after the bundle is finished processing they are
> > discarded. This will require some kind of global aggregation support from a runner, whether that runner does it via
> > accumulators or via an aggregation service is up to the runner.
> > > 3 - Please confirm that the distinction between attempted and committed metrics is not the business of portable
> > > metrics. Indeed, it does not involve communication between the runner harness and the SDK harness, as it is a
> > > runner-only matter. I mean, when a runner commits a bundle it just updates its committed metrics and does not
> > > need to inform the SDK harness. But, of course, when the user requests committed metrics through the SDK, the SDK
> > > harness will ask the runner harness for them.
> > > 
> > > 
> > You are correct in saying that during execution, the SDK does not differentiate between attempted and committed
> > metrics and only the runner does. We still lack an API definition and contract for how an SDK would query for
> > metrics from a runner, but you're right in saying that an SDK could request committed metrics and the runner would
> > supply them somehow.
> > > Thanks
> > > Best
> > > Etienne
> > > 
> > > 

Re: [portablility] metrics interrogations

Posted by Etienne Chauchot <ec...@apache.org>.
On Monday, September 10, 2018 at 09:42 -0700, Lukasz Cwik wrote:
> Alex is out on vacation for the next 3 weeks.
> Alex had proposed the types of metrics[1] but not the exact protocol as to what the SDK and runner do. I could
> envision Alex proposing that the SDK harness only sends diffs or dirty metrics in intermediate updates and all metrics
> values in the final update.
> Robert is referring to an integration that was done against an older set of messages[2] that preceded Alex's
> proposal; that integration with Dataflow, which is still incomplete, works as you described in #2.

Thanks Luke and Robert for the confirmation.
> Robin had recently been considering adding an accessor to DoFns that would allow you to get access to the job
> information from within the pipeline (current state, poll for metrics, invoke actions like cancel / drain, ...). He
> wanted it so he could poll for attempted metrics to be able to test @RequiresStableInput. 
Yes, I remember, I voted +1 to his proposal.
> Integrating the MetricsPusher or something like that on the SDK side to be able to poll metrics over the job
> information accessor could be useful.

Well, in the design discussion, we decided to host MetricsPusher as close as possible to the actual engine (inside the
runner code rather than the SDK code) so that the runner can also send system metrics in the future.
> 1: https://s.apache.org/beam-fn-api-metrics
> 2: https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L410

Besides, in his PR Alex talks about deprecated metrics. As he is away, can you tell me a little more about them? Which
metrics will be deprecated once the portability framework is 100% operational on all the runners?
Thanks
Etienne
> 
> On Mon, Sep 10, 2018 at 8:41 AM Robert Burke <ro...@frantil.com> wrote:
> > The way I entered them into the Go SDK is #2 (SDK sends diffs per bundle) and the Java Runner Harness appears to
> > aggregate them correctly from there.

Re: [portablility] metrics interrogations

Posted by Lukasz Cwik <lc...@google.com>.
Alex is out on vacation for the next 3 weeks.

Alex had proposed the types of metrics[1] but not the exact protocol as to
what the SDK and runner do. I could envision Alex proposing that the SDK
harness only sends diffs or dirty metrics in intermediate updates and all
metrics values in the final update.
Robert is referring to an integration that was done against an older set of
messages[2] that preceded Alex's proposal; that integration with
Dataflow, which is still incomplete, works as you described in #2.

Robin had recently been considering adding an accessor to DoFns that would
allow you to get access to the job information from within the pipeline
(current state, poll for metrics, invoke actions like cancel / drain, ...).
He wanted it so he could poll for attempted metrics to be able to test
@RequiresStableInput. Integrating the MetricsPusher or something like that
on the SDK side to be able to poll metrics over the job information
accessor could be useful.

1: https://s.apache.org/beam-fn-api-metrics
2: https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L410




Re: [portablility] metrics interrogations

Posted by Robert Burke <ro...@frantil.com>.
The way I entered them into the Go SDK is #2 (SDK sends diffs per bundle)
and the Java Runner Harness appears to aggregate them correctly from there.
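
A minimal sketch of the "SDK sends diffs per bundle" model described here, assuming simple integer counters. The function name and the sample metric names are hypothetical and are not taken from the Go SDK or the Java Runner Harness; the sketch only shows that per-bundle diffs can be summed on the runner side without the SDK ever knowing the job-level total.

```python
from collections import Counter


def aggregate_bundle_diffs(bundle_diffs):
    """Sum per-bundle counter diffs into job-level totals (runner side)."""
    totals = Counter()
    for diff in bundle_diffs:
        # Counter.update adds counts instead of replacing existing entries.
        totals.update(diff)
    return dict(totals)


# Hypothetical diffs reported by three finished bundles.
diffs = [
    {"elements": 10},
    {"elements": 4, "errors": 1},
    {"elements": 6},
]
totals = aggregate_bundle_diffs(diffs)
print(totals)  # {'elements': 20, 'errors': 1}
```

Because addition is commutative, the order in which bundles finish does not affect the totals, which is what lets bundles run in parallel.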


Re: [portablility] metrics interrogations

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Sep 13, 2018 at 10:06 AM Etienne Chauchot <ec...@apache.org>
wrote:

> Hi,
> Ben, thanks for clarifying. It was indeed a terminology misunderstanding
>
> On Wednesday, September 12, 2018 at 15:10 -0700, Ben Chambers wrote:
>
> I think there is a confusion of terminology here. Let me attempt to
> clarify as a (mostly) outsider.
>
> I think that Etienne is correct, in that the SDK harness only reports the
> difference associated with a bundle. So, if we have a metric that measures
> execution time, the SDK harness reports the time spent executing *that
> bundle*.
>
>
> Exactly!
> I first supposed that the metric value Robert mentioned was the whole
> value at this point of execution. Thus, to compute it, the SDK would need
> to receive the metric value for previously committed bundles. But I now
> think that Robert was referring to the whole (in the sense of an absolute
> value and not -x or +x) metric value *for the current bundle*. That way,
> the SDK does not need to receive the previously committed value; it just
> computes the value for the current bundle and sends it.
>

Yep. Sorry for not being as clear, and thanks Ben for seeing this and
clarifying.


>
> It does not need to report the total execution time which would require
> knowing the total execution time without that bundle. This would in fact be
> nearly impossible -- since multiple bundles may be executed simultaneously
> there is no "total execution time without this bundle" available within the
> system.
>
>
> sure
>
>
> Robert is also correct, in that the SDK harness may send a partial diff
> based on the current state, when asked to do so. This is important -- if a
> bundle were to take 1 hour to complete, do we want the execution time to be
> 5 hours (for the entire hour) and then jump to 6 hours, or do we want it to
> be able to increase somewhat smoothly during the hour? Supporting a smooth
> increase requires being able to send a partial result back -- "if this
> bundle were to complete now, here is the diff that would be committed".
>
>
> it makes sense. I agree, having frequent updates avoids the tunnel effect.
>
> Of course, if a runner uses these partial updates it may need logic to
> deal with the case of a partial update from a bundle that has failed, but
> I think that is outside the scope of the original question. Needless
> to say, it requires something like a committed value and a "tentative" value
> reflecting bundles that are in progress.
>
>
> Maybe the runner will store such values in attempted counters rather than
> in committed counters?
>

Yes. I don't think we've yet fleshed out the counter-retrieval protocols,
or requirements on the runner in that regard, but if a runner wants to be
able to report committed (or logical) counters it needs to keep these
as-yet uncommitted values separate to do the right thing in case of
failure, retry, etc.
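
As an illustration of keeping as-yet uncommitted values separate, here is a minimal runner-side sketch. It assumes a single integer counter and assumes that each report carries the full value for its bundle so far; the class and method names are hypothetical and do not correspond to any Beam API.

```python
class RunnerCounter:
    """Hypothetical runner-side view of one counter."""

    def __init__(self):
        self.committed = 0
        # bundle id -> latest value reported for that in-progress bundle
        self._in_flight = {}

    def report(self, bundle_id, value):
        # Each report is the full per-bundle value, so it replaces the
        # previous report for the same bundle instead of adding to it.
        self._in_flight[bundle_id] = value

    def commit(self, bundle_id):
        # Fold the bundle's last reported value into the committed total.
        self.committed += self._in_flight.pop(bundle_id, 0)

    def fail(self, bundle_id):
        # A failed bundle contributes nothing to the committed total.
        self._in_flight.pop(bundle_id, None)

    @property
    def tentative(self):
        # Committed total plus whatever in-progress bundles have reported.
        return self.committed + sum(self._in_flight.values())


counter = RunnerCounter()
counter.report("b1", 3)
counter.report("b1", 5)   # newer progress report for the same bundle
counter.report("b2", 2)
print(counter.committed, counter.tentative)  # 0 7

counter.commit("b1")
counter.fail("b2")        # b2 will be retried under a new attempt
print(counter.committed, counter.tentative)  # 5 5
```

The tentative value can rise smoothly while bundles are running, and dropping a failed bundle's entry keeps a retry from being counted twice.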

Re: [portablility] metrics interrogations

Posted by Etienne Chauchot <ec...@apache.org>.
Hi,
Ben, thanks for clarifying. It was indeed a terminology misunderstanding.
On Wednesday, September 12, 2018 at 15:10 -0700, Ben Chambers wrote:
> I think there is a confusion of terminology here. Let me attempt to clarify as a (mostly) outsider.
> 
> I think that Etienne is correct, in that the SDK harness only reports the difference associated with a bundle. So, if
> we have a metric that measures execution time, the SDK harness reports the time spent executing *that bundle*.

Exactly! I first supposed that the metric value Robert mentioned was the whole value at this point of execution.
Thus, to compute it, the SDK would need to receive the metric value for previously committed bundles. But I now think
that Robert was referring to the whole (in the sense of an absolute value and not -x or +x) metric value *for the
current bundle*. That way, the SDK does not need to receive the previously committed value; it just computes the value
for the current bundle and sends it.
> It does not need to report the total execution time which would require knowing the total execution time without that
> bundle. This would in fact be nearly impossible -- since multiple bundles may be executed simultaneously there is no
> "total execution time without this bundle" available within the system.

sure
> Robert is also correct, in that the SDK harness may send a partial diff based on the current state, when asked to do
> so. This is important -- if a bundle were to take 1 hour to complete, do we want the execution time to be 5 hours (for
> the entire hour) and then jump to 6 hours, or do we want it to be able to increase somewhat smoothly during the hour?
> Supporting a smooth increase requires being able to send a partial result back -- "if this bundle were to complete
> now, here is the diff that would be committed".

it makes sense. I agree, having frequent updates avoids the tunnel effect.
> Of course, if a runner uses these partial updates it may need logic to deal with the case of a partial update from a
> bundle that has failed, but I think that is outside the scope of the original question. Needless to say, it
> requires something like a committed value and a "tentative" value reflecting bundles that are in progress.

Maybe the runner will store such values in attempted counters rather than in committed counters?
Best,
Etienne
> Does that match what both sides have in mind?
> 
> On Wed, Sep 12, 2018 at 8:05 AM Robert Bradshaw <ro...@google.com> wrote:
> > On Wed, Sep 12, 2018 at 4:20 PM Etienne Chauchot <ec...@apache.org> wrote:
> > > Thanks Robert for the details,
> > > 
> > > I did not know that the runner harness periodically asked the SDK harness for updates. I thought it was only
> > > communicating at the beginning of the bundle and at the end. Something like the simplified sequence diagram
> > > below:
> > > 
> > > 
> > > 
> > 
> > The runner may periodically send a
> > https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L263
> > and the response may contain metrics.
> > 
> >  
> > > But if the metrics are not a regular diff but rather the not-yet-committed dirty value, that means that the
> > > runner sends the metrics value to the SDK before the bundle starts processing. So the sequence diagram becomes
> > > something more like:
> > > 
> > > WDYT ?
> > 
> > The runner never sends any metrics to the SDK, it just listens. It may aggregate them or send them upstream. An SDK
> > can choose to publish all, none, or some strategic subset of metrics in its progress responses, as "latest values."
> > On work output, it publishes everything. (Regarding your diagram, there may not even be any "metric cells" in the
> > Java Runner Harness.)
> >  
> > > On Tuesday, September 11, 2018 at 17:53 +0200, Robert Bradshaw wrote:
> > > On Mon, Sep 10, 2018 at 11:07 AM Etienne Chauchot <ec...@apache.org> wrote:
> > > > Hi all,
> > > > @Luke, @Alex I have a general question related to metrics in the Fn API: as the communication between runner
> > > > harness and SDK harness is done on a bundle basis. When the runner harness sends data to the sdk harness to
> > > > execute a transform that contains metrics, does it:
> > > >    1. send metrics values (for the ones defined in the transform) alongside with data and receive an updated
> > > >    value of the metrics from the sdk harness when the bundle is finished processing?
> > > >    2. or does it send only the data and the sdk harness responds with a diff value of the metrics so that the
> > > >    runner can update them in its side?
> > > >
> > > > My bet is option 2. But can you confirm?
> > > 
> > > The runner harness periodically asks for the status of a bundle, to which the SDK harness may respond with a
> > > current snapshot of metrics. These metrics are deltas in the sense that only "dirty" metrics need to be reported
> > > (i.e. unreported metrics can be assumed to have their previous values) but are *not* deltas with respect to
> > > values, i.e. the full value is reported each time. As an example, suppose one were counting red and blue marbles.
> > > The first update may be something like
> > > { red: 5, blue: 7}
> > > and if two more blue ones were found, a valid update would be
> > > { blue: 9 }
> > > On bundle completion, the full set of metrics is reported as part of the same message that declares the bundle
> > > complete. 
> > > 
> > > 
> > > On Tue, Sep 11, 2018 at 11:43 AM Etienne Chauchot <ec...@apache.org> wrote:
> > > > On Monday, September 10, 2018 at 09:42 -0700, Lukasz Cwik wrote:
> > > > > Alex is out on vacation for the next 3 weeks.
> > > > > Alex had proposed the types of metrics[1] but not the exact protocol as to what the SDK and runner do. I could
> > > > > envision Alex proposing that the SDK harness only sends diffs or dirty metrics in intermediate updates and all
> > > > > metrics values in the final update.
> > > > > Robert is referring to an integration that was done against an older set of messages[2] that preceded Alex's
> > > > > proposal; that integration with Dataflow, which is still incomplete, works as you described in #2.
> > > > 
> > > > Thanks Luke and Robert for the confirmation.
> > > > > Robin had recently been considering adding an accessor to DoFns that would allow you to get access to the job
> > > > > information from within the pipeline (current state, poll for metrics, invoke actions like cancel / drain,
> > > > > ...). He wanted it so he could poll for attempted metrics to be able to test @RequiresStableInput.
> > > > Yes, I remember, I voted +1 to his proposal.
> > > > 
> > > > > Integrating the MetricsPusher or something like that on the SDK side to be able to poll metrics over the job
> > > > > information accessor could be useful.
> > > > 
> > > > Well, in the design discussion, we decided to host MetricsPusher as close as possible to the actual engine
> > > > (inside the runner code rather than the SDK code) so that the runner can also send system metrics in the future.
> > > 
> > > +1. The runner harness can then do whatever it wants (e.g. reporting back to its master, or pushing to another
> > > service, or simply dropping them), but the SDKs only have to follow the FnAPI contract. 
> > >  
> > > > > 1: https://s.apache.org/beam-fn-api-metrics
> > > > > 2: https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
> > > > 
> > > > Besides, in his PR Alex talks about deprecated metrics. As he is off, can you tell me a little more about them ?
> > > > What metrics will be deprecated when the portability framework is 100% operational on all the runners?
> > > 
> > > Currently, the SDKs return metrics to the FnAPI via the proto found at
> > > https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
> > > (and specifically user metrics at
> > > https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L483 ).
> > > The new metrics are the nested one-ofs defined at
> > > https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L269
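
The red and blue marble example quoted above can be sketched as follows. This is only an illustration under stated assumptions: integer counters, an SDK-side container that remembers which metrics changed since the last report, and a runner that overwrites its view with each reported value. The class and method names are hypothetical and are not part of the Fn API.

```python
class BundleMetrics:
    """Hypothetical SDK-side metrics container for one bundle."""

    def __init__(self):
        self._values = {}
        self._dirty = set()

    def inc(self, name, n=1):
        self._values[name] = self._values.get(name, 0) + n
        self._dirty.add(name)

    def progress_update(self):
        # Only metrics changed since the last report are sent, but each one
        # carries its full current value, not an increment.
        update = {name: self._values[name] for name in self._dirty}
        self._dirty.clear()
        return update

    def final_update(self):
        # On bundle completion the full set of metrics is reported.
        return dict(self._values)


sdk = BundleMetrics()
runner_view = {}  # the runner only listens; it never sends metrics to the SDK

sdk.inc("red", 5)
sdk.inc("blue", 7)
first = sdk.progress_update()    # contains red: 5 and blue: 7
runner_view.update(first)

sdk.inc("blue", 2)
second = sdk.progress_update()   # contains only blue: 9
runner_view.update(second)       # overwrite, because values are absolute

print(runner_view == sdk.final_update())  # True
```

Since values are absolute, the runner overwrites rather than adds, so a lost or repeated progress report does not corrupt its view of the bundle.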

Re: [portablility] metrics interrogations

Posted by Ben Chambers <bc...@apache.org>.
I think there is a confusion of terminology here. Let me attempt to clarify
as a (mostly) outsider.

I think that Etienne is correct, in that the SDK harness only reports the
difference associated with a bundle. So, if we have a metric that measures
execution time, the SDK harness reports the time spent executing *that
bundle*.

It does not need to report the total execution time which would require
knowing the total execution time without that bundle. This would in fact be
nearly impossible -- since multiple bundles may be executed simultaneously
there is no "total execution time without this bundle" available within the
system.

Robert is also correct, in that the SDK harness may send a partial diff
based on the current state, when asked to do so. This is important -- if a
bundle were to take 1 hour to complete, do we want the execution time to be
5 hours (for the entire hour) and then jump to 6 hours, or do we want it to
be able to increase somewhat smoothly during the hour? Supporting a smooth
increase requires being able to send a partial result back -- "if this
bundle were to complete now, here is the diff that would be committed".

Of course, if a runner uses these partial updates it may need logic to deal
with the case of a partial update from a bundle that has failed, but I
think that is outside the scope of the original question. Needless to say,
it requires something like a committed value and a "tentative" value
reflecting bundles that are in progress.

Does that match what both sides have in mind?

On Wed, Sep 12, 2018 at 8:05 AM Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Sep 12, 2018 at 4:20 PM Etienne Chauchot <ec...@apache.org>
> wrote:
>
>> Thanks Robert for the details,
>>
>> I did not know that the runner harness periodically asked the SDK harness
>> for updates. I thought it was only communicating at the beginning of the
>> bundle and at the end. Something like the simplified sequence diagram
>> below:
>>
>>
>>
> The runner may periodically send a
> https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L263
> and the response may contain metrics.
>
>
>> But if the metrics are not a regular diff but rather the not-yet-committed
>> dirty value, that means that the runner sends the metrics value to the SDK
>> before the bundle starts processing.
>> So the sequence diagram becomes something more like:
>>
>>
>> WDYT ?
>>
>
> The runner never sends any metrics to the SDK, it just listens. It may
> aggregate them or send them upstream. An SDK can choose to publish all,
> none, or some strategic subset of metrics in its progress responses, as
> "latest values." On work output, it publishes everything. (Regarding your
> diagram, there may not even be any "metric cells" in the Java Runner
> Harness.)
>
>
>> On Tuesday, September 11, 2018 at 17:53 +0200, Robert Bradshaw wrote:
>> On Mon, Sep 10, 2018 at 11:07 AM Etienne Chauchot <ec...@apache.org>
>> wrote:
>>
>> Hi all,
>>
>> @Luke, @Alex I have a general question related to metrics in the Fn API:
>> as the communication between runner harness and SDK harness is done on a
>> bundle basis. When the runner harness sends data to the sdk harness to
>> execute a transform that contains metrics, does it:
>>
>>    1. send metrics values (for the ones defined in the transform)
>>    alongside with data and receive an updated value of the metrics from the
>>    sdk harness when the bundle is finished processing?
>>    2. or does it send only the data and the sdk harness responds with a
>>    diff value of the metrics so that the runner can update them in its side?
>>
>> My bet is option 2. But can you confirm?
>>
>>
>> The runner harness periodically asks for the status of a bundle, to which
>> the SDK harness may respond with a current snapshot of metrics. These
>> metrics are deltas in the sense that only "dirty" metrics need to be
>> reported (i.e. unreported metrics can be assumed to have their previous
>> values) but are *not* deltas with respect to values, i.e. the full value is
>> reported each time. As an example, suppose one were counting red and blue
>> marbles. The first update may be something like
>>
>> { red: 5, blue: 7}
>>
>> and if two more blue ones were found, a valid update would be
>>
>> { blue: 9 }
>>
>> On bundle completion, the full set of metrics is reported as part of the
>> same message that declares the bundle complete.
>>
>>
>>
>> On Tue, Sep 11, 2018 at 11:43 AM Etienne Chauchot <ec...@apache.org>
>> wrote:
>>
>> Le lundi 10 septembre 2018 à 09:42 -0700, Lukasz Cwik a écrit :
>>
>> Alex is out on vacation for the next 3 weeks.
>>
>> Alex had proposed the types of metrics[1] but not the exact protocol as
>> to what the SDK and runner do. I could envision Alex proposing that the SDK
>> harness only sends diffs or dirty metrics in intermediate updates and all
>> metrics values in the final update.
>> Robert is referring to an integration done against an older set of
>> messages[2] that preceded Alex's proposal; that integration with
>> Dataflow, which is still incomplete, works as you described in #2.
>>
>>
>> Thanks Luke and Robert for the confirmation.
>>
>>
>> Robin had recently been considering adding an accessor to DoFns that
>> would allow you to get access to the job information from within the
>> pipeline (current state, poll for metrics, invoke actions like cancel /
>> drain, ...). He wanted it so he could poll for attempted metrics to be able
>> to test @RequiresStableInput.
>>
>> Yes, I remember, I voted +1 to his proposal.
>>
>> Integrating the MetricsPusher or something like that on the SDK side to
>> be able to poll metrics over the job information accessor could be useful.
>>
>>
>> Well, in the design discussion, we decided to host the MetricsPusher as
>> close as possible to the actual engine (in the runner code rather than
>> the SDK code) to allow the runner to send system metrics in the future.
>>
>>
>> +1. The runner harness can then do whatever it wants (e.g. reporting back
>> to its master, or pushing to another service, or simply dropping them), but
>> the SDKs only have to follow the FnAPI contract.
>>
>>
>>
>> 1: https://s.apache.org/beam-fn-api-metrics
>> 2:
>> https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
>>
>>
>> Besides, in his PR Alex talks about deprecated metrics. As he is off, can
>> you tell me a little more about them ? What metrics will be deprecated when
>> the portability framework is 100% operational on all the runners?
>>
>>
>> Currently, the SDKs return metrics to the FnAPI via the proto found at
>> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
>> (and specifically user metrics at
>> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L483
>> ) The new metrics are the nested one-ofs defined at
>> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L269
>>
>>
>>

Re: [portablility] metrics interrogations

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Sep 12, 2018 at 4:20 PM Etienne Chauchot <ec...@apache.org>
wrote:

> Thanks Robert for the details,
>
> I did not know that the runner harness periodically asked the SDK harness
> for updates. I thought it was only communicating at the beginning of the
> bundle and at the end. Something like the simplified sequence diagram
> below
>
>
>
The runner may periodically send a
https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L263
and the response may contain metrics.


> But if the metrics are not a regular diff but rather the not-yet-committed
> dirty value, that means that the runner sends the metrics value to the SDK
> before the bundle starts processing.
> So the sequence diagram becomes something more like:
>
>
> WDYT ?
>

The runner never sends any metrics to the SDK, it just listens. It may
aggregate them or send them upstream. An SDK can choose to publish all,
none, or some strategic subset of metrics in its progress responses, as
"latest values." On work output, it publishes everything. (Regarding your
diagram, there may not even be any "metric cells" in the Java Runner
Harness.)
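
To make the SDK side of this concrete, here is a minimal Python sketch of
one possible reporting strategy: progress responses carry only the metrics
that changed since the last report (as latest values), and the final
response carries everything. The class BundleMetrics and its methods are
hypothetical names for illustration only; they are not the Fn API protos
or any SDK's actual implementation.

```python
class BundleMetrics:
    """Hypothetical SDK-side holder for the counters of one bundle."""

    def __init__(self):
        self._values = {}    # metric name -> full current value
        self._dirty = set()  # names changed since the last report

    def inc(self, name, n=1):
        """Increment a counter and mark it as dirty."""
        self._values[name] = self._values.get(name, 0) + n
        self._dirty.add(name)

    def progress_snapshot(self):
        """Latest full values of the metrics changed since the last report."""
        snapshot = {name: self._values[name] for name in sorted(self._dirty)}
        self._dirty.clear()
        return snapshot

    def final_snapshot(self):
        """Full set of metrics, reported when the bundle completes."""
        self._dirty.clear()
        return dict(self._values)


metrics = BundleMetrics()
metrics.inc("red", 5)
metrics.inc("blue", 7)
first = metrics.progress_snapshot()   # equals {"red": 5, "blue": 7}
metrics.inc("blue", 2)
second = metrics.progress_snapshot()  # equals {"blue": 9}
final = metrics.final_snapshot()      # equals {"red": 5, "blue": 9}
```

Reporting only the dirty subset is just one choice; an SDK could equally
report all metrics, or none, in its progress responses.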


> Le mardi 11 septembre 2018 à 17:53 +0200, Robert Bradshaw a écrit :
> On Mon, Sep 10, 2018 at 11:07 AM Etienne Chauchot <ec...@apache.org>
> wrote:
>
> Hi all,
>
> @Luke, @Alex I have a general question related to metrics in the Fn API:
> as the communication between runner harness and SDK harness is done on a
> bundle basis. When the runner harness sends data to the sdk harness to
> execute a transform that contains metrics, does it:
>
>    1. send metrics values (for the ones defined in the transform)
>    alongside with data and receive an updated value of the metrics from the
>    sdk harness when the bundle is finished processing?
>    2. or does it send only the data and the sdk harness responds with a
>    diff value of the metrics so that the runner can update them in its side?
>
> My bet is option 2. But can you confirm?
>
>
> The runner harness periodically asks for the status of a bundle to which
> the SDK harness may respond with a current snapshot of metrics. These
> metrics are deltas in the sense that only "dirty" metrics need to be
> reported (i.e. unreported metrics can be assumed to have their previous
> values) but are *not* deltas with respect to values, i.e. the full value is
> reported each time. As an example, suppose one were counting red and blue
> marbles. The first update may be something like
>
> { red: 5, blue: 7}
>
> and if two more blue ones were found, a valid update would be
>
> { blue: 9 }
>
> On bundle completion, the full set of metrics is reported as part of the
> same message that declares the bundle complete.
>
>
>
> On Tue, Sep 11, 2018 at 11:43 AM Etienne Chauchot <ec...@apache.org>
> wrote:
>
> Le lundi 10 septembre 2018 à 09:42 -0700, Lukasz Cwik a écrit :
>
> Alex is out on vacation for the next 3 weeks.
>
> Alex had proposed the types of metrics[1] but not the exact protocol as to
> what the SDK and runner do. I could envision Alex proposing that the SDK
> harness only sends diffs or dirty metrics in intermediate updates and all
> metrics values in the final update.
> Robert is referring to an integration done against an older set of
> messages[2] that preceded Alex's proposal; that integration with
> Dataflow, which is still incomplete, works as you described in #2.
>
>
> Thanks Luke and Robert for the confirmation.
>
>
> Robin had recently been considering adding an accessor to DoFns that would
> allow you to get access to the job information from within the pipeline
> (current state, poll for metrics, invoke actions like cancel / drain, ...).
> He wanted it so he could poll for attempted metrics to be able to test
> @RequiresStableInput.
>
> Yes, I remember, I voted +1 to his proposal.
>
> Integrating the MetricsPusher or something like that on the SDK side to be
> able to poll metrics over the job information accessor could be useful.
>
>
> Well, in the design discussion, we decided to host the MetricsPusher as close
> as possible to the actual engine (in the runner code rather than the
> SDK code) to allow the runner to send system metrics in the future.
>
>
> +1. The runner harness can then do whatever it wants (e.g. reporting back
> to its master, or pushing to another service, or simply dropping them), but
> the SDKs only have to follow the FnAPI contract.
>
>
>
> 1: https://s.apache.org/beam-fn-api-metrics
> 2:
> https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
>
>
> Besides, in his PR Alex talks about deprecated metrics. As he is off, can
> you tell me a little more about them ? What metrics will be deprecated when
> the portability framework is 100% operational on all the runners?
>
>
> Currently, the SDKs return metrics to the FnAPI via the proto found at
> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
> (and specifically user metrics at
> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L483
> ) The new metrics are the nested one-ofs defined at
> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L269
>
>
>

Re: [portablility] metrics interrogations

Posted by Etienne Chauchot <ec...@apache.org>.
Thanks Robert for the details,

I did not know that the runner harness periodically asked the SDK harness for updates. I thought it was only
communicating at the beginning of the bundle and at the end. Something like the simplified sequence diagram below.

But if the metrics are not a regular diff but rather the not-yet-committed dirty value, that means that the runner sends
the metrics value to the SDK before the bundle starts processing. So the sequence diagram becomes something more
like:

WDYT?

Etienne
Le mardi 11 septembre 2018 à 17:53 +0200, Robert Bradshaw a écrit :
> On Mon, Sep 10, 2018 at 11:07 AM Etienne Chauchot <ec...@apache.org> wrote:
> > Hi all,
> > @Luke, @Alex I have a general question related to metrics in the Fn API: as the communication between runner harness
> > and SDK harness is done on a bundle basis. When the runner harness sends data to the sdk harness to execute a
> > transform that contains metrics, does it:
> >    1. send metrics values (for the ones defined in the transform) alongside with data and receive an updated value of
> >    the metrics from the sdk harness when the bundle is finished processing?
> >    2. or does it send only the data and the sdk harness responds with a diff value of the metrics so that the runner
> >    can update them in its side?
> > My bet is option 2. But can you confirm?
> 
> The runner harness periodically asks for the status of a bundle to which the SDK harness may respond with a current
> snapshot of metrics. These metrics are deltas in the sense that only "dirty" metrics need to be reported (i.e.
> unreported metrics can be assumed to have their previous values) but are *not* deltas with respect to values, i.e. the
> full value is reported each time. As an example, suppose one were counting red and blue marbles. The first update may
> be something like
> { red: 5, blue: 7}
> and if two more blue ones were found, a valid update would be
> { blue: 9 }
> On bundle completion, the full set of metrics is reported as part of the same message that declares the bundle
> complete. 
> 
> 
> On Tue, Sep 11, 2018 at 11:43 AM Etienne Chauchot <ec...@apache.org> wrote:
> > Le lundi 10 septembre 2018 à 09:42 -0700, Lukasz Cwik a écrit :
> > > Alex is out on vacation for the next 3 weeks.
> > > Alex had proposed the types of metrics[1] but not the exact protocol as to what the SDK and runner do. I could
> > > envision Alex proposing that the SDK harness only sends diffs or dirty metrics in intermediate updates and all
> > > metrics values in the final update.
> > > Robert is referring to an integration done against an older set of messages[2] that preceded Alex's proposal;
> > > that integration with Dataflow, which is still incomplete, works as you described in #2.
> > 
> > Thanks Luke and Robert for the confirmation.
> > > Robin had recently been considering adding an accessor to DoFns that would allow you to get access to the job
> > > information from within the pipeline (current state, poll for metrics, invoke actions like cancel / drain, ...).
> > > He wanted it so he could poll for attempted metrics to be able to test @RequiresStableInput.
> > Yes, I remember, I voted +1 to his proposal.
> > 
> > > Integrating the MetricsPusher or something like that on the SDK side to be able to poll metrics over the job
> > > information accessor could be useful.
> > 
> > Well, in the design discussion, we decided to host the MetricsPusher as close as possible to the actual engine (in
> > the runner code rather than the SDK code) to allow the runner to send system metrics in the future.
> 
> +1. The runner harness can then do whatever it wants (e.g. reporting back to its master, or pushing to another
> service, or simply dropping them), but the SDKs only have to follow the FnAPI contract. 
>  
> > > 1: https://s.apache.org/beam-fn-api-metrics
> > > 2: https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
> > 
> > Besides, in his PR Alex talks about deprecated metrics. As he is off, can you tell me a little more about them ?
> > What metrics will be deprecated when the portability framework is 100% operational on all the runners?
> 
> Currently, the SDKs return metrics to the FnAPI via the proto found at
> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
> (and specifically user metrics at
> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L483
> ) The new metrics are the nested one-ofs defined at
> https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L269

Re: [portablility] metrics interrogations

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Sep 10, 2018 at 11:07 AM Etienne Chauchot <ec...@apache.org>
wrote:

> Hi all,
>
> @Luke, @Alex I have a general question related to metrics in the Fn API:
> as the communication between runner harness and SDK harness is done on a
> bundle basis. When the runner harness sends data to the sdk harness to
> execute a transform that contains metrics, does it:
>
>    1. send metrics values (for the ones defined in the transform)
>    alongside with data and receive an updated value of the metrics from the
>    sdk harness when the bundle is finished processing?
>    2. or does it send only the data and the sdk harness responds with a
>    diff value of the metrics so that the runner can update them in its side?
>
> My bet is option 2. But can you confirm?
>

The runner harness periodically asks for the status of a bundle to which
the SDK harness may respond with a current snapshot of metrics. These
metrics are deltas in the sense that only "dirty" metrics need to be
reported (i.e. unreported metrics can be assumed to have their previous
values) but are *not* deltas with respect to values, i.e. the full value is
reported each time. As an example, suppose one were counting red and blue
marbles. The first update may be something like

{ red: 5, blue: 7}

and if two more blue ones were found, a valid update would be

{ blue: 9 }

On bundle completion, the full set of metrics is reported as part of the
same message that declares the bundle complete.
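
As a minimal sketch of how a runner could interpret such updates (in
Python, with metrics modelled as plain dicts): each reported value
overwrites the previous one, and metrics missing from an update keep their
previous value. The function name apply_update and the dict representation
are assumptions for illustration; they are not part of the Fn API.

```python
def apply_update(known, update):
    """Merge one progress update into the runner's view of a bundle.

    Reported values are full values, not increments, so they replace the
    previously known value. Unreported metrics are left unchanged.
    """
    merged = dict(known)
    merged.update(update)
    return merged


state = {}
state = apply_update(state, {"red": 5, "blue": 7})  # first update
state = apply_update(state, {"blue": 9})            # only blue is dirty
# state now equals {"red": 5, "blue": 9}
```

Had the values been true deltas, the second update would have been
{ blue: 2 } and the runner would have had to add rather than overwrite.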



On Tue, Sep 11, 2018 at 11:43 AM Etienne Chauchot <ec...@apache.org>
wrote:

> Le lundi 10 septembre 2018 à 09:42 -0700, Lukasz Cwik a écrit :
>
> Alex is out on vacation for the next 3 weeks.
>
> Alex had proposed the types of metrics[1] but not the exact protocol as to
> what the SDK and runner do. I could envision Alex proposing that the SDK
> harness only sends diffs or dirty metrics in intermediate updates and all
> metrics values in the final update.
> Robert is referring to an integration done against an older set of
> messages[2] that preceded Alex's proposal; that integration with
> Dataflow, which is still incomplete, works as you described in #2.
>
>
> Thanks Luke and Robert for the confirmation.
>
>
> Robin had recently been considering adding an accessor to DoFns that would
> allow you to get access to the job information from within the pipeline
> (current state, poll for metrics, invoke actions like cancel / drain, ...).
> He wanted it so he could poll for attempted metrics to be able to test
> @RequiresStableInput.
>
> Yes, I remember, I voted +1 to his proposal.
>
> Integrating the MetricsPusher or something like that on the SDK side to be
> able to poll metrics over the job information accessor could be useful.
>
>
> Well, in the design discussion, we decided to host the MetricsPusher as close
> as possible to the actual engine (in the runner code rather than the
> SDK code) to allow the runner to send system metrics in the future.
>

+1. The runner harness can then do whatever it wants (e.g. reporting back
to its master, or pushing to another service, or simply dropping them), but
the SDKs only have to follow the FnAPI contract.


>
> 1: https://s.apache.org/beam-fn-api-metrics
> 2:
> https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
>
>
> Besides, in his PR Alex talks about deprecated metrics. As he is off, can
> you tell me a little more about them ? What metrics will be deprecated when
> the portability framework is 100% operational on all the runners?
>

Currently, the SDKs return metrics to the FnAPI via the proto found at
https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L410
(and specifically user metrics at
https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L483
). The new metrics are the nested one-ofs defined at
https://github.com/apache/beam/blob/release-2.6.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L269


>