Posted to dev@beam.apache.org by Łukasz Gajowy <lg...@apache.org> on 2019/07/26 15:48:13 UTC

Collecting metrics in JobInvocation - BEAM-4775

Hi all,

I'm currently working on BEAM-4775
<https://issues.apache.org/jira/browse/BEAM-4775>. The goal here is to pass
portable MetricResults over the RPC API to the PortableRunner (SDK) part
and allow reading them there. The metrics can be collected from the
pipeline result that is available in JobInvocation's callbacks. The
callbacks are registered in the start()
<https://github.com/apache/beam/blob/a999e858a7282c8c7d1eea2670df252dea78c537/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L87>
and cancel()
<https://github.com/apache/beam/blob/a999e858a7282c8c7d1eea2670df252dea78c537/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L149>
methods of JobInvocation. This is where my problems begin:

I want to access the pipeline result and get the MetricResults from it.
This is possible *only* in the onSuccess(PipelineResult result) method of
the callbacks registered in start() and cancel() in JobInvocation. Now,
when I cancel the job invocation, invocationFuture.cancel()
<https://github.com/apache/beam/blob/a999e858a7282c8c7d1eea2670df252dea78c537/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L148>
is called, which invokes onFailure(Throwable throwable) if the pipeline is
still running. onFailure() has no PipelineResult parameter, so there is
currently no way to collect the metrics there.
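
To make the problem concrete, here is a minimal sketch of the same
behavior. It is not the JobInvocation code: it uses the JDK's
CompletableFuture instead of the callback API used there, and FakeResult
and the "elements=42" string are hypothetical stand-ins for PipelineResult
and its metrics. The point is only that cancelling the future takes the
failure branch, where no result object is available.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CancelCallbackDemo {
  /** Hypothetical stand-in for PipelineResult; it only carries a metrics string. */
  static final class FakeResult {
    final String metrics;

    FakeResult(String metrics) {
      this.metrics = metrics;
    }
  }

  /** Registers one callback that records which branch ran and what it could see. */
  static void register(CompletableFuture<FakeResult> invocation, AtomicReference<String> seen) {
    invocation.whenComplete(
        (result, throwable) -> {
          if (throwable == null) {
            // Success branch: the result (and therefore its metrics) is available.
            seen.set("onSuccess: " + result.metrics);
          } else {
            // Failure branch: only the Throwable is available, there is no result.
            seen.set("onFailure: " + throwable.getClass().getSimpleName());
          }
        });
  }

  /** Cancels the future while the simulated pipeline has not finished. */
  static String cancelWhileRunning() {
    CompletableFuture<FakeResult> invocation = new CompletableFuture<>();
    AtomicReference<String> seen = new AtomicReference<>("no callback yet");
    register(invocation, seen);
    invocation.cancel(true);
    return seen.get();
  }

  /** Lets the simulated pipeline finish normally. */
  static String finishNormally() {
    CompletableFuture<FakeResult> invocation = new CompletableFuture<>();
    AtomicReference<String> seen = new AtomicReference<>("no callback yet");
    register(invocation, seen);
    invocation.complete(new FakeResult("elements=42"));
    return seen.get();
  }

  public static void main(String[] args) {
    System.out.println(cancelWhileRunning()); // onFailure: CancellationException
    System.out.println(finishNormally()); // onSuccess: elements=42
  }
}
```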

My questions currently are:

   - Should we collect metrics after the job is canceled? So far I assumed
   that we should.
   - If so, does anyone have other ideas on how to collect them when
   canceling the job?

PR I'm working on with more discussions on the topic: PR 9020
<https://github.com/apache/beam/pull/9020>
The current idea on how the metrics could be collected in JobInvocation:
link
<https://github.com/apache/beam/pull/9020/files#diff-19f1da178ef8693f13c026d3bf70398a>

Thanks,
Łukasz

Re: Collecting metrics in JobInvocation - BEAM-4775

Posted by Łukasz Gajowy <lg...@apache.org>.
Thanks, Kenn and Robert! It got me thinking and digging.

As for BEAM-4775 <https://issues.apache.org/jira/browse/BEAM-4775>, which
I'm currently working on, it seems the only thing I can do now is prepare a
version that does not collect metrics after cancel(). I think I'll go in
this direction first (it looks like low-hanging fruit) and then work on the
next steps.

> I think the question here is whether PipelineRunner::run is allowed to be
> blocking.

IMO, it shouldn't be blocking. There's also an existing issue (at least for
Flink) to make FlinkRunner::run() and cancel() non-blocking: BEAM-593
<https://jira.apache.org/jira/browse/BEAM-593>. Solving this issue will
allow us to obtain the PipelineResult early (before the job finishes) and
call cancel() or any other method (e.g. metrics()) on it at any moment
while the job is running, regardless of its current state.
Other than that, I think we need a way to get the state of FlinkRunner (any
other runner?) while it's still running (via a callback from the runner?).
From what I understand, state transitioning in JobInvocation is not done as
it should be (see this TODO
<https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L85>).
Once we get the state from the runner, we could set it in JobInvocation
properly.
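
A rough sketch of what a non-blocking run() would make possible, under
stated assumptions: JobHandle, State, increment() and run() below are
hypothetical names for illustration only, not Beam's PipelineResult or
FlinkRunner API, and the "job" is simulated by direct calls rather than a
real runner. It shows the handle being returned while the job is still
running, and state and metrics staying readable before and after cancel().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

class NonBlockingRunDemo {
  enum State {
    RUNNING,
    CANCELLED
  }

  /** Hypothetical handle to a running job; not Beam's PipelineResult. */
  static final class JobHandle {
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final Map<String, Long> counters = new ConcurrentHashMap<>();

    /** Called by the (simulated) job as it processes elements. */
    void increment(String counter) {
      counters.merge(counter, 1L, Long::sum);
    }

    State getState() {
      return state.get();
    }

    /** Snapshot of the counters; readable in any state, including after cancel. */
    Map<String, Long> metrics() {
      return new HashMap<>(counters);
    }

    /** Cancels the job itself and returns the resulting state. */
    State cancel() {
      state.set(State.CANCELLED);
      return state.get();
    }
  }

  /** Non-blocking run: hands back the handle while the job is still RUNNING. */
  static JobHandle run() {
    return new JobHandle();
  }

  public static void main(String[] args) {
    JobHandle job = run();
    job.increment("elements");
    job.increment("elements");
    System.out.println(job.getState() + " " + job.metrics()); // RUNNING {elements=2}
    job.cancel();
    System.out.println(job.getState() + " " + job.metrics()); // CANCELLED {elements=2}
  }
}
```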

On Wed, Aug 7, 2019 at 3:18 PM Robert Bradshaw <ro...@google.com> wrote:

> I think the question here is whether PipelineRunner::run is allowed to
> be blocking. If it is, then the futures make sense (but there's no way
> to properly cancel it). I'm OK with not being able to return metrics
> on cancel in this case, or the case the pipeline didn't even start up
> yet. Otherwise, we should quickly get a handle to the PipelineResult
> and be able to query that for all future use.

Re: Collecting metrics in JobInvocation - BEAM-4775

Posted by Robert Bradshaw <ro...@google.com>.
I think the question here is whether PipelineRunner::run is allowed to
be blocking. If it is, then the futures make sense (but there's no way
to properly cancel it). I'm OK with not being able to return metrics
on cancel in this case, or the case the pipeline didn't even start up
yet. Otherwise, we should quickly get a handle to the PipelineResult
and be able to query that for all future use.


Re: Collecting metrics in JobInvocation - BEAM-4775

Posted by Kenneth Knowles <ke...@apache.org>.
Took a look at the code, too. It seems like a mismatch in a few ways:

 - PipelineRunner::run is async already and returns while the job is still
running
 - PipelineResult is a legacy name - it is really meant to be a handle to a
running job
 - cancel() on a future is just not really related to cancel() in a job. I
would expect to cancel a job with PipelineResult::cancel and I would expect
JobInvocation::cancel to cancel the "start job" RPC/request/whatever. So I
would not expect metrics for a job which I decided to not even start.
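
The distinction in the last point can be sketched with plain JDK types.
This is an illustration only: Job is a hypothetical stand-in for a job
handle, and the already-completed CompletableFuture plays the role of a
"start job" request that has finished. Cancelling that future is a no-op
for the job; the job is only cancelled through its own handle.

```java
import java.util.concurrent.CompletableFuture;

class FutureVsJobCancelDemo {
  /** Hypothetical running job with its own cancel operation. */
  static final class Job {
    private volatile boolean cancelled = false;

    void cancel() {
      cancelled = true;
    }

    boolean isCancelled() {
      return cancelled;
    }
  }

  /**
   * Returns three flags: whether cancelling the start-request future succeeded,
   * whether that affected the job, and whether the job is cancelled after
   * calling cancel() on the job itself.
   */
  static boolean[] cancelAfterStart() {
    Job job = new Job();
    // The "start job" request has already completed and produced the job handle.
    CompletableFuture<Job> startRequest = CompletableFuture.completedFuture(job);

    // Cancelling a completed future does nothing and reports false.
    boolean futureCancelled = startRequest.cancel(true);
    boolean jobAffectedByFutureCancel = job.isCancelled();

    // Cancelling the job goes through the job handle instead.
    job.cancel();
    return new boolean[] {futureCancelled, jobAffectedByFutureCancel, job.isCancelled()};
  }

  public static void main(String[] args) {
    boolean[] flags = cancelAfterStart();
    System.out.println(flags[0] + " " + flags[1] + " " + flags[2]); // false false true
  }
}
```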

Kenn
