Posted to user@flink.apache.org by Lasse Nedergaard <la...@gmail.com> on 2018/05/01 08:20:04 UTC

ConnectedIterativeStreams and processing state 1.4.2

Hi.

I have a case where I have an input stream that I want to enrich with
external data. I want to cache some of the external lookup data to improve
overall performance.
To update my cache (a CoProcessFunction), I would use an iteration to send the
externally enriched information back to the cache and update a MapState. I
use a CoProcessFunction because the input stream and the enrichment stream
contain two different object types and I don't want to mix them.
Because I use ConnectedIterativeStreams, I can't use state in my
CoProcessFunction: ConnectedIterativeStreams creates a DataStream
based on the feedback type and not on the stream I close the iteration
with, and it is not possible to provide a KeySelector in withFeedbackType.

From the Flink source:

public ConnectedIterativeStreams(DataStream<I> input,
        TypeInformation<F> feedbackType, long waitTime) {
    super(input.getExecutionEnvironment(),
            input,
            new DataStream(input.getExecutionEnvironment(),
                    new CoFeedbackTransformation(input.getParallelism(),
                            feedbackType, waitTime)));
}

However, both streams need to be keyed before state is assigned to the operator.

Any ideas on how to work around this problem?

My pseudocode is below.

// Key the input and open an iteration whose feedback type is EnrichData.
IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
        .keyBy(obj -> obj.getKey())
        .iterate(maxWaitForIterations)
        .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));

// EnrichFromState is the CoProcessFunction that holds the cache.
DataStream<ReportMessageBase> enrichedStream = iteration
        .process(new EnrichFromState());

// Records that the cache could not enrich.
DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
        .filter(obj -> !obj.enriched);

EnrichService enrichService = new EnrichService();
DataStream<InputObject> enrichedFromApi = enrichService.parse(notEnrichedOutput);

DataStream<EnrichData> newEnrich = enrichedFromApi
        .map(obj -> {
            EnrichData newData = new EnrichData();
            newData.xx = obj.xx();
            return newData;
        })
        .keyBy(obj -> obj.getKey());

// Close the loop: feed the new enrichment data back to the cache.
iteration.closeWith(newEnrich);

....
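
The keying requirement described in the post can be illustrated without Flink. The following is only a plain-Java model, assuming that each parallel operator instance owns a private cache and that keyed records are routed by a hash of their key. KeyedRoutingDemo, Instance, keyedIndex and lookupHits are hypothetical names and not part of the Flink API. It shows that a feedback record delivered to any instance other than the one selected by the key leaves the cache entry where the keyed input record will never see it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of parallel operator instances; none of these names are Flink API.
class KeyedRoutingDemo {

    // Each parallel instance owns a private cache, like per-subtask state.
    static final class Instance {
        final Map<String, String> cache = new HashMap<>();
    }

    static List<Instance> newInstances(int parallelism) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        return instances;
    }

    // Keyed routing: the same key always maps to the same instance.
    static int keyedIndex(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Returns true if a lookup for `key` finds the value that the feedback
    // record wrote, given the instance index the feedback was delivered to.
    static boolean lookupHits(String key, int feedbackIndex, int parallelism) {
        List<Instance> instances = newInstances(parallelism);
        instances.get(feedbackIndex).cache.put(key, "enrichment for " + key);
        // The input record is keyed, so it is always routed by its key.
        Instance target = instances.get(keyedIndex(key, parallelism));
        return target.cache.containsKey(key);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        String key = "device-42";
        int keyed = keyedIndex(key, parallelism);
        int other = (keyed + 1) % parallelism; // any instance other than the keyed one
        System.out.println("feedback keyed:     hit = " + lookupHits(key, keyed, parallelism));
        System.out.println("feedback not keyed: hit = " + lookupHits(key, other, parallelism));
    }
}
```

In this model the lookup only succeeds when the feedback record is routed with the same key function as the input record, which is why a feedback edge that cannot be given a key selector is a problem for a per-key cache.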

Re: ConnectedIterativeStreams and processing state 1.4.2

Posted by Lasse Nedergaard <la...@gmail.com>.
I could, but the external REST call is done with an async operator, and I want to reduce the number of objects going to the async operator. It would also require that I store the state in the async operator too.

Best regards
Lasse Nedergaard


> On 3 May 2018, at 13:09, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Couldn't you do that in one operator then? I mean doing the calls and caching the results?

Re: ConnectedIterativeStreams and processing state 1.4.2

Posted by Aljoscha Krettek <al...@apache.org>.
Couldn't you do that in one operator then? I mean doing the calls and caching the results?
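
One way to read this suggestion is a single component that both issues the external call and remembers its result. The following is a minimal plain-Java sketch of that idea, not Flink code: CachingLookup, enrich and externalCalls are hypothetical names, and the external REST call is assumed to be any function that returns a CompletableFuture. The sketch ignores checkpointing, cache eviction and failed calls, all of which a real job would need to address.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Plain-Java sketch; CachingLookup and its members are hypothetical names.
class CachingLookup {
    private final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, CompletableFuture<String>> externalCall;
    final AtomicInteger externalCalls = new AtomicInteger();

    CachingLookup(Function<String, CompletableFuture<String>> externalCall) {
        this.externalCall = externalCall;
    }

    // Returns the cached future if the key was seen before; otherwise issues
    // exactly one external call and caches its future, so concurrent requests
    // for the same key share one call.
    CompletableFuture<String> enrich(String key) {
        return cache.computeIfAbsent(key, k -> {
            externalCalls.incrementAndGet();
            return externalCall.apply(k);
        });
    }
}
```

Caching the future rather than the finished value means that records arriving while a call is still in flight wait on the same call instead of triggering a second one.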

> On 3. May 2018, at 12:28, Lasse Nedergaard <la...@gmail.com> wrote:
> 
> Hi. 
> 
> The idea is to cache the latest enrichment data in a local cache in Flink so it can be reused, thereby limiting the number of external enrichment calls, as many of our data objects are enriched with the same data.
> An alternative solution could be to store the enriched data in Kafka and then stream it into the Flink job that way, but it would be easier if I could do it inside Flink.
> 
> Best regards
> Lasse Nedergaard


Re: ConnectedIterativeStreams and processing state 1.4.2

Posted by Lasse Nedergaard <la...@gmail.com>.
Hi. 

The idea is to cache the latest enrichment data in a local cache in Flink so it can be reused, thereby limiting the number of external enrichment calls, as many of our data objects are enriched with the same data.
An alternative solution could be to store the enriched data in Kafka and then stream it into the Flink job that way, but it would be easier if I could do it inside Flink.

Best regards
Lasse Nedergaard


> On 3 May 2018, at 12:09, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi,
> 
> Why do you want to do the enrichment downstream and send the data back up? The problem is that feedback edges (or iterations, they are the same in Flink) have some issues with fault tolerance. Could you maybe outline in a bit more depth what you're doing and what the flow of data and enrichment is?
> 
> Best,
> Aljoscha

Re: ConnectedIterativeStreams and processing state 1.4.2

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Why do you want to do the enrichment downstream and send the data back up? The problem is that feedback edges (or iterations, they are the same in Flink) have some issues with fault tolerance. Could you maybe outline in a bit more depth what you're doing and what the flow of data and enrichment is?

Best,
Aljoscha

> On 2. May 2018, at 16:25, Lasse Nedergaard <la...@gmail.com> wrote:
> 
> Hi. 
> 
> Because the data that I will cache comes from a downstream operator, and iterations were the only way I know of to loop data back to a previous operator.
> 
> Best regards
> Lasse Nedergaard


Re: ConnectedIterativeStreams and processing state 1.4.2

Posted by Lasse Nedergaard <la...@gmail.com>.
Hi. 

Because the data that I will cache comes from a downstream operator, and iterations were the only way I know of to loop data back to a previous operator.

Best regards
Lasse Nedergaard


> On 2 May 2018, at 15:35, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi,
> 
> Why can't you use a simple CoProcessFunction and handle cache updates within its processElement1 or processElement2 method?
> 
> Piotrek

Re: ConnectedIterativeStreams and processing state 1.4.2

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Why can't you use a simple CoProcessFunction and handle cache updates within its processElement1 or processElement2 method?

Piotrek
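
The shape of that suggestion can be sketched in plain Java. This is only a stand-in under stated assumptions: CachingEnricher is a hypothetical class whose method names mirror processElement1 and processElement2, but it does not implement the Flink interface. A HashMap stands in for keyed MapState, and records are reduced to a key plus a string of enrichment data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for a two-input function; the method names mirror
// processElement1/processElement2, but this is not the Flink interface.
class CachingEnricher {
    private final Map<String, String> cache = new HashMap<>(); // stands in for MapState
    final List<String> enriched = new ArrayList<>();           // main output
    final List<String> needsLookup = new ArrayList<>();        // keys to send to the external service

    // Input 1: a record to enrich, identified here only by its key.
    void processElement1(String key) {
        String data = cache.get(key);
        if (data != null) {
            enriched.add(key + " -> " + data);
        } else {
            needsLookup.add(key);
        }
    }

    // Input 2: an enrichment result that updates the cache.
    void processElement2(String key, String data) {
        cache.put(key, data);
    }
}
```

With this split, the first record for a key ends up in needsLookup, and once the second input has delivered the enrichment data for that key, later records with the same key are enriched from the cache. How the second input gets fed (an iteration, Kafka, or something else) is left open here.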
