Posted to dev@flink.apache.org by Deepak Jha <dk...@gmail.com> on 2016/03/30 20:19:51 UTC

Writing multiple streams to multiple kafka

Hi,
I'm building a pipeline in Flink with Kafka as both source and sink. As part
of this pipeline I have multiple stages in my run method, and I would like
to publish the output of some sub-stages to separate Kafka topics.
My question is: can I write multiple stages of run to multiple Kafka topics?

private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]): DataStream[String] = {

  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD)  // returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  stage3.addSink(Write_To_Kafka_Topic_Y)  // Can I do this outside the run method?
  val stage4 = stage2.filter(!_.isTrue)

  stage4.map(_.toString)
}

run(src).addSink(Write_To_Kafka_Topic_X)


Ideally I would prefer not to call the addSink method inside run (the line
marked above).
-- 
Thanks,
Deepak Jha

Re: Writing multiple streams to multiple kafka

Posted by Deepak Jha <dk...@gmail.com>.
It works... Thanks

On Thu, Mar 31, 2016 at 2:23 AM, Aljoscha Krettek <al...@apache.org>
wrote:




-- 
Thanks,
Deepak Jha

Re: Writing multiple streams to multiple kafka

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
yes, you can output the stages to several different Kafka topics. If you
don't want to call addSink inside the run() method, you have to return the
handle to your stage3 DataStream instead, for example:

private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]) = {

  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD)  // returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  val stage4 = stage2.filter(!_.isTrue)

  (stage3, stage4.map(_.toString))  // return both stages
}

val (stage3, stage4) = run(src)
stage3.addSink(Write_To_Kafka_Topic_Y)
stage4.addSink(Write_To_Kafka_Topic_X)
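For intuition, the split-and-return pattern above can be sketched without any
Flink dependency, using plain Scala collections in place of DataStreams. The
names Event, SplitSketch, and runStages below are illustrative stand-ins, not
part of the original job:

```scala
// Stand-in for the record produced by doD in the snippet above
// (a Boolean flag plus a payload).
case class Event(isTrue: Boolean, somethingElse: String)

object SplitSketch {
  // Mirrors run(): one input, two filtered branches, both returned
  // as a tuple so the caller can attach a different sink to each.
  def runStages(stream: List[Int]): (List[Event], List[String]) = {
    val stage2 = stream.map(x => Event(x % 2 == 0, s"value-$x"))
    val stage3 = stage2.filter(_.isTrue)   // would go to Kafka topic Y
    val stage4 = stage2.filter(!_.isTrue)  // would go to Kafka topic X
    (stage3, stage4.map(_.toString))
  }
}
```

In the real job, the caller would attach a separate addSink call to each
returned stream, exactly as in the snippet above.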


On Wed, 30 Mar 2016 at 20:19 Deepak Jha <dk...@gmail.com> wrote:
