Posted to user@flink.apache.org by XiaoChuan Yu <xi...@kik.com> on 2018/03/13 18:30:29 UTC

Dependency Injection and Flink

Hi,

I'm evaluating Flink with the intent to integrate it into a Java project
that uses a lot of dependency injection via Guice. What would be the best
way to work with DI/Guice given that injected fields aren't Serializable?
I have looked at this StackOverflow answer so far. As I understand it, the
strategy is as follows, but I'm not sure about step 3:

   1. Use a RichFunction any time injection is required.
   2. Do not use @Inject; instead, mark each injected field as transient.
   3. Implement open() / close() and manually assign values to the injected
   fields using Injector.getInstance(SomeClass.class). But where do I get the
   injector? Create one on the spot each time? Keep one as a static variable
   somewhere and use it everywhere?

Example:
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.configuration.Configuration;

 // RichFilterFunction (not the plain FilterFunction interface) provides open()
 public class MyFilter extends RichFilterFunction<String> {
     private transient DbClient dbClient;
     //@Inject DbClient dbClient; //typical Guice field injection

     @Override
     public void open(Configuration parameters) {
         // where am I supposed to get the injector?
         // keep it as a static variable somewhere and init it in Main?
         this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
     }

     @Override
     public boolean filter(String value) {
         return this.dbClient.query(value);
     }
 }
I haven't set up a Flink environment to try the above yet, though.
Does anyone know of a less verbose way?
I imagine this could get quite verbose with multiple injected fields.
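
The transient-field part of the strategy can be checked without a Flink
cluster. The sketch below uses only the JDK: it serializes a function-like
object, deserializes it, and shows that the transient field comes back null
until open() runs again. The class names TransientFieldDemo and roundTrip are
made up for illustration, DbClient is a hypothetical stand-in, and MyFilter
here is a plain Serializable class rather than a real Flink RichFunction.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    /** Hypothetical dependency; deliberately NOT Serializable. */
    static class DbClient {
        boolean query(String value) {
            return !value.isEmpty();
        }
    }

    /** Stand-in for a rich function: Serializable, with an open() hook. */
    static class MyFilter implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so DbClient need not be Serializable
        private transient DbClient dbClient;

        void open() {
            // In a real job, the injector lookup would go here (assumption).
            this.dbClient = new DbClient();
        }

        boolean isOpened() {
            return dbClient != null;
        }

        boolean filter(String value) {
            return dbClient.query(value);
        }
    }

    /** Serializes and deserializes an object in memory. */
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        MyFilter original = new MyFilter();
        original.open();

        MyFilter copy = roundTrip(original);
        System.out.println(copy.isOpened());    // false: transient field was not shipped

        copy.open();                            // re-create the dependency on the receiving side
        System.out.println(copy.filter("abc")); // true
    }
}
```

Serialization succeeds even though DbClient is not Serializable, because the
field is transient; the copy only becomes usable after open() is called on it.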

Thanks,
Xiaochuan Yu

Re: Dependency Injection and Flink

Posted by Steven Wu <st...@gmail.com>.
Stephan,

That would be helpful. On the job manager side, the entry class provides such
an entry point hook. The problem is on the task manager side, where we don't
have such an initialization/entry point.

I brought up the same question 3 months ago on this list, with the subject
"entrypoint for executing job in task manager".

Thanks,
Steven



On Thu, Mar 15, 2018 at 1:49 PM, Stephan Ewen <se...@apache.org> wrote:

> Would it help to be able to register "initializers", meaning some
> classes/methods that will be called at every process entry point, to set up
> something like this?

Re: Dependency Injection and Flink

Posted by Stephan Ewen <se...@apache.org>.
Would it help to be able to register "initializers", meaning some
classes/methods that will be called at every process entry point, to set up
something like this?


On Tue, Mar 13, 2018 at 7:56 PM, Steven Wu <st...@gmail.com> wrote:

> Xiaochuan,
>
> We are doing exactly as you described. We keep the injector as a global
> static var.
>
> But we extend FlinkJobManager and FlinkTaskManager to override the main
> method and initialize the injector (and other things) during JVM startup,
> which does cause tight code coupling. It is a little painful to upgrade
> Flink because changes to the internal code structure of FlinkJobManager
> and FlinkTaskManager can sometimes break our extended classes.
>
> Thanks,
> Steven

Re: Dependency Injection and Flink

Posted by Steven Wu <st...@gmail.com>.
Xiaochuan,

We are doing exactly as you described. We keep the injector as a global
static var.

But we extend FlinkJobManager and FlinkTaskManager to override the main
method and initialize the injector (and other things) during JVM startup,
which does cause tight code coupling. It is a little painful to upgrade
Flink because changes to the internal code structure of FlinkJobManager
and FlinkTaskManager can sometimes break our extended classes.
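
For comparison, a global static injector can also be built lazily on first
use, which is a different approach from overriding main and may avoid the
need for a startup hook in each JVM. The sketch below is an assumption-laden
illustration, not what is described above: SimpleInjector is a tiny stand-in
written for this example (it is NOT the Guice API), DbClient is hypothetical,
and MyInjectorHolder only borrows the name used in the original question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class InjectorHolderDemo {

    /** Minimal stand-in for an injector; NOT the Guice API. */
    static final class SimpleInjector {
        private final Map<Class<?>, Supplier<?>> bindings = new HashMap<>();

        <T> SimpleInjector bind(Class<T> type, Supplier<? extends T> provider) {
            bindings.put(type, provider);
            return this;
        }

        <T> T getInstance(Class<T> type) {
            Supplier<?> provider = bindings.get(type);
            if (provider == null) {
                throw new IllegalStateException("No binding for " + type.getName());
            }
            return type.cast(provider.get());
        }
    }

    /** Hypothetical dependency. */
    static final class DbClient {
        boolean query(String value) {
            return value.startsWith("a");
        }
    }

    /** Holds one injector per JVM (per class loader), created on first access. */
    static final class MyInjectorHolder {
        private MyInjectorHolder() {}

        // Initialization-on-demand holder idiom: the JVM initializes Lazy
        // exactly once, on first access, in a thread-safe way.
        private static final class Lazy {
            static final SimpleInjector INSTANCE =
                new SimpleInjector().bind(DbClient.class, DbClient::new);
        }

        static SimpleInjector injector() {
            return Lazy.INSTANCE;
        }
    }

    public static void main(String[] args) {
        SimpleInjector first = MyInjectorHolder.injector();
        SimpleInjector second = MyInjectorHolder.injector();
        System.out.println(first == second);                                // true
        System.out.println(first.getInstance(DbClient.class).query("abc")); // true
    }
}
```

With this shape, the first open() call in a given JVM would trigger creation
of the injector, and later calls reuse it. Whether that fits depends on how
the injector's modules need to be configured on the worker side.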

Thanks,
Steven

