Posted to user@flink.apache.org by Steven Wu <st...@gmail.com> on 2017/12/21 06:53:38 UTC

entrypoint for executing job in task manager

Here is my understanding of how job submission works in Flink. When
submitting a job to the job manager via the REST API, we provide an entry
class. The job manager then evaluates the job graph and ships the serialized
operators to the task managers. The task managers then open the operators and
run the tasks.

My app typically requires an initialization phase to set up my own running
context in the task manager (e.g. calling a static method of some class).
Does Flink provide any entry hook in the task manager when executing a job
(and its tasks)? On the job manager side, the entry class provides such a
hook where I can initialize my static context.
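
(For context, this is roughly the kind of hook I mean on the job manager
side; MyAppContext.initialize() below is just a placeholder for my own static
setup, and the pipeline is a dummy one:)

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MyJobEntryPoint {
        public static void main(String[] args) throws Exception {
            // runs where the job graph is built, so the static context
            // is set up there, but not on the task managers
            MyAppContext.initialize();  // placeholder for my static init

            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "b", "c")
               .map(new MapFunction<String, String>() {
                   @Override
                   public String map(String value) {
                       return value.toUpperCase();
                   }
               })
               .print();
            env.execute("my-job");
        }
    }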

Thanks,
Steven

Re: entrypoint for executing job in task manager

Posted by Bob Tiernay <bo...@okta.com>.
Please see FLINK-14184 <https://issues.apache.org/jira/browse/FLINK-14184>,
which should fully support such use cases in the future. Feel free to vote
for it if you believe it would help your use case.




Re: entrypoint for executing job in task manager

Posted by Bob Tiernay <bo...@okta.com>.
Hi Steven, 

Curious how you solved this for your use case. Did you ever find a
satisfactory approach?

Thanks in advance,

Bob




Re: entrypoint for executing job in task manager

Posted by "Jimmy (Yi Pin) Cao" <ji...@gmail.com>.
Well, this thread was super helpful. I'm also looking for ways to integrate
DI.

In particular, something like the JVM init hooks for the TM and JM would be
nice.



Re: entrypoint for executing job in task manager

Posted by Steven Wu <st...@gmail.com>.
Thanks, let me clarify the requirement. Sorry that it wasn't clear in the
original email.

Here is our setup.

These 3 dirs are added to the classpath:
* flink/lib: core Flink jars (like flink-dist_2.11, flink-shaded-hadoop2-uber)
* spaaslib: many jars pulled in by our internal platform
* jobs: a single fat jar with the app/job code lives here

Running Flink:
* "jobmanager.web.upload.dir" is configured to use the "jobs" dir above.
* we use the REST API to submit jobs to a standalone cluster.

Here are the requirements for two levels of init hooks:
1) A *JVM init hook* in the JobManager and TaskManager classes during JVM
startup, for users to extend. Right now we override the main method and a few
other things (as you were also suggesting). That is a little fragile because
of the tight coupling: the JobManager and TaskManager classes don't seem to
be implemented for overriding, and we have seen breaking changes when
upgrading Flink from 1.2 -> 1.3 -> 1.4.
2) A *job init hook* during job execution. The jobmanager computes the job
graph and ships it to the taskmanagers, which then execute the job (open/run
the operators).
  - My original email was about allowing the user (app developer) to supply
additional Guice binding modules at the taskmanager via a job init hook; then
we could create a child injector with these additional modules (rough sketch
below). But the Guice child injector has issues with de-duplicating modules
and runs into duplicate binding problems, so we decided not to pursue this
route.
  - My colleague (Monal) mentioned another use case where we could leverage
such a job init hook at the taskmanager: e.g. inside the job init hook, we
could decide on and attach EBS volumes based on the key group assignment. Our
understanding is that the key group assignment is calculated by the
jobmanager during job planning.
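
For reference, the child-injector route we dropped looked roughly like this
(PlatformModule and JobModule are placeholder module classes, only to
illustrate the idea):

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;

    public final class ChildInjectorSketch {
        // placeholder modules, just for illustration
        static class PlatformModule extends AbstractModule {
            @Override protected void configure() {}
        }
        static class JobModule extends AbstractModule {
            @Override protected void configure() {}
        }

        public static void main(String[] args) {
            Injector parent = Guice.createInjector(new PlatformModule());
            // a job init hook on the taskmanager would contribute the extra modules here
            Injector child = parent.createChildInjector(new JobModule());
            // if the same binding shows up in both parent and child modules, Guice
            // reports a duplicate binding error (the problem mentioned above)
        }
    }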

The problem with your suggestion of
"InjectionUtil.installModulesIfNotYetInstalled()" is that it will also be
loaded/executed inside the JobManager when it loads this class. It is just a
minor issue, though.

Thanks,
Steven



Re: entrypoint for executing job in task manager

Posted by Stephan Ewen <se...@apache.org>.
It would be great to understand a bit more what the exact requirements here
are, and what setup you use.

I am not a dependency injection expert, so let me know if what I am
suggesting here is completely bogus.


*(1) Fixed set of libraries for Dependency Injection, or dedicated container
images per application*

If you have dedicated JM and TM Flink images that you build per job, I would
assume that you also put all the required libraries directly into the lib
folder, so everything is available on startup.

In that case, could you just wrap the TM and JM main methods to first call
the initialization methods to set up dependency injection?

This would also work if you have container images that are not
job-specific, but all the libraries relevant to dependency injection are
part of the image (the lib folder).
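
A minimal sketch of such a wrapper for the TM (the delegated class name is an
assumption and differs between Flink versions, so adjust it to whatever entry
class your distribution actually uses; the JM wrapper would look the same):

    public final class DiTaskManagerEntryPoint {
        public static void main(String[] args) throws Exception {
            // set up dependency injection before Flink starts
            InjectionUtil.installModulesIfNotYetInstalled();

            // delegate to Flink's own entry point; the class name below is an
            // assumption and depends on the Flink version you run
            Class<?> tm = Class.forName("org.apache.flink.runtime.taskmanager.TaskManager");
            tm.getMethod("main", String[].class).invoke(null, (Object) args);
        }
    }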

*(2) Generic container images, plus dynamic set of libraries for dependency
injection*

Assuming you do not have job-specific container images, and each
application brings its own dependencies it wants to set up for dependency
injection,
we could look in the following direction.

The dependencies need to be set up for each Task on the TaskManager, because
each task potentially gets a dedicated classloader.
Have you tried an approach like the following?

  - Create a static dependency initializer utility class that has a static
"installModulesIfNotYetInstalled()" method (see the sketch after this list).

  - Each class that you use should have as the first line a static
initializer block that calls that utility:

    public class MyFunction implements MapFunction<A, B> {

        static {
            // runs once per classloader, before any instance is used
            InjectionUtil.installModulesIfNotYetInstalled();
        }

        @Override
        public B map(A value) {...}

        ...
    }


  - You can probably create yourself a base class that does that, from which
all your functions extend.
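
The utility itself could be a simple idempotent guard, e.g. sketched with
Guice (PlatformModule and JobModule are placeholder module classes):

    import com.google.inject.Guice;
    import com.google.inject.Injector;

    public final class InjectionUtil {
        private static Injector injector;

        public static synchronized void installModulesIfNotYetInstalled() {
            // only the first caller creates the injector; later calls are no-ops
            if (injector == null) {
                // PlatformModule / JobModule are placeholders for your modules
                injector = Guice.createInjector(new PlatformModule(), new JobModule());
            }
        }

        public static synchronized Injector injector() {
            return injector;
        }
    }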



Re: entrypoint for executing job in task manager

Posted by Piotr Nowojski <pi...@data-artisans.com>.
I don’t think there is such a hook in the Flink code right now. You will have to work around this issue somehow in user space.

Maybe you could make a contract that every operator, before touching Guice, should call a static synchronized method `initializeGuiceContext`. This method could search the classpath for classes with some specific annotation, for example `@MyInitializationHook`, and install/add all such hooks before actually using Guice?
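
To sketch the idea roughly (using java.util.ServiceLoader here instead of
annotation scanning, just to keep the example dependency free; the
GuiceContextHook interface is made up for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.ServiceLoader;
    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Module;

    public final class GuiceContext {
        /** Implemented by every hook that wants to contribute bindings. */
        public interface GuiceContextHook {
            Module module();
        }

        private static Injector injector;

        public static synchronized Injector initializeGuiceContext() {
            if (injector == null) {
                List<Module> modules = new ArrayList<>();
                // discovers all hooks registered via META-INF/services
                for (GuiceContextHook hook : ServiceLoader.load(GuiceContextHook.class)) {
                    modules.add(hook.module());
                }
                injector = Guice.createInjector(modules);
            }
            return injector;
        }
    }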

Piotrek



Re: entrypoint for executing job in task manager

Posted by Steven Wu <st...@gmail.com>.
We use Guice for dependency injection. We need to install *additional*
Guice modules (for bindings) when setting up this static context of the
Guice injector.

Calling the static initializer from the operator open method won't really
help. Not all operators are implemented by the app developer who wants to
install additional Guice modules. E.g. the Kafka source operator is
implemented/provided by our platform. I think the source operator will open
first, which means the app operator won't get a chance to initialize the
static context. What would really help is an entry hook (at the task manager)
that is executed before any operator is opened.


Re: entrypoint for executing job in task manager

Posted by Piotr Nowojski <pi...@data-artisans.com>.
The open method is called just before any elements are processed. You can hook in any initialisation logic there, including initialisation of a static context. However, keep in mind that since this context is static, it will be shared between multiple operators (if you are running with parallelism > the number of task managers), so accesses to it must be synchronized (including the initialisation). Another thing to consider is that managing the life cycle of a static context can be tricky (when to close it and release its resources).
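
For example (MyContext is a placeholder for whatever user-specific context
needs to be initialised):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class MyMapper extends RichMapFunction<String, String> {
        private static final Object LOCK = new Object();
        private static MyContext context;  // shared by all tasks in the same JVM/classloader

        @Override
        public void open(Configuration parameters) {
            synchronized (LOCK) {
                if (context == null) {
                    context = MyContext.create();  // placeholder for the user's init
                }
            }
        }

        @Override
        public String map(String value) {
            return value;
        }
    }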

The question is whether you really need a static context.

Thanks,
Piotrek

