You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sendoh <un...@gmail.com> on 2016/06/23 12:33:20 UTC
Iterate several kafka topics using the kafka connector
Hi Flink developers,
Can I ask how could we iterate several Kafka topics using the Kafka
connector?
Our idea is like the following example:
List<DataStream<JSONObject>> streams = new ArrayList<>();
// Iterate kafka topics
Iterator<String> topicIter = topicList.iterator();
while (topicIter.hasNext()){
String topic = topicIter.next();
streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
new JSONSchema(), properties)).rebalance());
}
Our goal is to union several kafka data streams into one, given the topics
as a list:
Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
DataStream<JSONObject> currentStream = streamsIt.next();
while(streamsIt.hasNext()){
DataStream<JSONObject> nextStream = streamsIt.next();
currentStream = currentStream.union(nextStream);
}
Cheers,
Sendoh
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Iterate-several-kafka-topics-using-the-kafka-connector-tp7673.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Iterate several kafka topics using the kafka connector
Posted by Sendoh <un...@gmail.com>.
Thank you. It totally works as what we want which unions data streams.
Best,
Sendoh
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Iterate-several-kafka-topics-using-the-kafka-connector-tp7673p7680.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Iterate several kafka topics using the kafka connector
Posted by Till Rohrmann <tr...@apache.org>.
It is possible to instantiate the FlinkKafkaConsumer with multiple topics
[1]. Simply pass a list of topic names instead of a the name of a single
topic.
streams.add(env.addSource(new
FlinkKafkaConsumer09<>(Arrays.asList("foo", "bar", "foobar"),
new JSONSchema(), properties));
[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumer
Cheers,
Till
On Thu, Jun 23, 2016 at 2:33 PM, Sendoh <un...@gmail.com> wrote:
> Hi Flink developers,
>
> Can I ask how could we iterate several Kafka topics using the Kafka
> connector?
>
> Our idea is like the following example:
>
> List<DataStream<JSONObject>> streams = new ArrayList<>();
>
> // Iterate kafka topics
> Iterator<String> topicIter = topicList.iterator();
>
> while (topicIter.hasNext()){
>
> String topic = topicIter.next();
>
> streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
> new JSONSchema(), properties)).rebalance());
>
> }
>
> Our goal is to union several kafka data streams into one, given the topics
> as a list:
>
> Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
>
> DataStream<JSONObject> currentStream = streamsIt.next();
> while(streamsIt.hasNext()){
> DataStream<JSONObject> nextStream = streamsIt.next();
> currentStream = currentStream.union(nextStream);
> }
>
> Cheers,
>
> Sendoh
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Iterate-several-kafka-topics-using-the-kafka-connector-tp7673.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>