You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Flink Developer <de...@protonmail.com> on 2018/11/11 01:16:11 UTC

How to use multiple sources with multiple sinks

How can I configure 1 Flink Job (stream execution environment, parallelism set to 10) to have multiple kafka sources where each has its' own sink to s3.

For example, let's say the sources are:

- Kafka Topic A - Consumer (10 partitions)
- Kafka Topic B - Consumer (10 partitions)
- Kafka Topic C - Consumer (10 partitions)

And let's say the sinks are:

- BucketingSink to S3 in bucket: s3://kafka_topic_a/<data files>
- BucketingSink to S3 in bucket: s3://kafka_topic_b/<data files>
- BucketingSink to S3 in bucket: s3://kafka_topic_c/<data files>

And between source 1 to sink 1, I would like to perform unique processing. Between source 2 to sink 2, it should have unique processing and between source 3 to sink 3, it should also have unique processing.

How can this be achieved? Is there an example?

Re: How to use multiple sources with multiple sinks

Posted by vino yang <ya...@gmail.com>.
Hi,

If you are expressing a job that contains three pairs of source->sinks that
are isolated from each other, then Flink supports this form of Job.
It is not much different from a single source->sink, just changed from a
DataStream to three DataStreams.

For example,

*DataStream ds1 = xxx*
*ds1.addSink();*

*DataStream ds2 = xxx*
*ds2.addSink();*

*DataStream ds3 = xxx*
*ds3.addSink();*

Thanks, vino.

Flink Developer <de...@protonmail.com> 于2018年11月11日周日 上午9:24写道:

> How can I configure 1 Flink Job (stream execution environment, parallelism
> set to 10) to have multiple kafka sources where each has its' own sink to
> s3.
>
> For example, let's say the sources are:
>
>    1. Kafka Topic A - Consumer (10 partitions)
>    2. Kafka Topic B - Consumer (10 partitions)
>    3. Kafka Topic C - Consumer (10 partitions)
>
> And let's say the sinks are:
>
>    1. BucketingSink to S3 in bucket: s3://kafka_topic_a/<data files>
>    2. BucketingSink to S3 in bucket: s3://kafka_topic_b/<data files>
>    3. BucketingSink to S3 in bucket: s3://kafka_topic_c/<data files>
>
> And between source 1 to sink 1, I would like to perform unique processing.
> Between source 2 to sink 2, it should have unique processing and between
> source 3 to sink 3, it should also have unique processing.
>
> How can this be achieved? Is there an example?
>