You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Sam Rohde <sr...@google.com> on 2019/01/14 17:39:47 UTC

Beam JobService Problem

Hello all,

While going through the codebase I noticed a problem with the Beam
JobService. In particular, the API allows for the possibility of never
seeing some messages or states with Get(State|Message)Stream. This is
because the  Get(State|Message)Stream calls need to have the job id which
can only be obtained from the RunJobResponse. But in order to see all
messages/states the streams need to be opened before the job starts.

This is fine in Dataflow as the preparation_id == job_id, but this is not
true in Flink. What do you all think of this? Am I misunderstanding
something?

Thanks,
Sam

Re: Beam JobService Problem

Posted by Lukasz Cwik <lc...@google.com>.
I believe at one point in time we wanted to separate the preparation_id
from the job_id so that you could have one definition but multiple
instances of it. (e.g. preparation_id is a class name while job_id is the
pointer to the instance of the class)

On Tue, Jan 15, 2019 at 1:45 PM Sam Rohde <sr...@google.com> wrote:

> On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka <go...@google.com> wrote:
>> >
>> > Thanks Sam for bringing this to the list.
>> >
>> > As preparation_ids are not reusable, having preparation_id and job_id
>> same makes sense to me for Flink.
>>
>> I think we change the protocol and only have one kind of ID. As well
>> as solving the problem at hand, it also simplifies the API.
>>
> That sounds fantastic.
>
> On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka <go...@google.com> wrote:
>
> > Another option is to have a subscription for all states/messages on the
>> JobServer.
>> The problem is forcing the job service to remember all logs that were
>> ever logged ever in case someone requests them at some future date.
>> Best to have a way to register a listener earlier.
>
> I agree with Robert that it should be the caller in charge of what to do
> with generated monitoring data. This is especially true with long-running
> jobs that generate potentially gigabytes worth of logs.
>
> I made https://issues.apache.org/jira/browse/BEAM-6442 to track this. Let
> me know if I missed anything.
>
> On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka <go...@google.com> wrote:
>> >
>> > Thanks Sam for bringing this to the list.
>> >
>> > As preparation_ids are not reusable, having preparation_id and job_id
>> same makes sense to me for Flink.
>>
>> I think we change the protocol and only have one kind of ID. As well
>> as solving the problem at hand, it also simplifies the API.
>>
>> > Another option is to have a subscription for all states/messages on the
>> JobServer.
>>
>> The problem is forcing the job service to remember all logs that were
>> ever logged ever in case someone requests them at some future date.
>> Best to have a way to register a listener earlier.
>>
>> > This will be similar to "docker". As the container id is created after
>> the container creation, the only way to get the container creation even is
>> to start "docker events" before starting a container.
>> >
>> > On Mon, Jan 14, 2019 at 11:13 AM Maximilian Michels <mx...@apache.org>
>> wrote:
>> >>
>> >> Hi Sam,
>> >>
>> >> Good observation. Looks like we should fix that.
>> >>
>> >> Looking at InMemoryJobService, it appears that the state can only be
>> retrieved
>> >> by the client once the job is running with a job/invocation id
>> associated.
>> >> Indeed, any messages until that could be lost.
>> >>
>> >> For Flink the JobId is generated here:
>> >>
>> https://github.com/apache/beam/blob/3db71dd9f6f32684903c54b15a5368991cd41f36/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java#L64
>> >>
>> >> I don't see any benefit of having two separate IDs, as the IDs are
>> already
>> >> scoped by preparation and invocation phase.
>> >>
>> >> - Would it be possible to just pass the preparation id as the
>> invocation id at
>> >> JobInvoker#invoke(..)?
>> >>
>> >> - Alternatively, we could have an additional prepare phase for
>> JobInvoker to get
>> >> the job id for the invocation, before we start the job.
>> >>
>> >> Thanks,
>> >> Max
>> >>
>> >> On 14.01.19 12:39, Sam Rohde wrote:
>> >> > Hello all,
>> >> >
>> >> > While going through the codebase I noticed a problem with the Beam
>> JobService.
>> >> > In particular, the API allows for the possibility of never seeing
>> some messages
>> >> > or states with Get(State|Message)Stream. This is because the
>> >> > Get(State|Message)Stream calls need to have the job id which can
>> only be
>> >> > obtained from the RunJobResponse. But in order to see all
>> messages/states the
>> >> > streams need to be opened before the job starts.
>> >> >
>> >> > This is fine in Dataflow as the preparation_id == job_id, but this
>> is not true
>> >> > in Flink. What do you all think of this? Am I misunderstanding
>> something?
>> >> >
>> >> > Thanks,
>> >> > Sam
>> >> >
>>
>

