You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by sivaprakash <si...@gmail.com> on 2018/09/17 08:28:24 UTC

Subscribe Multiple Topics Structured Streaming

Hi

I have integrated Spark Streaming with Kafka in which Im listening 2 topics

def main(args: Array[String]): Unit = {

    val schema = StructType(
      List(
        StructField("gatewayId", StringType, true),
        StructField("userId", StringType, true)
      )
    )

    val spark = SparkSession
      .builder
      .master("local[4]")
      .appName("DeviceAutomation")
      .getOrCreate()

    val dfStatus = spark.readStream.
      format("kafka").
      option("subscribe", "utility-status, utility-critical").
      option("kafka.bootstrap.servers", "localhost:9092").
      option("startingOffsets", "earliest")
      .load()
    
      
      }
      
Since I have few more topics to be listed and perform different operations I
would like to move each topics into separate case class for better clarity.
Is it possible? 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Subscribe Multiple Topics Structured Streaming

Posted by Sivaprakash <si...@gmail.com>.
I would like to know how to create stream and sink operations outside
"main" method - just like another class which I can invoke from main. So
that I can have different implementations for each topic which I subscribed
in a specific class file. Is it a good practice or always the whole
implementations should go inside "main" method?

On Mon, Sep 17, 2018 at 11:35 PM naresh Goud <na...@gmail.com>
wrote:

> You can have below statement for multiple topics
>
> val dfStatus = spark.readStream.
>       format("kafka").
>       option("subscribe", "utility-status, utility-critical").
>       option("kafka.bootstrap.servers", "localhost:9092").
>       option("startingOffsets", "earliest")
>       .load()
>
>
>
>
>
> On Mon, Sep 17, 2018 at 3:28 AM sivaprakash <
> sivaprakashshanmugam@gmail.com> wrote:
>
>> Hi
>>
>> I have integrated Spark Streaming with Kafka in which Im listening 2
>> topics
>>
>> def main(args: Array[String]): Unit = {
>>
>>     val schema = StructType(
>>       List(
>>         StructField("gatewayId", StringType, true),
>>         StructField("userId", StringType, true)
>>       )
>>     )
>>
>>     val spark = SparkSession
>>       .builder
>>       .master("local[4]")
>>       .appName("DeviceAutomation")
>>       .getOrCreate()
>>
>>     val dfStatus = spark.readStream.
>>       format("kafka").
>>       option("subscribe", "utility-status, utility-critical").
>>       option("kafka.bootstrap.servers", "localhost:9092").
>>       option("startingOffsets", "earliest")
>>       .load()
>>
>>
>>       }
>>
>> Since I have few more topics to be listed and perform different
>> operations I
>> would like to move each topics into separate case class for better
>> clarity.
>> Is it possible?
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>

-- 
- Prakash.

Re: Subscribe Multiple Topics Structured Streaming

Posted by naresh Goud <na...@gmail.com>.
You can have below statement for multiple topics

val dfStatus = spark.readStream.
      format("kafka").
      option("subscribe", "utility-status, utility-critical").
      option("kafka.bootstrap.servers", "localhost:9092").
      option("startingOffsets", "earliest")
      .load()





On Mon, Sep 17, 2018 at 3:28 AM sivaprakash <si...@gmail.com>
wrote:

> Hi
>
> I have integrated Spark Streaming with Kafka in which Im listening 2 topics
>
> def main(args: Array[String]): Unit = {
>
>     val schema = StructType(
>       List(
>         StructField("gatewayId", StringType, true),
>         StructField("userId", StringType, true)
>       )
>     )
>
>     val spark = SparkSession
>       .builder
>       .master("local[4]")
>       .appName("DeviceAutomation")
>       .getOrCreate()
>
>     val dfStatus = spark.readStream.
>       format("kafka").
>       option("subscribe", "utility-status, utility-critical").
>       option("kafka.bootstrap.servers", "localhost:9092").
>       option("startingOffsets", "earliest")
>       .load()
>
>
>       }
>
> Since I have few more topics to be listed and perform different operations
> I
> would like to move each topics into separate case class for better clarity.
> Is it possible?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/