Posted to user@flink.apache.org by Timur Fayruzov <ti...@gmail.com> on 2016/04/21 22:21:40 UTC

Access to a shared resource within a mapper

Hello,

I'm writing a Scala Flink application. I have a standalone process that
runs on every Flink node, which I need to call to transform my data. To
access this process I first need to initialize a non-thread-safe client. I
would like to avoid initializing a client for each element being
transformed. A straightforward implementation would be something like this:
```
import java.util.concurrent.ArrayBlockingQueue
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
val pool = new ArrayBlockingQueue[Client](5)
// pool is filled here
data.map(e => {
  val client = pool.take() // borrow a client from the pool
  val res = client.transform(e)
  pool.put(client)         // return it when done
  res
})
```
However, this fails at runtime with a "Task not serializable" exception,
which makes sense: the queue (and the clients in it) would have to be
serialized and shipped with the closure.

Function parameters and broadcast variables won't work either as far as I
understand. Is there a way to make this happen?

Thanks,
Timur

Re: Access to a shared resource within a mapper

Posted by Timur Fayruzov <ti...@gmail.com>.
Hi Fabian,

I didn't realize you meant that the lazy val should be inside the
RichMapFunction implementation; that makes sense. It's what I ended up
doing already.

Thanks!
Timur

Re: Access to a shared resource within a mapper

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Timur,

A TaskManager may run as many parallel subtasks of a Map operator as it has
slots, and each subtask runs in its own thread. Since every parallel
subtask of a Map operator also gets its own MapFunction object, it is safe
to use a lazy val.

However, you should not use static variables to hold state, because these
are shared between all MapFunction instances in a TaskManager (JVM).
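
A minimal sketch of this pattern, assuming the Client from the original
question has a no-argument constructor and a transform method (the String
output type is just a placeholder), using the data set from the original
snippet:

```
import org.apache.flink.api.common.functions.RichMapFunction

class TransformMapper extends RichMapFunction[MyKey, String] {
  // @transient keeps the field out of the serialized closure; lazy
  // defers construction until the first element arrives, so the client
  // is created once per parallel subtask (i.e., once per thread).
  @transient private lazy val client: Client = new Client()

  override def map(e: MyKey): String = client.transform(e)
}

data.map(new TransformMapper)
```

This avoids the "Task not serializable" error because the client never
travels with the closure.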

Re: Access to a shared resource within a mapper

Posted by Timur Fayruzov <ti...@gmail.com>.
Actually, a follow-up question: is the map function single-threaded (within
one task manager, that is)? If it's not, then lazy initialization won't
work, right?

Re: Access to a shared resource within a mapper

Posted by Stephan Ewen <se...@apache.org>.
You may also be able to defer the client initialization to the parallel
execution by making it a "lazy" variable in Scala.

Re: Access to a shared resource within a mapper

Posted by Timur Fayruzov <ti...@gmail.com>.
Outstanding! Thanks, Aljoscha.

Re: Access to a shared resource within a mapper

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you could use a RichMapFunction that has an open method:

data.map(new RichMapFunction[IN, OUT]() {
  override def open(parameters: Configuration): Unit = {
    // initialize the client here
  }

  override def map(input: IN): OUT = {
    // use the client
  }
})

The open() method is called before any elements are passed to the function.
The counterpart of open() is close(), which is called after all elements
have been processed or when the job is cancelled.
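
For completeness, a sketch that wires the non-thread-safe client from the
original question into open() and close(). The Client constructor and its
shutdown method are assumptions standing in for the real client API, and
String is a placeholder output type:

```
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class ClientMapper extends RichMapFunction[MyKey, String] {
  // One client per parallel subtask; each subtask runs in its own
  // thread, so a non-thread-safe client is safe to use here.
  private var client: Client = _

  override def open(parameters: Configuration): Unit = {
    client = new Client() // hypothetical constructor
  }

  override def map(input: MyKey): String = client.transform(input)

  override def close(): Unit = {
    if (client != null) client.shutdown() // hypothetical cleanup method
  }
}

data.map(new ClientMapper)
```

Since each parallel subtask gets its own function instance and thread, no
pooling or locking is needed.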

Cheers,
Aljoscha
