You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Saravanan77@gmail.com" <sa...@gmail.com> on 2022/02/09 16:56:33 UTC

Flink 1.12.x DataSet --> Flink 1.14.x DataStream

I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
DataStream api. mapPartition is not available in Flink DataStream.
*Current Code using Flink 1.12.x DataSet :*

dataset
    .<few operations>
    .mapPartition(new SomeMapParitionFn())
    .<few more operations>

public static class SomeMapPartitionFn extends
RichMapPartitionFunction<InputModel, OutputModel> {

    @Override
    public void mapPartition(Iterable<InputModel> records,
Collector<OutputModel> out) throws Exception {
        for (InputModel record : records) {
            /*
            do some operation
             */
            if (/* some condition based on processing *MULTIPLE*
records */) {*                out.collect(...); // Conditional collect
               ---> (1)*            }
        }

        // At the end of the data, collect*        out.collect(...);
// Collect processed data                   ---> (2) *    }
}


   -

   (1) - Collector.collect invoked based on some condition after processing
   few records
   -

   (2) - Collector.collect invoked at the end of data

   Initially we thought of using flatMap instead of mapPartition, but the
   collector is not available in close function.

   https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
   case of chained drivers

How to implement this in Flink 1.14.x DataStream? Please advise...

*Note*: Our application works with only finite set of data (Batch Mode)

Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

Posted by "Saravanan77@gmail.com" <sa...@gmail.com>.
Thanks Zhipeng.  Working as expected.  Thanks once again.

Saravanan

On Tue, Feb 15, 2022 at 3:23 AM Zhipeng Zhang <zh...@gmail.com>
wrote:

> Hi Saravanan,
>
> One solution could be using a streamOperator to implement `BoundedOneInput`
> interface.
> An example code could be found here [1].
>
> [1]
> https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
>
> Saravanan77@gmail.com <sa...@gmail.com> 于2022年2月15日周二 02:44写道:
>
>> Hi Niklas,
>>
>> Thanks for your reply.  Approach [1] works only if operators are chained
>> (in order words, operators executed within the same task).   Since
>> mapPartition operator parallelism is different from previous operator
>> parallelism, it doesn't fall under the same task(or not chained) .
>>
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
>> https://issues.apache.org/jira/browse/FLINK-14709
>>
>> Saravanan
>>
>> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler <ni...@ververica.com>
>> wrote:
>>
>>> Hi Saravanan,
>>>
>>> AFAIK the last record is not treated differently.
>>>
>>> Does the approach in [1] not work?
>>>
>>> Best regards,
>>> Niklas
>>>
>>>
>>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>>
>>>
>>> > On 9. Feb 2022, at 20:31, Saravanan77@gmail.com <sa...@gmail.com>
>>> wrote:
>>> >
>>> > Is there any way to identify the last message inside RichFunction in
>>> BATCH mode ?
>>> >
>>> >
>>> >
>>> > On Wed, Feb 9, 2022 at 8:56 AM Saravanan77@gmail.com <
>>> saravanan77@gmail.com> wrote:
>>> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
>>> DataStream api. mapPartition is not available in Flink DataStream.
>>> >
>>> > Current Code using Flink 1.12.x DataSet :
>>> >
>>> > dataset
>>> >     .<few operations>
>>> >     .mapPartition(new SomeMapParitionFn())
>>> >     .<few more operations>
>>> >
>>> > public static class SomeMapPartitionFn extends
>>> RichMapPartitionFunction<InputModel, OutputModel> {
>>> >
>>> >     @Override
>>> >     public void mapPartition(Iterable<InputModel> records,
>>> Collector<OutputModel> out) throws Exception {
>>> >         for (InputModel record : records) {
>>> >             /*
>>> >             do some operation
>>> >              */
>>> >             if (/* some condition based on processing *MULTIPLE*
>>> records */) {
>>> >
>>> >                 out.collect(...); // Conditional collect
>>>   ---> (1)
>>> >             }
>>> >         }
>>> >
>>> >         // At the end of the data, collect
>>> >
>>> >         out.collect(...);   // Collect processed data
>>>  ---> (2)
>>> >     }
>>> > }
>>> >
>>> >       • (1) - Collector.collect invoked based on some condition after
>>> processing few records
>>> >       • (2) - Collector.collect invoked at the end of data
>>> >
>>> > Initially we thought of using flatMap instead of mapPartition, but the
>>> collector is not available in close function.
>>> >
>>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>>> case of chained drivers
>>> > How to implement this in Flink 1.14.x DataStream? Please advise...
>>> >
>>> > Note: Our application works with only finite set of data (Batch Mode)
>>> >
>>>
>>>
>
> --
> best,
> Zhipeng
>
>

Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

Posted by Zhipeng Zhang <zh...@gmail.com>.
Hi Saravanan,

One solution could be using a streamOperator to implement `BoundedOneInput`
interface.
An example code could be found here [1].

[1]
https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75

Saravanan77@gmail.com <sa...@gmail.com> 于2022年2月15日周二 02:44写道:

> Hi Niklas,
>
> Thanks for your reply.  Approach [1] works only if operators are chained
> (in order words, operators executed within the same task).   Since
> mapPartition operator parallelism is different from previous operator
> parallelism, it doesn't fall under the same task(or not chained) .
>
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
> https://issues.apache.org/jira/browse/FLINK-14709
>
> Saravanan
>
> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler <ni...@ververica.com>
> wrote:
>
>> Hi Saravanan,
>>
>> AFAIK the last record is not treated differently.
>>
>> Does the approach in [1] not work?
>>
>> Best regards,
>> Niklas
>>
>>
>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>
>>
>> > On 9. Feb 2022, at 20:31, Saravanan77@gmail.com <sa...@gmail.com>
>> wrote:
>> >
>> > Is there any way to identify the last message inside RichFunction in
>> BATCH mode ?
>> >
>> >
>> >
>> > On Wed, Feb 9, 2022 at 8:56 AM Saravanan77@gmail.com <
>> saravanan77@gmail.com> wrote:
>> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
>> DataStream api. mapPartition is not available in Flink DataStream.
>> >
>> > Current Code using Flink 1.12.x DataSet :
>> >
>> > dataset
>> >     .<few operations>
>> >     .mapPartition(new SomeMapParitionFn())
>> >     .<few more operations>
>> >
>> > public static class SomeMapPartitionFn extends
>> RichMapPartitionFunction<InputModel, OutputModel> {
>> >
>> >     @Override
>> >     public void mapPartition(Iterable<InputModel> records,
>> Collector<OutputModel> out) throws Exception {
>> >         for (InputModel record : records) {
>> >             /*
>> >             do some operation
>> >              */
>> >             if (/* some condition based on processing *MULTIPLE*
>> records */) {
>> >
>> >                 out.collect(...); // Conditional collect
>> ---> (1)
>> >             }
>> >         }
>> >
>> >         // At the end of the data, collect
>> >
>> >         out.collect(...);   // Collect processed data
>>  ---> (2)
>> >     }
>> > }
>> >
>> >       • (1) - Collector.collect invoked based on some condition after
>> processing few records
>> >       • (2) - Collector.collect invoked at the end of data
>> >
>> > Initially we thought of using flatMap instead of mapPartition, but the
>> collector is not available in close function.
>> >
>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>> case of chained drivers
>> > How to implement this in Flink 1.14.x DataStream? Please advise...
>> >
>> > Note: Our application works with only finite set of data (Batch Mode)
>> >
>>
>>

-- 
best,
Zhipeng

Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

Posted by "Saravanan77@gmail.com" <sa...@gmail.com>.
Hi Niklas,

Thanks for your reply.  Approach [1] works only if operators are chained
(in order words, operators executed within the same task).   Since
mapPartition operator parallelism is different from previous operator
parallelism, it doesn't fall under the same task(or not chained) .


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
https://issues.apache.org/jira/browse/FLINK-14709

Saravanan

On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler <ni...@ververica.com> wrote:

> Hi Saravanan,
>
> AFAIK the last record is not treated differently.
>
> Does the approach in [1] not work?
>
> Best regards,
> Niklas
>
>
> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>
>
> > On 9. Feb 2022, at 20:31, Saravanan77@gmail.com <sa...@gmail.com>
> wrote:
> >
> > Is there any way to identify the last message inside RichFunction in
> BATCH mode ?
> >
> >
> >
> > On Wed, Feb 9, 2022 at 8:56 AM Saravanan77@gmail.com <
> saravanan77@gmail.com> wrote:
> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
> DataStream api. mapPartition is not available in Flink DataStream.
> >
> > Current Code using Flink 1.12.x DataSet :
> >
> > dataset
> >     .<few operations>
> >     .mapPartition(new SomeMapParitionFn())
> >     .<few more operations>
> >
> > public static class SomeMapPartitionFn extends
> RichMapPartitionFunction<InputModel, OutputModel> {
> >
> >     @Override
> >     public void mapPartition(Iterable<InputModel> records,
> Collector<OutputModel> out) throws Exception {
> >         for (InputModel record : records) {
> >             /*
> >             do some operation
> >              */
> >             if (/* some condition based on processing *MULTIPLE* records
> */) {
> >
> >                 out.collect(...); // Conditional collect
> ---> (1)
> >             }
> >         }
> >
> >         // At the end of the data, collect
> >
> >         out.collect(...);   // Collect processed data
>  ---> (2)
> >     }
> > }
> >
> >       • (1) - Collector.collect invoked based on some condition after
> processing few records
> >       • (2) - Collector.collect invoked at the end of data
> >
> > Initially we thought of using flatMap instead of mapPartition, but the
> collector is not available in close function.
> >
> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
> case of chained drivers
> > How to implement this in Flink 1.14.x DataStream? Please advise...
> >
> > Note: Our application works with only finite set of data (Batch Mode)
> >
>
>

Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

Posted by Niklas Semmler <ni...@ververica.com>.
Hi Saravanan,

AFAIK the last record is not treated differently.

Does the approach in [1] not work? 

Best regards,
Niklas

https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279


> On 9. Feb 2022, at 20:31, Saravanan77@gmail.com <sa...@gmail.com> wrote:
> 
> Is there any way to identify the last message inside RichFunction in BATCH mode ?
> 
> 
> 
> On Wed, Feb 9, 2022 at 8:56 AM Saravanan77@gmail.com <sa...@gmail.com> wrote:
> I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x DataStream api. mapPartition is not available in Flink DataStream.
> 
> Current Code using Flink 1.12.x DataSet :
> 
> dataset
>     .<few operations>
>     .mapPartition(new SomeMapParitionFn())
>     .<few more operations>
> 
> public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
> 
>     @Override
>     public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
>         for (InputModel record : records) {
>             /*
>             do some operation    
>              */
>             if (/* some condition based on processing *MULTIPLE* records */) {
> 
>                 out.collect(...); // Conditional collect                ---> (1)
>             }
>         }
>         
>         // At the end of the data, collect
> 
>         out.collect(...);   // Collect processed data                   ---> (2) 
>     }
> }
> 
> 	• (1) - Collector.collect invoked based on some condition after processing few records
> 	• (2) - Collector.collect invoked at the end of data
> 
> Initially we thought of using flatMap instead of mapPartition, but the collector is not available in close function.
> 
> https://issues.apache.org/jira/browse/FLINK-14709 - Only available in case of chained drivers
> How to implement this in Flink 1.14.x DataStream? Please advise...
> 
> Note: Our application works with only finite set of data (Batch Mode)
> 


Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

