Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2018/09/07 18:34:51 UTC

Fwd: Using MongoDB as an Operational Data Store (ODS) with Spark Streaming

Hi,

Has anyone in the Spark community had any exposure to this work, please?

thanks



---------- Forwarded message ---------
From: Mich Talebzadeh <mi...@gmail.com>
Date: Thu, 6 Sep 2018 at 21:12
Subject: Using MongoDB as an Operational Data Store (ODS) with Spark
Streaming
To: <mo...@googlegroups.com>


Hi,

I thought you may find the below useful.

Versions:


   - Hadoop 3.1
   - Spark 2.3
   - MongoDB 4.0.1
   - ZooKeeper on Docker, version zookeeper-3.4.11
   - Three Kafka Docker containers running kafka_2.12-0.10.2.1

I send trade data comprising 100 securities to the Kafka topic every 2
seconds. So with a batch interval of 2 seconds we deal with 100 rows per batch.
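
The DStream itself is created from the Kafka topic with a 2-second batch
interval. The exact setup is not shown here, but a minimal sketch using the
Spark Kafka 0-10 integration module could look like the following (the topic
name, group id and broker list are hypothetical placeholders, not taken from
my actual job):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Hypothetical broker list, group id and topic name -- adjust to your environment
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "kafka1:9092,kafka2:9092,kafka3:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "trades_consumer",
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val sparkConf = new SparkConf().setAppName("TradesToMongoDB")
val streamingContext = new StreamingContext(sparkConf, Seconds(2))   // 2-second batch interval

// Each record's value is the CSV payload "key,ticker,timeissued,price";
// map to (key, value) tuples so that line._2 below holds the CSV string
val dstream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](Seq("trades"), kafkaParams)
  ).map(record => (record.key, record.value))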

I then go through every RDD and look at the individual rows, which map to:

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)

and examine every security for high-value prices.

This loop seems to work OK:

        // Work on individual messages
         for (line <- pricesRDD.collect)
         {
           // Each Kafka message value is a CSV string: key,ticker,timeissued,price
           val fields     = line._2.split(',')
           val key        = fields(0)
           val ticker     = fields(1)
           val timeissued = fields(2)
           val price      = fields(3).toDouble
           val CURRENCY   = "GBP"
           val op_type    = "1"
           val op_time    = System.currentTimeMillis.toString
           if (price > 90.0)
           {
             //println ("price > 90.0, saving to MongoDB collection!")
             // Build a single-element RDD holding one BSON document for this message
             val document = sparkContext.parallelize(Seq(
               Document.parse(s"{key:'$key',ticker:'$ticker',timeissued:'$timeissued',price:$price,CURRENCY:'$CURRENCY',op_type:$op_type,op_time:'$op_time'}")))
             //
             // Writing document to MongoDB collection
             //
             MongoSpark.save(document, writeConfig)
             if (ticker == "VOD" && price > 99.0)
             {
               sqltext = Calendar.getInstance.getTime.toString + ", Price on " + ticker + " hit " + price.toString
               //java.awt.Toolkit.getDefaultToolkit().beep()
               println(sqltext)
             }
           }
         }
      }

I collected 30,000 trades for this streaming run and, as you can see, I write
them to MongoDB. In this case MongoDB is a standalone (single-node) deployment.

The performance is good, as shown in the Spark GUI below.

[image: Spark Streaming UI: processing time with MongoDB writes]

In general, if your average processing time (here around 600 ms) is below the
batch interval of 2 seconds, then you are OK. However, when I compare this
with HBase as the data store (in place of MongoDB), I end up with a processing
time of 52 ms for HBase, as shown below:

[image: Spark Streaming UI: processing time with HBase writes]

The number of batches in both runs is pretty similar, so I am wondering what
factors influence this delay with MongoDB. In both cases Spark is running in
standalone mode with the same configuration. It is possible that the way I
write documents to MongoDB is not particularly efficient, or that the
connection set up through the following connection string and sparkConf
settings in Spark is not ideal:

connectionString =
dbConnection+"://"+dbUsername+":"+dbPassword+"@"+mongodbHost+":"+mongodbPort+"/"+dbDatabase+"."+collectionName

 sparkConf.set("spark.mongodb.input.uri", connectionString)
 sparkConf.set("spark.mongodb.output.uri", connectionString)
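
For reference, one way the writeConfig passed to MongoSpark.save could be
built is via the connector's WriteConfig companion object, taking the
SparkConf-derived settings as defaults and overriding per-collection options.
This is only a sketch: it reuses the collectionName variable above, and the
write concern value is illustrative rather than taken from my job.

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig

// sparkConf already carries spark.mongodb.output.uri (set above), so the
// SparkContext-derived WriteConfig supplies the defaults; the Map overrides
// the target collection and write concern (illustrative values)
val writeConfig = WriteConfig(
  Map("collection" -> collectionName, "writeConcern.w" -> "1"),
  Some(WriteConfig(sparkContext))
)

Whether a lower write concern makes a measurable difference to write latency
here would need testing.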

Of course HBase is native to Hadoop in this case and uses HDFS for storage,
whereas MongoDB is configured externally to Hadoop.

My concern at the moment is the speed of writes to MongoDB as opposed to
any reads/queries etc.

I would appreciate it if someone could share their experiences or suggestions.

Thanks

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: Using MongoDB as an Operational Data Store (ODS) with Spark Streaming

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,

Thanks to the colleagues who made great suggestions.

In my notes above I raised a concern about the speed of writes from Spark to
MongoDB (standalone version).

I was looping over the RDD rows, selecting high-value trades (messages) and
posting them into the MongoDB collection individually.

This turned out to be inefficient in a distributed environment with Spark
Streaming.

Hence I decided to modify the code and post the qualifying rows of the RDD in
one go, by filtering the wanted rows at the DStream level. In simple terms, we
moved away from a cursor-style loop to treating the result set as one unit.

    dstream.foreachRDD
    { pricesRDD =>
      if (!pricesRDD.isEmpty)  // data exists in RDD
      {
        val op_time = System.currentTimeMillis.toString
        val spark = SparkSessionSingleton.getInstance(pricesRDD.sparkContext.getConf)
        import spark.implicits._
        // Convert RDD[String] to RDD[case class] to DataFrame
        // (currency and op_type are defined earlier in the job)
        val RDDString = pricesRDD.map { case (_, value) => value.split(',') }
                                 .map(p => columns(p(0), p(1), p(2), p(3).toDouble, currency, op_type, op_time))
        val df = spark.createDataFrame(RDDString)
        // Filter the high-value rows once, as a DataFrame, and save them in a single call
        val document = df.filter('price > 90.0)
        MongoSpark.save(document, writeConfig)
…..
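
SparkSessionSingleton is not defined in the snippet above. It is presumably
along the lines of the lazily instantiated singleton SparkSession shown in the
Spark Streaming programming guide, roughly:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Lazily instantiated singleton instance of SparkSession, so that each
// micro-batch reuses the same session instead of creating a new one
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}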

This immediately improved the streaming statistics: processing time went down
from 605 ms to 71 ms and scheduling delay was reduced from 261 ms to 2 ms.
These are shown in the plot below from the Spark GUI.

[image: Spark Streaming UI: processing time after filtering at the DataFrame level]

Contrast this with the graph from the same operation using MongoDB while
looping over individual messages:


[image: Spark Streaming UI: processing time with per-message writes to MongoDB]
I am now looking at other options to streamline the process. Also note that
MongoDB Compass provides a web GUI that allows basic monitoring of reads and
writes, network and memory usage. Having said that, I did not find it
particularly useful. A snapshot is shown below.

[image: MongoDB Compass monitoring snapshot]

HTH

Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



