Posted to user@flink.apache.org by Navneeth Krishnan <re...@gmail.com> on 2017/09/05 21:42:16 UTC

Broadcast Config through Connected Stream

Hi All,

I looked into an earlier email thread on broadcasting config through a
connected stream, but I couldn't find its conclusion.

I can't use the approach below, because I need the config to be published
to all operator instances while still having keyed state for external
querying.

streamToBeConfigured.connect(configMessageStream)
    .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
    .flatMap(new FunctionWithConfigurableState())
    .addSink(...);

One of the resolutions I found in that mail thread is below. I can use it
to solve my issue, but is this still the recommended approach?

stream1.connect(stream2)
    // Holds transient state of the last ConfigMessage and maps Stream1's
    // data to a Tuple2<Stream1Data, ConfigMessage>
    .map(new MergeStreamsMapFunction())
    // KeyBy Id to allow for ValueStateDescriptors and semantically correct
    // partitioning according to business logic
    .keyBy(new SomeIdKeySelector())
    // Save latest received ConfigMessage-Value in ValueStateDescriptor here
    .flatMap(new StatefulFlatMapFunction())
    .addSink(...);

Thanks,
Navneeth
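
The merge-then-key pipeline quoted above can be modeled without Flink in a
few lines of plain Java. This is only a sketch of the logic, not the original
MergeStreamsMapFunction or StatefulFlatMapFunction (the thread never shows
them). The class and method names (ConfigMergeSketch, Merger,
KeyedConfigStore, onConfig, onData) and the String-typed ids and configs are
all assumptions made for illustration. It also assumes that every parallel
instance of the merge step receives every config message, for example because
the config stream is broadcast before it is connected.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Flink-free model of the "merge, then key" pipeline; all names are hypothetical. */
final class ConfigMergeSketch {

    /** Stand-in for the merge step: one instance per parallel subtask. */
    static final class Merger {
        // Plain field: this copy of the config is lost if the instance restarts.
        private String lastConfig;

        /** Called for every config message (second input). */
        void onConfig(String config) {
            lastConfig = config;
        }

        /** Called for every data element (first input); pairs its id with the latest config. */
        Map.Entry<String, Optional<String>> onData(String id) {
            return new SimpleImmutableEntry<>(id, Optional.ofNullable(lastConfig));
        }
    }

    /** Stand-in for the keyed step: a map plays the role of per-key value state. */
    static final class KeyedConfigStore {
        private final Map<String, String> configByKey = new HashMap<>();

        /** Stores the config carried by the element under the element's key, if present. */
        void process(Map.Entry<String, Optional<String>> element) {
            element.getValue().ifPresent(cfg -> configByKey.put(element.getKey(), cfg));
        }

        /** Models an external query against the keyed state. */
        Optional<String> query(String key) {
            return Optional.ofNullable(configByKey.get(key));
        }
    }

    public static void main(String[] args) {
        Merger merger = new Merger();
        KeyedConfigStore store = new KeyedConfigStore();

        store.process(merger.onData("sensor-1")); // no config seen yet
        merger.onConfig("threshold=5");
        store.process(merger.onData("sensor-2")); // carries the new config

        System.out.println(store.query("sensor-1")); // Optional.empty
        System.out.println(store.query("sensor-2")); // Optional[threshold=5]
    }
}
```

One consequence visible in the model: the keyed store only learns about a new
config when the next data element for that key passes through, so a key with
no traffic keeps its old value (or none at all).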

Re: Broadcast Config through Connected Stream

Posted by Navneeth Krishnan <re...@gmail.com>.
Thanks a lot Aljoscha. That helps.

On Mon, Sep 25, 2017 at 4:47 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I think this is a valid approach. You can even use "operator state" in
> your map function so that the broadcast config is kept in checkpointed
> state instead of a transient field.
>
> Another approach would be to use internal APIs to hack together an
> operator that has a keyed stream on one input and a broadcast stream on
> the other. You can see that approach in action in the Beam Flink Runner
> [1], but I would strongly recommend against it because it relies on
> internal APIs. If the other approach works for you, I would stay with that.
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L488

Re: Broadcast Config through Connected Stream

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I think this is a valid approach. You can even use "operator state" in your map function so that the broadcast config is kept in checkpointed state instead of a transient field.

Another approach would be to use internal APIs to hack together an operator that has a keyed stream on one input and a broadcast stream on the other. You can see that approach in action in the Beam Flink Runner [1], but I would strongly recommend against it because it relies on internal APIs. If the other approach works for you, I would stay with that.

Best,
Aljoscha

[1] https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L488
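
What the "operator state" suggestion buys can be sketched in plain Java,
without Flink on the classpath. The idea is that the function holding the
last config also exposes a snapshot hook and a restore hook, so a restarted
instance does not begin with an empty config. In Flink these hooks would
correspond to the snapshotState and initializeState methods of
CheckpointedFunction; here they are modeled as ordinary methods, and every
name in the block (CheckpointedConfigSketch, ConfigHolder, snapshot, restore)
is hypothetical.

```java
import java.util.Collections;
import java.util.List;

/** Flink-free model of keeping the last config in checkpointed state; names are hypothetical. */
final class CheckpointedConfigSketch {

    static final class ConfigHolder {
        private String lastConfig;

        /** Called for every config message. */
        void onConfig(String config) {
            lastConfig = config;
        }

        /** Returns the config currently in effect, or null if none was seen. */
        String current() {
            return lastConfig;
        }

        /** Models the snapshot hook: returns what would be written to the checkpoint. */
        List<String> snapshot() {
            if (lastConfig == null) {
                return Collections.emptyList();
            }
            return Collections.singletonList(lastConfig);
        }

        /** Models the restore hook: re-installs the most recent checkpointed config. */
        void restore(List<String> restored) {
            if (!restored.isEmpty()) {
                lastConfig = restored.get(restored.size() - 1);
            }
        }
    }

    public static void main(String[] args) {
        ConfigHolder beforeFailure = new ConfigHolder();
        beforeFailure.onConfig("threshold=5");
        List<String> checkpoint = beforeFailure.snapshot();

        // Simulated restart: a fresh instance is restored from the checkpoint.
        ConfigHolder afterRestart = new ConfigHolder();
        afterRestart.restore(checkpoint);
        System.out.println(afterRestart.current()); // threshold=5
    }
}
```

Without the restore step, the fresh instance would report no config until the
next config message arrived.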

> On 15. Sep 2017, at 07:04, Navneeth Krishnan <re...@gmail.com> wrote:
> 
> Hi,
> 
> Any suggestions on how this could be achieved?
> 
> Thanks


Re: Broadcast Config through Connected Stream

Posted by Navneeth Krishnan <re...@gmail.com>.
Hi,

Any suggestions on how this could be achieved?

Thanks

On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan <re...@gmail.com>
wrote:

> Hi All,
>
> Any suggestions on this would really help.
>
> Thanks.

Re: Broadcast Config through Connected Stream

Posted by Navneeth Krishnan <re...@gmail.com>.
Hi All,

Any suggestions on this would really help.

Thanks.
