Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2019/05/07 08:16:00 UTC

How to export all not-null keyed ValueState

Hi,

I have a keyed ValueState that is set for only about 1% of the total
number of keys that I have. Is there any way to get the values of
all those states?
I looked at the queryable state option, but it looks like it only supports
querying by key.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to export all not-null keyed ValueState

Posted by Averell Huyen Levan <lv...@gmail.com>.
Thank you very much, Fabian.

Regards,
Averell

(In reply to Fabian Hueske <fh...@gmail.com>, Fri, May 10, 2019 at 9:46 PM.)

Re: How to export all not-null keyed ValueState

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Averell,

I'd go with your approach. Any state access (given that you use RocksDB
keyed state) or deduplication of messages is going to be more expensive
than a simple cast.

Best, Fabian

(In reply to Averell Huyen Levan <lvhuyen@gmail.com>, Fri, May 10, 2019 at 13:08.)

Re: How to export all not-null keyed ValueState

Posted by Averell Huyen Levan <lv...@gmail.com>.
Hi Fabian,

Thanks for that. However, as I mentioned in my previous email, that
implementation requires a lot of typecasting/object wrapping.
I tried broadcasting the Toggle stream instead: the toggles are saved in a
MapState, and whenever an export trigger record arrives, I send out that
MapState. This implementation does not use applyToKeyedState. The
problem is that I receive duplicated output (one copy from each
parallel instance).
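
The duplicates follow from how the two kinds of state are distributed: broadcast state is replicated in full on every parallel instance, while keyed state is partitioned so that each key lives on exactly one instance. The plain-Scala model below involves no Flink at all; `parallelism`, `ownerOf` and the sample toggles are made up for illustration. It exports from every instance under both layouts so the record counts can be compared.

```scala
object BroadcastVsKeyedExport {
  type Toggle = (String, Boolean)

  // Hypothetical sample data: three keys that have a toggle value.
  val toggles: Seq[Toggle] = Seq("a" -> true, "b" -> false, "c" -> true)
  val parallelism = 4

  // Stand-in for key partitioning: each key is owned by exactly one instance.
  def ownerOf(key: String): Int = math.abs(key.hashCode % parallelism)

  // Broadcast layout: every instance holds a full copy and exports all of it.
  def exportFromBroadcastState(): Seq[Toggle] =
    (0 until parallelism).flatMap(_ => toggles)

  // Keyed layout: every instance exports only the keys it owns.
  def exportFromKeyedState(): Seq[Toggle] =
    (0 until parallelism).flatMap(i => toggles.filter(t => ownerOf(t._1) == i))

  def main(args: Array[String]): Unit = {
    println(s"broadcast export: ${exportFromBroadcastState().size} records")
    println(s"keyed export:     ${exportFromKeyedState().size} records")
  }
}
```

With three toggles and four instances, the broadcast layout yields 12 records and the keyed layout yields 3, which matches the "one from each parallel instance" duplication described above.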

Is there any option to modify the keyed state from within the
processBroadcastElement() method?

Thanks a lot for your help.

Regards,
Averell


(In reply to Fabian Hueske <fh...@gmail.com>, Fri, May 10, 2019 at 8:52 PM.)

Re: How to export all not-null keyed ValueState

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Averell,

Ah, sorry. I had assumed the toggle events were broadcast anyway.
Since you have both streams keyed, your current solution looks fine to me.

Best,
Fabian

(In reply to Averell Huyen Levan <lvhuyen@gmail.com>, Fri, May 10, 2019 at 03:13.)

Re: How to export all not-null keyed ValueState

Posted by Averell Huyen Levan <lv...@gmail.com>.
Hi Fabian,

Sorry, but I am still confused by your suggestion. If I union the Toggle
stream with the StateReportTrigger stream, would that mean I need to keep
my Toggles in broadcast state? Or is there some way to modify the keyed
state from within the processBroadcastElement() method?

I tried implementing it the other way round (as outlined in my previous
email). It seems to work, but I am not confident in it and am not sure
whether it has any flaws. Could you please comment on it?
In my view, this implementation causes a lot of type-casting on my main
data stream, which might have a high impact on performance (my toggle is
on for only about 1% of the keys, and the rate of input1.left is less than a
millionth of the rate of input1.right).

/**
  * This KeyedBroadcastProcessFunction has:
  *    input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
  *       input1.left: Toggles in the form of a tuple (Key, Boolean).
  *          When Toggle._2 == true, records from input1.right for the same key will be forwarded to the main output.
  *          If it is false, records from input1.right for that same key will be dropped
  *       input1.right: the main data stream
  *
  *    input2: a broadcasted stream of StateReport triggers. When a record arrived on this stream,
  *       the current value of Toggles will be sent out via the outputTag
  */
class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
      extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {

   val toggleStateDescriptor = new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])

   override def processElement(in1: Either[Toggle, MyEvent],
                        readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
                        collector: Collector[MyEvent]): Unit = {
      in1 match {
         case Left(toggle) =>
            getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
         case Right(event) =>
            if (getRuntimeContext.getState(toggleStateDescriptor).value())
               collector.collect(event)
      }
   }

   override def processBroadcastElement(in2: Any,
                               context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
                               collector: Collector[MyEvent]): Unit = {
      context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
         if (s != null) context.output(outputTag, (k, s.value())))
   }
}
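
For context, a function of this shape could be wired into a job roughly as follows. This is only a sketch, assuming a Flink 1.x release that still ships the Scala DataStream API; the `String` key, the `MyEvent` and `ReportTrigger` case classes, the `wire` helper and the "unused" descriptor name are hypothetical stand-ins, not taken from the thread.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._

object ToggleWiring {
  // Hypothetical record types for this sketch.
  case class MyEvent(key: String, payload: String)
  case class ReportTrigger(requestedAt: Long)

  def wire(
      toggles: DataStream[(String, Boolean)],
      events: DataStream[MyEvent],
      triggers: DataStream[ReportTrigger],
      fn: KeyedBroadcastProcessFunction[String, Either[(String, Boolean), MyEvent], ReportTrigger, MyEvent],
      tag: OutputTag[(String, Boolean)]
  ): (DataStream[MyEvent], DataStream[(String, Boolean)]) = {
    // Wrap both inputs into one Either type so they can be unioned and keyed together.
    val left: DataStream[Either[(String, Boolean), MyEvent]] = toggles.map(t => Left(t))
    val right: DataStream[Either[(String, Boolean), MyEvent]] = events.map(e => Right(e))
    val keyed = left.union(right)
      .keyBy((in: Either[(String, Boolean), MyEvent]) => in.fold(_._1, _.key))

    // broadcast(descriptor) yields a BroadcastStream; the descriptor itself is unused here.
    val unused = new MapStateDescriptor[String, String]("unused", classOf[String], classOf[String])
    val main = keyed.connect(triggers.broadcast(unused)).process(fn)

    // Main output: filtered events. Side output: exported toggles.
    (main, main.getSideOutput(tag))
  }
}
```

The exported toggles arrive on the side-output stream, so they never mix with the filtered events on the main output.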

Thanks for your help.
Regards,
Averell

(In reply to Fabian Hueske <fh...@gmail.com>, Thu, May 9, 2019 at 7:31 PM.)

Re: How to export all not-null keyed ValueState

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Passing a Context through a DataStream definitely does not work.
You'd need to have the keyed state that you want to scan over in the
KeyedBroadcastProcessFunction.

For the toggle filter use case, you would need to have a unioned stream
with Toggle and StateReport events.
For the output, you can use side outputs to route the different outputs to
different streams.

Best, Fabian

(In reply to Averell <lv...@gmail.com>, Thu, May 9, 2019 at 10:34.)

Re: How to export all not-null keyed ValueState

Posted by Averell <lv...@gmail.com>.
Thank you Congxian and Fabian.

@Fabian: could you please give a bit more detail? My understanding is that I
should pass the context itself and an OutputTag to the KeyedStateFunction
parameter of KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and
send out the side output from within that KeyedStateFunction.process(). Do I
understand your idea correctly?

BTW, I have another question regarding KeyedBroadcastProcessFunction best
practice: I have two streams, Data and Toggle. The Toggle stream is
just a keyed boolean stream that is used to filter data from the Data stream.
I implement that filter using a simple RichCoFlatMapFunction.

Now I want to export the list of keys that are currently toggled on.
Should I
(1) add one more KeyedBroadcastProcessFunction operator (which has
Toggle and Broadcast as its input streams), or
(2) replace that RichCoFlatMapFunction with a new
KeyedBroadcastProcessFunction that does both filtering and
exporting? Doing this would require unioning Toggle and Data into one single
keyed stream.
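
Option (2) can be modelled without Flink to make the data flow concrete. The sketch below is plain Scala with a mutable map standing in for the keyed ValueState; the `Operator` class, the `MyEvent` fields and the sample records are assumptions for illustration only. Toggles and data share one `Either` input, data is forwarded only for keys that are toggled on, and a trigger exports every key that has a toggle value.

```scala
import scala.collection.mutable

object ToggleFilterModel {
  type Key = String
  type Toggle = (Key, Boolean)
  final case class MyEvent(key: Key, payload: String)

  final class Operator {
    // Stand-in for the keyed ValueState: at most one toggle value per key.
    private val toggles = mutable.LinkedHashMap.empty[Key, Boolean]

    // Keyed input: a toggle updates state; an event passes only if its key is toggled on.
    def processElement(in: Either[Toggle, MyEvent]): Option[MyEvent] = in match {
      case Left((key, on)) =>
        toggles(key) = on
        None
      case Right(event) =>
        if (toggles.getOrElse(event.key, false)) Some(event) else None
    }

    // Trigger input: export every key that currently has a toggle value.
    def processTrigger(): Seq[Toggle] = toggles.toSeq
  }

  def main(args: Array[String]): Unit = {
    val op = new Operator
    val input: Seq[Either[Toggle, MyEvent]] = Seq(
      Right(MyEvent("a", "dropped, no toggle yet")),
      Left("a" -> true),
      Right(MyEvent("a", "forwarded")),
      Left("b" -> false),
      Right(MyEvent("b", "dropped, toggled off"))
    )
    val forwarded = input.flatMap(in => op.processElement(in).toList)
    println(forwarded)
    println(op.processTrigger())
  }
}
```

Keys that never received a toggle are absent from the map, which mirrors the original question: only the keys with non-null state are exported.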

Thanks and best regards,
Averell 





Re: How to export all not-null keyed ValueState

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

The KeyedBroadcastProcessFunction has a method to iterate over all keys of
a keyed state.
This method is available via the Context object of the
processBroadcastElement() method.
Hence you need a broadcast message to trigger the operation.
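
A minimal sketch of that pattern, assuming a Flink 1.x release that still ships the Scala DataStream API. The `String` key, the `ExportTrigger` case class, the state name "toggle" and the class name `ExportToggles` are hypothetical; only `applyToKeyedState`, `KeyedStateFunction` and the side-output call are the Flink pieces being illustrated.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.KeyedStateFunction
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector

// Hypothetical trigger type for the broadcast side.
case class ExportTrigger(requestedAt: Long)

class ExportToggles(tag: OutputTag[(String, Boolean)])
    extends KeyedBroadcastProcessFunction[String, (String, Boolean), ExportTrigger, (String, Boolean)] {

  // Boxed Boolean so that "no value" (null) stays distinguishable from false.
  private val desc =
    new ValueStateDescriptor[java.lang.Boolean]("toggle", classOf[java.lang.Boolean])

  // Keyed side: remember the latest toggle for the current key.
  override def processElement(
      toggle: (String, Boolean),
      ctx: KeyedBroadcastProcessFunction[String, (String, Boolean), ExportTrigger, (String, Boolean)]#ReadOnlyContext,
      out: Collector[(String, Boolean)]): Unit =
    getRuntimeContext.getState(desc).update(toggle._2)

  // Broadcast side: each trigger visits the state of every key held by this parallel instance.
  override def processBroadcastElement(
      trigger: ExportTrigger,
      ctx: KeyedBroadcastProcessFunction[String, (String, Boolean), ExportTrigger, (String, Boolean)]#Context,
      out: Collector[(String, Boolean)]): Unit =
    ctx.applyToKeyedState(desc, new KeyedStateFunction[String, ValueState[java.lang.Boolean]] {
      override def process(key: String, state: ValueState[java.lang.Boolean]): Unit =
        // Skip null values defensively, then emit (key, toggle) to the side output.
        Option(state.value()).foreach(on => ctx.output(tag, (key, on.booleanValue())))
    })
}
```

Because keyed state is partitioned, each parallel instance only visits its own keys, so every key should be exported once per trigger.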

Best, Fabian

(In reply to Congxian Qiu <qcx978132955@gmail.com>, Thu, May 9, 2019 at 08:46.)

Re: How to export all not-null keyed ValueState

Posted by Congxian Qiu <qc...@gmail.com>.
Hi, Averell

AFAIK, we can't get all the key-value pairs from a ValueState, but MapState has a method called `entries` that can do this, so maybe you can use MapState as a workaround.
(In reply to Averell <lv...@gmail.com>, May 7, 2019, 16:16 +0800.)