Posted to user@spark.apache.org by Srinivas V <sr...@gmail.com> on 2020/06/09 16:10:15 UTC

[spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

Hello,
In Structured Streaming, is it possible to have one Spark application with
one query consume topics from multiple Kafka clusters?

I am trying to consume two topics, each from a different Kafka cluster, but
Spark reports one of the topics as unknown, and the job keeps running in the
Spark UI without completing.

Is this not supported in Spark 2.4.5?

Regards
Srini

Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

Posted by Srinivas V <sr...@gmail.com>.
OK, thanks for confirming. I will do it this way.

Regards
Srini

Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

Posted by Gerard Maas <ge...@gmail.com>.
Hi Srinivas,

Reading from brokers in different clusters is possible, but you need to
connect to each Kafka cluster separately.
Trying to mix connections to two different Kafka clusters in one subscriber
is not supported. (I'm sure that it would give all kinds of weird errors.)
The "kafka.bootstrap.servers" option is there to list the potentially
many brokers of the *same* Kafka cluster.

The way to address this is to follow German's suggestion and create a
separate subscription for each Kafka cluster you are talking to.

// One source per Kafka cluster; readStream (not read) gives a streaming DataFrame
val df_cluster1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1,topic2")
  .load()

val df_cluster2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3,topic4,topic5")
  .load()

After acquiring the DataFrames, you can union them and process all the data
in a single query.

val unifiedData = df_cluster1.union(df_cluster2)
// apply further transformations on `unifiedData`
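
The same pattern extends to any number of clusters: build one Kafka source
per cluster and fold them together with union. What follows is only a minimal
sketch, assuming Spark 2.4.x with the spark-sql-kafka-0-10 package on the
classpath. The `clusters` map, the `kafkaOptions` and `unifiedStream` helpers,
the application name, and the console sink are all illustrative placeholders
that do not come from this thread.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiClusterKafka {
  // Hypothetical mapping: bootstrap servers of each cluster -> its topics.
  val clusters: Map[String, Seq[String]] = Map(
    "cluster1_host:cluster1_port" -> Seq("topic1", "topic2"),
    "cluster2_host:cluster2_port" -> Seq("topic3")
  )

  // Options for one Kafka source; each source talks to exactly one cluster.
  def kafkaOptions(bootstrapServers: String, topics: Seq[String]): Map[String, String] =
    Map(
      "kafka.bootstrap.servers" -> bootstrapServers,
      "subscribe" -> topics.mkString(",")
    )

  // One streaming DataFrame per cluster, combined into a single DataFrame.
  // Note: reduce throws on an empty map, so at least one cluster is required.
  def unifiedStream(spark: SparkSession, clusters: Map[String, Seq[String]]): DataFrame =
    clusters
      .map { case (servers, topics) =>
        spark.readStream.format("kafka").options(kafkaOptions(servers, topics)).load()
      }
      .reduce(_ union _)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-cluster-kafka").getOrCreate()
    val unified = unifiedStream(spark, clusters)
    // A single query consumes the union of all clusters' topics.
    val query = unified
      .selectExpr("topic", "CAST(value AS STRING) AS value")
      .writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```

The union works here because every Kafka source exposes the same fixed
schema, so the per-cluster DataFrames line up column by column.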

kr, Gerard.


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

Posted by Srinivas V <sr...@gmail.com>.
Thanks for the quick reply. This may work, but I have about 5 topics to
listen to right now. I am trying to keep all the topics in an array in a
properties file and read them all at once. That way it is dynamic: there is
a single code block like the one below, and you can add or remove topics in
the config file without changing code. If someone confirms that this does not
work, I will have to do something like what you have provided.

val df_cluster1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port,cluster2_host:port")
  .option("subscribe", "topic1,topic2,topic3,topic4,topic5")
  .load()
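
A config-driven topic list can still be combined with one subscription per
cluster if the properties file groups topics under the cluster they belong
to. The sketch below is only an illustration in plain Scala, with no Spark
dependency. The property keys (kafka.clusters, kafka.<name>.bootstrap.servers,
kafka.<name>.topics), the `ClusterConfig` case class, and the
`ClusterConfigLoader` object are hypothetical names, not Spark or Kafka
settings.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical holder for one cluster's connection string and topic list.
final case class ClusterConfig(name: String, bootstrapServers: String, topics: Seq[String])

object ClusterConfigLoader {
  // Splits a comma-separated value, trimming blanks and dropping empty entries.
  private def csv(value: String): Seq[String] =
    value.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  // Reads the (assumed) keys kafka.clusters, kafka.<name>.bootstrap.servers
  // and kafka.<name>.topics from a java.util.Properties instance.
  def load(props: Properties): Seq[ClusterConfig] =
    csv(props.getProperty("kafka.clusters", "")).map { name =>
      val serversKey = s"kafka.$name.bootstrap.servers"
      val servers = Option(props.getProperty(serversKey))
        .getOrElse(throw new IllegalArgumentException(s"missing property $serversKey"))
      ClusterConfig(name, servers, csv(props.getProperty(s"kafka.$name.topics", "")))
    }

  // Convenience for illustration: parse properties from an in-memory string.
  def fromString(text: String): Seq[ClusterConfig] = {
    val props = new Properties()
    props.load(new StringReader(text))
    load(props)
  }
}
```

Each resulting `ClusterConfig` would then feed its own Kafka reader, so adding
or removing a topic only requires editing the properties file.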

Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

Posted by German SM <ge...@gmail.com>.
Hello,

I've never tried that. Doesn't this work?

// One streaming source per Kafka cluster
val df_cluster1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1")
  .load()

val df_cluster2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:port")
  .option("subscribe", "topic2")
  .load()