Re: Beam JobService Problem

Posted by Sam Rohde <sr...@google.com>.
On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka <go...@google.com> wrote:
> >
> > Thanks Sam for bringing this to the list.
> >
> > As preparation_ids are not reusable, having preparation_id and job_id
> same makes sense to me for Flink.
>
> I think we change the protocol and only have one kind of ID. As well
> as solving the problem at hand, it also simplifies the API.
>
That sounds fantastic.

On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka <go...@google.com> wrote:

> Another option is to have a subscription for all states/messages on the
> JobServer.
> The problem is forcing the job service to remember all logs that were
> ever logged ever in case someone requests them at some future date.
> Best to have a way to register a listener earlier.

I agree with Robert that it should be the caller in charge of what to do
with generated monitoring data. This is especially true with long-running
jobs that generate potentially gigabytes worth of logs.

I made https://issues.apache.org/jira/browse/BEAM-6442 to track this. Let
me know if I missed anything.

On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka <go...@google.com> wrote:
> >
> > Thanks Sam for bringing this to the list.
> >
> > As preparation_ids are not reusable, having preparation_id and job_id
> same makes sense to me for Flink.
>
> I think we change the protocol and only have one kind of ID. As well
> as solving the problem at hand, it also simplifies the API.
>
> > Another option is to have a subscription for all states/messages on the
> JobServer.
>
> The problem is forcing the job service to remember all logs that were
> ever logged ever in case someone requests them at some future date.
> Best to have a way to register a listener earlier.
>
> > This will be similar to "docker". As the container id is created after
> the container creation, the only way to get the container creation even is
> to start "docker events" before starting a container.
> >
> > On Mon, Jan 14, 2019 at 11:13 AM Maximilian Michels <mx...@apache.org>
> wrote:
> >>
> >> Hi Sam,
> >>
> >> Good observation. Looks like we should fix that.
> >>
> >> Looking at InMemoryJobService, it appears that the state can only be
> retrieved
> >> by the client once the job is running with a job/invocation id
> associated.
> >> Indeed, any messages until that could be lost.
> >>
> >> For Flink the JobId is generated here:
> >>
> https://github.com/apache/beam/blob/3db71dd9f6f32684903c54b15a5368991cd41f36/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java#L64
> >>
> >> I don't see any benefit of having two separate IDs, as the IDs are
> already
> >> scoped by preparation and invocation phase.
> >>
> >> - Would it be possible to just pass the preparation id as the
> invocation id at
> >> JobInvoker#invoke(..)?
> >>
> >> - Alternatively, we could have an additional prepare phase for
> JobInvoker to get
> >> the job id for the invocation, before we start the job.
> >>
> >> Thanks,
> >> Max
> >>
> >> On 14.01.19 12:39, Sam Rohde wrote:
> >> > Hello all,
> >> >
> >> > While going through the codebase I noticed a problem with the Beam
> JobService.
> >> > In particular, the API allows for the possibility of never seeing
> some messages
> >> > or states with Get(State|Message)Stream. This is because the
> >> > Get(State|Message)Stream calls need to have the job id which can only
> be
> >> > obtained from the RunJobResponse. But in order to see all
> messages/states the
> >> > streams need to be opened before the job starts.
> >> >
> >> > This is fine in Dataflow as the preparation_id == job_id, but this is
> not true
> >> > in Flink. What do you all think of this? Am I misunderstanding
> something?
> >> >
> >> > Thanks,
> >> > Sam
> >> >
>

Re: Beam JobService Problem

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka <go...@google.com> wrote:
>
> Thanks Sam for bringing this to the list.
>
> As preparation_ids are not reusable, having preparation_id and job_id same makes sense to me for Flink.

I think we change the protocol and only have one kind of ID. As well
as solving the problem at hand, it also simplifies the API.

> Another option is to have a subscription for all states/messages on the JobServer.

The problem is forcing the job service to remember all logs that were
ever logged ever in case someone requests them at some future date.
Best to have a way to register a listener earlier.

