Posted to user@flink.apache.org by Andy Hoang <an...@parcelperform.com> on 2019/06/06 10:29:02 UTC

Weird behavior with CoFlatMapFunction

Hi guys,

I want to merge two different streams: one is a config stream and the other carries JSON values that should be checked against that config. It seems like CoFlatMapFunction is the right tool for this.
Here is my sample:

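    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    val filterStream: ConnectedStreams[ControlEvent, JsValue] = specificControlStream.connect(eventStream)
    class FilterFunction() extends CoFlatMapFunction[ControlEvent, JsValue, FilteredEvent] {
      var configs = new ControlEvent(1, "a") // default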
      PPLogger.getActivityLogger.info("# init ")
      override def flatMap1(value: ControlEvent, out: Collector[FilteredEvent]): Unit = {
        PPLogger.getActivityLogger.info("# f1 value %s ".format(value.jsonPath))
        configs =  value
        PPLogger.getActivityLogger.info("# f1 current config %s ".format(configs))
      }
      override def flatMap2(value: JsValue, out: Collector[FilteredEvent]): Unit = {
          PPLogger.getActivityLogger.info("# f2 current config %s ".format(configs))
          PPLogger.getActivityLogger.info("# f2 current value %s ".format(value.toString()))
      }
    }
    val x = new FilterFunction()
    filterStream.flatMap(x)
….


How I sent the messages (Kafka):
——————
+send eventStream msg
+send configStream msg
+send eventStream msg
——————

My log output looks like this:
2019-06-06 10:15:21 INFO activity %PARSER_ERROR[x] - # f2 current config ControlEvent(1,a)
2019-06-06 10:15:21 INFO activity %PARSER_ERROR[x] - # f2 current value {"organization_id":1,"randomeText":"text_random35","value":{"to_compare":"zzzzz_xxxxxx_4"}}
2019-06-06 10:15:24 INFO activity %PARSER_ERROR[x] - # f1 value zzzzz_xxxxxx_2
2019-06-06 10:15:24 INFO activity %PARSER_ERROR[x] - # f1 current config ControlEvent(1,zzzzz_xxxxxx_2)
2019-06-06 10:15:30 INFO activity %PARSER_ERROR[x] - # f2 current config ControlEvent(1,a)
2019-06-06 10:15:30 INFO activity %PARSER_ERROR[x] - # f2 current value {"organization_id":1,"randomeText":"text_random35","value":{"to_compare":"zzzzz_xxxxxx_4"}}


My understanding is:
when flatMap1 runs, it changes the config, and flatMap2 shares that state (it sees the updated configs).
But the log shows something different: after flatMap1 has set ControlEvent(1,zzzzz_xxxxxx_2), flatMap2 still prints the default ControlEvent(1,a).

-----------
I tried using configs as a mutable.ListBuffer to collect the history, and then updating configs in both flatMap1 and flatMap2. I can see that the configs variable in flatMap1 and the one in flatMap2 are two different variables (even though they have the same name in the class!).


My env:
- Flink minicluster (sbt run)
- Flink 1.7.2
- Kafka 1.0
- Scala 2.11
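The "two different variables" symptom can be reproduced without Flink. The sketch below is a plain-Scala model, not Flink code, built on two assumptions: that the function object (like `val x = new FilterFunction()`) is serialized once and each parallel subtask works on its own deserialized copy, which is roughly how Flink distributes user functions; and a hard-coded, hypothetical routing in which the config message and the next event land on different subtasks. `ConfigHolder`, `ParallelCopiesDemo`, and the two-field `ControlEvent` are illustrative stand-ins.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the ControlEvent in the question (first field name assumed).
case class ControlEvent(id: Int, jsonPath: String)

// A plain class with a mutable field, like FilterFunction in the question.
class ConfigHolder extends java.io.Serializable {
  var configs: ControlEvent = ControlEvent(1, "a") // default
}

object ParallelCopiesDemo {
  // Serialize and deserialize an object, producing an independent copy.
  def copyOf[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (config seen by the subtask that got the control message,
  //          config seen by the subtask that got the next event).
  def run(parallelism: Int): (ControlEvent, ControlEvent) = {
    val x = new ConfigHolder                           // like `val x = new FilterFunction()`
    val subtasks = Vector.fill(parallelism)(copyOf(x)) // one copy per parallel subtask
    val controlSubtask = 0                             // hypothetical routing of the config message
    val eventSubtask = parallelism - 1                 // hypothetical routing of the next event
    subtasks(controlSubtask).configs = ControlEvent(1, "zzzzz_xxxxxx_2") // "flatMap1" on one copy
    (subtasks(controlSubtask).configs, subtasks(eventSubtask).configs)   // "flatMap2" reads another copy
  }
}
```

With `run(4)` the second value is still the default `ControlEvent(1,a)`, matching the log above. With `run(1)` both messages reach the same copy, which is why a single-instance setup can appear to work, although that still says nothing about the order in which the two inputs are processed.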




Re: Weird behavior with CoFlatMapFunction

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

There are a few things to point out about your example:

1. The CoFlatMapFunction is probably executed in parallel. The
configuration is only applied to the one parallel function instance that receives it.
You probably want to broadcast the configuration changes to all function
instances. Have a look at the broadcast state pattern [1] [2].
2. Flink does not guarantee the order in which the flatMap1 and flatMap2
methods are called.
3. You probably want to store the configuration in Flink managed state that
can be recovered in case of a failure. Again, have a look at the broadcast
state pattern.
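A minimal sketch of the broadcast state pattern [1] for this case, written against the Flink Scala DataStream API (broadcast state exists since Flink 1.5; the thread uses 1.7.2). Assumptions, all hypothetical: events are raw JSON `String`s instead of `JsValue`, `ControlEvent` and `FilteredEvent` are made-up case classes, the filter rule `matches` is invented for illustration, and `fromElements` stands in for the Kafka sources. Events that arrive before any config fall back to a default, since the two inputs are not ordered relative to each other.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the types in the question.
case class ControlEvent(id: Int, jsonPath: String)
case class FilteredEvent(config: ControlEvent, json: String)

object BroadcastConfigSketch {
  val DefaultConfig = ControlEvent(1, "a") // used until the first config arrives
  val ConfigKey = "config"                 // the single entry kept in the broadcast map

  // The same descriptor is used to broadcast the stream and to access the state.
  val configDescriptor = new MapStateDescriptor[String, ControlEvent](
    "config", BasicTypeInfo.STRING_TYPE_INFO, createTypeInformation[ControlEvent])

  // Hypothetical filter rule: keep events whose JSON contains the configured value.
  def matches(config: ControlEvent, json: String): Boolean = json.contains(config.jsonPath)

  class BroadcastFilterFunction
      extends BroadcastProcessFunction[String, ControlEvent, FilteredEvent] {

    override def processBroadcastElement(
        value: ControlEvent,
        ctx: BroadcastProcessFunction[String, ControlEvent, FilteredEvent]#Context,
        out: Collector[FilteredEvent]): Unit =
      // Every parallel instance receives each broadcast element and updates
      // its own copy of the broadcast state, so all instances agree on the config.
      ctx.getBroadcastState(configDescriptor).put(ConfigKey, value)

    override def processElement(
        value: String,
        ctx: BroadcastProcessFunction[String, ControlEvent, FilteredEvent]#ReadOnlyContext,
        out: Collector[FilteredEvent]): Unit = {
      // The state is read-only here; null means no config has arrived yet.
      val stored = ctx.getBroadcastState(configDescriptor).get(ConfigKey)
      val config = if (stored == null) DefaultConfig else stored
      if (matches(config, value)) out.collect(FilteredEvent(config, value))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Placeholders for the Kafka-backed streams in the question.
    val controlStream: DataStream[ControlEvent] = env.fromElements(ControlEvent(1, "zzzzz_xxxxxx_2"))
    val eventStream: DataStream[String] = env.fromElements("""{"value":{"to_compare":"zzzzz_xxxxxx_2"}}""")

    eventStream
      .connect(controlStream.broadcast(configDescriptor))
      .process(new BroadcastFilterFunction)
      .print()

    env.execute("broadcast-config-sketch")
  }
}
```

Because the broadcast state is Flink managed state, it is included in checkpoints, which covers point 3. Whether the event in `main` is printed depends on timing: if it is processed before the config, it is checked against the default and filtered out, which is point 2 in practice.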

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html
[2] https://www.ververica.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink

In reply to Andy Hoang <andy@parcelperform.com>, Thu, 6 Jun 2019 at 12:29
