Posted to user@flink.apache.org by Yan Chou Chen <yc...@gmail.com> on 2016/06/16 15:07:30 UTC

How does MapFunction get executed?

A quick question: when running a streaming job that executes
DataStream.map(MapFunction), after data is read from Kafka, is a
MapFunction instance created per record or per parallel instance?

For instance, consider the following code snippet:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new FlinkKafkaConsumer09[String](...))
stream.map(new RichMapFunction[String, Unit] {

    // my AsyncHttpClient instance

    override def open(params: Configuration): Unit = {
        /* create my AsyncHttpClient instance, etc. */
    }

    override def close(): Unit = {
        /* close my AsyncHttpClient instance */
    }

    override def map(record: String): Unit = {
        // my code
    }
})

Is a RichMapFunction instance created for each record (a String in the
above example)? Or, say the program sets the parallelism to 4: are 4
RichMapFunction instances created first, the data read from the Kafka
consumer divided into 4 partitions (or something similar), and
map(record: String) then called within something like a while loop? What
is the actual flow, and is there source code I can start from? (I traced
through StreamExecutionEnvironment / addSource / DataStream / transform /
addOperator, etc., but then got lost in the source code.)

Basically, my problem is that I have an AsyncHttpClient instance opened
in the open() function and closed in the close() function, as the
RichMapFunction docs recommend. However, in some cases my AsyncHttpClient
instance is not closed, which produces a warning like

AsyncHttpClient.close() hasn't been invoked, which may produce file
descriptor leaks

Therefore I would like to know the lifecycle so that I can close the
resource appropriately.
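
For reference, the pattern I am following looks roughly like this (a
sketch only, not my exact code; the HttpMapper name, the null guard, and
the org.asynchttpclient 2.x client class are illustrative assumptions):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.asynchttpclient.{AsyncHttpClient, DefaultAsyncHttpClient}

class HttpMapper extends RichMapFunction[String, Unit] {
    // One client per parallel subtask; @transient because the function
    // object is serialized and shipped to the TaskManagers.
    @transient private var client: AsyncHttpClient = _

    override def open(params: Configuration): Unit = {
        client = new DefaultAsyncHttpClient()  // illustrative client class
    }

    override def close(): Unit = {
        // Null guard: open() may never have run if deployment failed early.
        if (client != null) client.close()
    }

    override def map(record: String): Unit = {
        // my code
    }
}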

Thanks

Re: How does MapFunction get executed?

Posted by Yan Chou Chen <yc...@gmail.com>.
Thanks for clarifying; that helped me identify the root cause. The
problem came from my own code and is not related to Flink. It is now
solved. Thank you again for the advice!

On 16 June 2016 at 23:49, Till Rohrmann <tr...@apache.org> wrote:
> Hi Yan Chou Chen,
>
> Flink does not instantiate a mapper for each record. Instead, it creates
> as many mappers as you've defined via the parallelism. Each of these
> mappers is deployed to a slot on a TaskManager. When it is deployed, and
> before it receives records, the open method is called once. Then incoming
> records are processed as they arrive at the operator. Once the operator has
> finished processing (in the streaming case, this means the user has
> stopped or cancelled the job), the close method is called. The close
> method should also be called if your job fails. Therefore, I cannot explain
> why some of your resources don't get closed. Could you check whether the
> logs contain anything suspicious?
>
> Cheers,
> Till
>
> On Thu, Jun 16, 2016 at 5:07 PM, Yan Chou Chen <yc...@gmail.com> wrote:
>>
>> [original question snipped]
>
>

Re: How does MapFunction get executed?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Yan Chou Chen,

Flink does not instantiate a mapper for each record. Instead, it creates
as many mappers as you've defined via the parallelism. Each of these
mappers is deployed to a slot on a TaskManager. When it is deployed, and
before it receives records, the open method is called once. Then incoming
records are processed as they arrive at the operator. Once the operator
has finished processing (in the streaming case, this means the user has
stopped or cancelled the job), the close method is called. The close
method should also be called if your job fails. Therefore, I cannot
explain why some of your resources don't get closed. Could you check
whether the logs contain anything suspicious?
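
In pseudocode, each parallel subtask drives your function roughly like
this (a conceptual sketch only, not the actual Flink runtime code; the
runSubtask name is an illustrative assumption):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// One such loop runs per parallel subtask, so parallelism 4 means four
// mapper instances, each opened and closed exactly once.
def runSubtask(mapper: RichMapFunction[String, Unit],
               records: Iterator[String]): Unit = {
    mapper.open(new Configuration())   // once, before the first record
    try {
        while (records.hasNext) {      // then one map() call per record
            mapper.map(records.next())
        }
    } finally {
        mapper.close()                 // once, on stop, cancel, or failure
    }
}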

Cheers,
Till

On Thu, Jun 16, 2016 at 5:07 PM, Yan Chou Chen <yc...@gmail.com> wrote:

> [original question snipped]
>