Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2019/03/27 19:47:38 UTC

Streaming data out of spark to a Kafka topic

Hi,

In a traditional setup we get data via Kafka into Spark Streaming, do some
work, and write to a NoSQL database like Mongo, HBase or Aerospike.

That part already works and is best explained by the code below.

Once the high-value DataFrame lookups is created, I want to send its data to
a new topic for recipients:

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> bootstrapServers,
      "schema.registry.url" -> schemaRegistryURL,
      "zookeeper.connect" -> zookeeperConnect,
      "group.id" -> sparkAppName,
      "zookeeper.connection.timeout.ms" -> zookeeperConnectionTimeoutMs,
      "rebalance.backoff.ms" -> rebalanceBackoffMS,
      "zookeeper.session.timeout.ms" -> zookeeperSessionTimeOutMs,
      "auto.commit.interval.ms" -> autoCommitIntervalMS
    )
    //val topicsSet = topics.split(",").toSet
    val topics = Set(topicsValue)
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topics)
    // This returns a tuple of key and value (since messages in Kafka are
    // optionally keyed). In this case it is of type (String, String)
    dstream.cache()
    //
    val topicsOut = Set(topicsValueOut)
    val dstreamOut = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topicsOut)
    dstreamOut.cache()


    dstream.foreachRDD
    { pricesRDD =>
      if (!pricesRDD.isEmpty)  // data exists in RDD
      {
        val op_time = System.currentTimeMillis.toString
        val spark = SparkSessionSingleton.getInstance(pricesRDD.sparkContext.getConf)
        val sc = spark.sparkContext
        import spark.implicits._
        var operation = new operationStruct(op_type, op_time)
        // Convert RDD[String] to RDD[case class] to DataFrame
        val RDDString = pricesRDD.map { case (_, value) => value.split(',') }
          .map(p => priceDocument(
            priceStruct(p(0).toString, p(1).toString, p(2).toString, p(3).toDouble, currency),
            operation))
        val df = spark.createDataFrame(RDDString)
        //df.printSchema
        var document = df.filter('priceInfo.getItem("price") > 90.0)
        MongoSpark.save(document, writeConfig)
         println("Current time is: " + Calendar.getInstance.getTime)
         totalPrices += document.count
         var endTimeQuery = System.currentTimeMillis
         println("Total Prices added to the collection so far: " + totalPrices +
           " , Running for " + (endTimeQuery - startTimeQuery)/(1000*60) + " Minutes")
         // Check if running time > runTime exit
         if( (endTimeQuery - startTimeQuery)/(1000*60) > runTime)
         {
           println("\nDuration exceeded " + runTime + " minutes exiting")
           System.exit(0)
         }
         // picking up individual arrays -->
         // df.select('otherDetails.getItem("tickerQuotes")(0)) shows first element
         //val lookups = df.filter('priceInfo.getItem("ticker") === tickerWatch &&
         //  'priceInfo.getItem("price") > priceWatch)
         val lookups = df.filter('priceInfo.getItem("price") > priceWatch)
         if(lookups.count > 0) {
           println("High value tickers")
           lookups.select(
             'priceInfo.getItem("timeissued").as("Time issued"),
             'priceInfo.getItem("ticker").as("Ticker"),
             'priceInfo.getItem("price").cast("Double").as("Latest price")).show

           // Now here I want to send the content of the lookups DF to another
           // Kafka topic.
           // Note that above I have created a new dstreamOut with a new topic,
           // topicsOut.
           // How can that be done?
         }
      }
    }

Thanks


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: Streaming data out of spark to a Kafka topic

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Gabor,

I will look at the link and see what it provides.

Thanks,






On Wed, 27 Mar 2019 at 21:23, Gabor Somogyi <ga...@gmail.com>
wrote:

> Hi Mich,
>
> Please take a look at how to write data into a Kafka topic with DStreams:
> https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-sink-app/blob/62d64ce368bc07b385261f85f44971b32fe41327/src/main/scala/com/cloudera/spark/examples/DirectKafkaSinkWordCount.scala#L77
> (DStreams have no native Kafka sink; if you need one, use Structured
> Streaming.)
>
> BR,
> G

Re: Streaming data out of spark to a Kafka topic

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Mich,

Please take a look at how to write data into a Kafka topic with DStreams:
https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-sink-app/blob/62d64ce368bc07b385261f85f44971b32fe41327/src/main/scala/com/cloudera/spark/examples/DirectKafkaSinkWordCount.scala#L77
(DStreams have no native Kafka sink; if you need one, use Structured Streaming.)

BR,
G
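
For readers following the thread, here is a minimal sketch of two common ways to push the lookups DataFrame to an output topic from inside foreachRDD. It is not taken from the linked repository. The object LookupsToKafka and its methods toCsvLine, producerProps, publish and write are hypothetical names, and the CSV and JSON message formats are assumptions; only the priceInfo field names (timeissued, ticker, price) come from the original post. Note that KafkaUtils.createDirectStream only creates an input stream that reads from Kafka, so the dstreamOut in the question is probably not needed for writing.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical helper object; none of these names come from the thread.
object LookupsToKafka {

  // Flatten one lookup row into a comma-separated message (assumed format).
  def toCsvLine(timeIssued: String, ticker: String, price: Double): String =
    s"$timeIssued,$ticker,$price"

  // Minimal producer settings: broker list plus String serializers.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    props
  }

  // Option 1: plain Kafka producer, created once per partition on the executors.
  // The producer is built inside foreachPartition because it is not serializable.
  def publish(lookups: DataFrame, bootstrapServers: String, topicOut: String): Unit =
    lookups
      .select(
        col("priceInfo.timeissued").cast("string"),
        col("priceInfo.ticker").cast("string"),
        col("priceInfo.price").cast("double"))
      .rdd
      .foreachPartition { rows: Iterator[Row] =>
        val producer = new KafkaProducer[String, String](producerProps(bootstrapServers))
        try {
          rows.foreach { row =>
            val ticker = row.getString(1)
            val message = toCsvLine(row.getString(0), ticker, row.getDouble(2))
            // Keyed by ticker so one ticker's messages stay in one partition.
            producer.send(new ProducerRecord[String, String](topicOut, ticker, message))
          }
        } finally {
          producer.close() // waits for outstanding sends to complete
        }
      }

  // Option 2: Spark SQL's Kafka writer (needs the spark-sql-kafka-0-10 package).
  // The writer expects a "value" column and optionally a "key" column.
  def write(lookups: DataFrame, bootstrapServers: String, topicOut: String): Unit =
    lookups
      .select(
        col("priceInfo.ticker").cast("string").as("key"),
        to_json(struct(
          col("priceInfo.timeissued"),
          col("priceInfo.ticker"),
          col("priceInfo.price"))).as("value"))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topicOut)
      .save()
}
```

Either call would go inside the if (lookups.count > 0) block, for example LookupsToKafka.publish(lookups, bootstrapServers, topicsValueOut), assuming topicsValueOut holds the output topic name as a String. Both variants are at-least-once: if Spark retries a task, some messages can be sent twice, so consumers of the output topic should tolerate duplicates.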