> This will be similar to "docker". As the container id is created after the container creation, the only way to get the container creation even is to start "docker events" before starting a container.
>
> On Mon, Jan 14, 2019 at 11:13 AM Maximilian Michels <mx...@apache.org> wrote:
>>
>> Hi Sam,
>>
>> Good observation. Looks like we should fix that.
>>
>> Looking at InMemoryJobService, it appears that the state can only be retrieved
>> by the client once the job is running with a job/invocation id associated.
>> Indeed, any messages until that could be lost.
>>
>> For Flink the JobId is generated here:
>> https://github.com/apache/beam/blob/3db71dd9f6f32684903c54b15a5368991cd41f36/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java#L64
>>
>> I don't see any benefit of having two separate IDs, as the IDs are already
>> scoped by preparation and invocation phase.
>>
>> - Would it be possible to just pass the preparation id as the invocation id at
>> JobInvoker#invoke(..)?
>>
>> - Alternatively, we could have an additional prepare phase for JobInvoker to get
>> the job id for the invocation, before we start the job.
>>
>> Thanks,
>> Max
>>
>> On 14.01.19 12:39, Sam Rohde wrote:
>> > Hello all,
>> >
>> > While going through the codebase I noticed a problem with the Beam JobService.
>> > In particular, the API allows for the possibility of never seeing some messages
>> > or states with Get(State|Message)Stream. This is because the
>> > Get(State|Message)Stream calls need to have the job id which can only be
>> > obtained from the RunJobResponse. But in order to see all messages/states the
>> > streams need to be opened before the job starts.
>> >
>> > This is fine in Dataflow as the preparation_id == job_id, but this is not true
>> > in Flink. What do you all think of this? Am I misunderstanding something?
>> >
>> > Thanks,
>> > Sam
>> >

Re: Beam JobService Problem

Posted by Ankur Goenka <go...@google.com>.
Thanks Sam for bringing this to the list.

As preparation_ids are not reusable, having preparation_id and job_id same
makes sense to me for Flink.

Another option is to have a subscription for all states/messages on the
JobServer.
This will be similar to "docker". As the container id is created after the
container creation, the only way to get the container creation even is to
start "docker events" before starting a container.




On Mon, Jan 14, 2019 at 11:13 AM Maximilian Michels <mx...@apache.org> wrote:

> Hi Sam,
>
> Good observation. Looks like we should fix that.
>
> Looking at InMemoryJobService, it appears that the state can only be
> retrieved
> by the client once the job is running with a job/invocation id associated.
> Indeed, any messages until that could be lost.
>
> For Flink the JobId is generated here:
>
> https://github.com/apache/beam/blob/3db71dd9f6f32684903c54b15a5368991cd41f36/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java#L64
>
> I don't see any benefit of having two separate IDs, as the IDs are already
> scoped by preparation and invocation phase.
>
> - Would it be possible to just pass the preparation id as the invocation
> id at
> JobInvoker#invoke(..)?
>
> - Alternatively, we could have an additional prepare phase for JobInvoker
> to get
> the job id for the invocation, before we start the job.
>
> Thanks,
> Max
>
> On 14.01.19 12:39, Sam Rohde wrote:
> > Hello all,
> >
> > While going through the codebase I noticed a problem with the Beam
> JobService.
> > In particular, the API allows for the possibility of never seeing some
> messages
> > or states with Get(State|Message)Stream. This is because the
> > Get(State|Message)Stream calls need to have the job id which can only be
> > obtained from the RunJobResponse. But in order to see all
> messages/states the
> > streams need to be opened before the job starts.
> >
> > This is fine in Dataflow as the preparation_id == job_id, but this is
> not true
> > in Flink. What do you all think of this? Am I misunderstanding something?
> >
> > Thanks,
> > Sam
> >
>

Re: Beam JobService Problem

Posted by Maximilian Michels <mx...@apache.org>.
Hi Sam,

Good observation. Looks like we should fix that.

Looking at InMemoryJobService, it appears that the state can only be retrieved 
by the client once the job is running with a job/invocation id associated. 
Indeed, any messages until that could be lost.

For Flink the JobId is generated here:
https://github.com/apache/beam/blob/3db71dd9f6f32684903c54b15a5368991cd41f36/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java#L64

I don't see any benefit of having two separate IDs, as the IDs are already 
scoped by preparation and invocation phase.

- Would it be possible to just pass the preparation id as the invocation id at 
JobInvoker#invoke(..)?

- Alternatively, we could have an additional prepare phase for JobInvoker to get 
the job id for the invocation, before we start the job.

Thanks,
Max

On 14.01.19 12:39, Sam Rohde wrote:
> Hello all,
> 
> While going through the codebase I noticed a problem with the Beam JobService. 
> In particular, the API allows for the possibility of never seeing some messages 
> or states with Get(State|Message)Stream. This is because the  
> Get(State|Message)Stream calls need to have the job id which can only be 
> obtained from the RunJobResponse. But in order to see all messages/states the 
> streams need to be opened before the job starts.
> 
> This is fine in Dataflow as the preparation_id == job_id, but this is not true 
> in Flink. What do you all think of this? Am I misunderstanding something?
> 
> Thanks,
> Sam
>