Posted to user@flink.apache.org by Peter Ertl <pe...@gmx.net> on 2017/07/29 16:51:59 UTC

multiple streams with multiple actions - proper way?

Hello Flink People :-)


I am trying to get my head around Flink. Is it a supported use case to register multiple streams, with possibly more than one transformation/action per stream?


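import java.util.Properties

// SimpleStringSchema lives here from Flink 1.4 on; Flink 1.3 uses org.apache.flink.streaming.util.serialization
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._ // Scala DataStream API plus implicit TypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object MultiStreamJob { // enclosing object name is illustrative

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "vmi:9092")

    // first stream
    val ins = env.addSource(new FlinkKafkaConsumer010("foo", new SimpleStringSchema(), prop))
      .map(s => "transformation-1: " + s)

    ins.map(s => "transformation-2:" + s).print() // one action
    ins.map(s => "transformation-3:" + s).print() // one more action
    ins.map(s => "transformation-4:" + s).print() // another action on the same stream

    // second, different stream
    val ins2 = env.addSource(new FlinkKafkaConsumer010("bar", new SimpleStringSchema(), prop))
      .map(s => "transformation-5: " + s)

    ins2.map(s => "transformation-7:" + s).print() // action
    ins2.map(s => "transformation-6:" + s).print() // different action

    env.execute("run all streams with multiple actions attached")
  }
}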


Is this program abusing Flink, or is this just how you are supposed to do things?

Also, how many threads will this program consume when running with parallelism = 4?


Best regards
Peter


Re: multiple streams with multiple actions - proper way?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Peter,

this kind of use case is supported, but it is best practice to split
independent pipelines into individual jobs.
One reason for that is to isolate failures and restarts: within a single
job, a failure in one pipeline can also restart the other.
For example, I would split the program you posted into two programs, one
for the "foo" topic and one for the "bar" topic. Depending on the complexity
of the operations, you might also want to split it further.

Best, Fabian
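One way to follow this advice, sketched under assumptions: the single program from the original post becomes two independently submitted jobs, FooJob and BarJob (hypothetical names), each with its own StreamExecutionEnvironment and its own env.execute call, so a failure in one job does not restart the other. The KafkaSettings helper is likewise illustrative. The sketch assumes a Flink release that ships both FlinkKafkaConsumer010 and org.apache.flink.api.common.serialization.SimpleStringSchema (1.4 onward; on 1.3 the schema lives in org.apache.flink.streaming.util.serialization).

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

// Hypothetical shared helper: the Kafka config both jobs use.
object KafkaSettings {
  def props(): Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "vmi:9092")
    prop
  }
}

// Job 1 (hypothetical name): only the "foo" topic, submitted and restarted on its own.
object FooJob {
  val Topic = "foo"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val ins = env
      .addSource(new FlinkKafkaConsumer010[String](Topic, new SimpleStringSchema(), KafkaSettings.props()))
      .map(s => "transformation-1: " + s)

    // Several actions on the same stream are fine inside one job.
    ins.map(s => "transformation-2:" + s).print()
    ins.map(s => "transformation-3:" + s).print()
    ins.map(s => "transformation-4:" + s).print()

    env.execute("foo pipeline")
  }
}

// Job 2 (hypothetical name): only the "bar" topic, independent of FooJob.
object BarJob {
  val Topic = "bar"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val ins2 = env
      .addSource(new FlinkKafkaConsumer010[String](Topic, new SimpleStringSchema(), KafkaSettings.props()))
      .map(s => "transformation-5: " + s)

    ins2.map(s => "transformation-6:" + s).print()
    ins2.map(s => "transformation-7:" + s).print()

    env.execute("bar pipeline")
  }
}
```

Each object is then packaged and submitted as a separate job, for example with `flink run -c FooJob <your-jar>` and `flink run -c BarJob <your-jar>`.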



