You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by roshan joe <im...@gmail.com> on 2017/11/21 12:45:17 UTC

spark lookup against shared RDD

Below is the use-case I am trying to solve using Spark shared Rdd:

I have a json dataset which is periodically being saved to a shared RDD in
Streaming App-1 using "saveValues" as below.

val sharedRdd: IgniteRDD[String, String] = igniteContext.fromCache[String,
String](cachecfg)
sharedRdd.saveValues(jsonRdd.values)


This SharedRdd is used to lookup values against the incremental dataset in
Streaming App-2. Lookup is performed based on two fields (key and date) in
the json and both the incremental dataset and the sharedRdd contain these
fields. Below is how the values are retrieved from sharedRDD and the lookup
is performed currently.

    //Retrieve values from sharedRdd
    val sharedRdd: IgniteRDD[String, String] =
igniteContext.fromCache("sharedRdd")
    val sharedRddJson = sharedRdd.values

    //convert sharedRdd to DF
    val sharedJsonDF = spark.read.json(sharedRddJson)
    sharedJsonDF.createOrReplaceTempView("sharedJsonDF")

     //convert incremental dataset to DF
    incrementalDF.createOrReplaceTempView("incrementalDF")

    //perform the lookup using Join query
    val sqlQuery = "SELECT * " +
"FROM incrementalDF a " +
"INNER JOIN sharedJsonDF b " +
"ON a.key = b.key " +
"AND a.date <= b.date "

    val sqlDF = spark.sql(sqlQuery)


Below are the questions I have:

   - Would adding the Index on the Join fields on the sharedRdd help to
   improve the performance? If so, what is the best way to add index on the
   json data? I see some index being added in config file and some in code but
   couldn't find a working example.


   - Can the sharedRdd be directly joined against the incrementalDF using
   the "sql"? I couldn't get the "sql" working directly on the sharedRdd using
   "_key", "_val" fields. Do I need to add fields info in the config file for
   the sql to get working?


   - I believe the above join query currently causes a "Shuffle" when the 2
   DFs are joined. Is there a way to colocate sharedRdd and IncrementalRdd on
   the given 2 keys, so that the data movement can be avoided and performance
   improved?


   - Is there any significance of the "number of partitions" in the current
   case?


   - Is there anything else that can be done to make the above join /
   lookup faster?

 Thanks in advance for the time!

Re: spark lookup against shared RDD

Posted by roshan joe <im...@gmail.com>.
Any response on the below questions for using index on spark shared rdd
will be really appreciated.

Thank you.

On Tue, Nov 21, 2017 at 4:45 AM, roshan joe <im...@gmail.com> wrote:

> Below is the use-case I am trying to solve using Spark shared Rdd:
>
> I have a json dataset which is periodically being saved to a shared RDD in
> Streaming App-1 using "saveValues" as below.
>
> val sharedRdd: IgniteRDD[String, String] = igniteContext.fromCache[String,
> String](cachecfg)
> sharedRdd.saveValues(jsonRdd.values)
>
>
> This SharedRdd is used to lookup values against the incremental dataset in
> Streaming App-2. Lookup is performed based on two fields (key and date) in
> the json and both the incremental dataset and the sharedRdd contain these
> fields. Below is how the values are retrieved from sharedRDD and the lookup
> is performed currently.
>
>     //Retrieve values from sharedRdd
>     val sharedRdd: IgniteRDD[String, String] = igniteContext.fromCache("
> sharedRdd")
>     val sharedRddJson = sharedRdd.values
>
>     //convert sharedRdd to DF
>     val sharedJsonDF = spark.read.json(sharedRddJson)
>     sharedJsonDF.createOrReplaceTempView("sharedJsonDF")
>
>      //convert incremental dataset to DF
>     incrementalDF.createOrReplaceTempView("incrementalDF")
>
>     //perform the lookup using Join query
>     val sqlQuery = "SELECT * " +
> "FROM incrementalDF a " +
> "INNER JOIN sharedJsonDF b " +
> "ON a.key = b.key " +
> "AND a.date <= b.date "
>
>     val sqlDF = spark.sql(sqlQuery)
>
>
> Below are the questions I have:
>
>    - Would adding the Index on the Join fields on the sharedRdd help to
>    improve the performance? If so, what is the best way to add index on the
>    json data? I see some index being added in config file and some in code but
>    couldn't find a working example.
>
>
>    - Can the sharedRdd be directly joined against the incrementalDF using
>    the "sql"? I couldn't get the "sql" working directly on the sharedRdd using
>    "_key", "_val" fields. Do I need to add fields info in the config file for
>    the sql to get working?
>
>
>    - I believe the above join query currently causes a "Shuffle" when the
>    2 DFs are joined. Is there a way to colocate sharedRdd and IncrementalRdd
>    on the given 2 keys, so that the data movement can be avoided and
>    performance improved?
>
>
>    - Is there any significance of the "number of partitions" in the
>    current case?
>
>
>    - Is there anything else that can be done to make the above join /
>    lookup faster?
>
>  Thanks in advance for the time!
>
>
>