Posted by "Saravanan77@gmail.com" <sa...@gmail.com>.
Is there any way to identify the last message inside RichFunction in BATCH
mode ?



On Wed, Feb 9, 2022 at 8:56 AM Saravanan77@gmail.com <sa...@gmail.com>
wrote:

> I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
> DataStream api. mapPartition is not available in Flink DataStream.
> *Current Code using Flink 1.12.x DataSet :*
>
> dataset
>     .<few operations>
>     .mapPartition(new SomeMapParitionFn())
>     .<few more operations>
>
> public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
>
>     @Override
>     public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
>         for (InputModel record : records) {
>             /*
>             do some operation
>              */
>             if (/* some condition based on processing *MULTIPLE* records */) {*                out.collect(...); // Conditional collect                ---> (1)*            }
>         }
>
>         // At the end of the data, collect*        out.collect(...);   // Collect processed data                   ---> (2) *    }
> }
>
>
>    -
>
>    (1) - Collector.collect invoked based on some condition after
>    processing few records
>    -
>
>    (2) - Collector.collect invoked at the end of data
>
>    Initially we thought of using flatMap instead of mapPartition, but the
>    collector is not available in close function.
>
>    https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>    case of chained drivers
>
> How to implement this in Flink 1.14.x DataStream? Please advise...
>
> *Note*: Our application works with only finite set of data (Batch Mode)
>
>
>