Posted to user@ignite.apache.org by roshan joe <im...@gmail.com> on 2017/11/21 12:45:17 UTC
spark lookup against shared RDD
Below is the use case I am trying to solve using a Spark shared RDD:
I have a JSON dataset that is periodically saved to a shared RDD in
Streaming App-1 using "saveValues", as shown below.
val sharedRdd: IgniteRDD[String, String] = igniteContext.fromCache[String, String](cachecfg)
sharedRdd.saveValues(jsonRdd.values)
This shared RDD is used to look up values against the incremental dataset in
Streaming App-2. The lookup is based on two fields (key and date) in the
JSON; both the incremental dataset and the shared RDD contain these fields.
Below is how the values are currently retrieved from the shared RDD and how
the lookup is performed.
//Retrieve values from sharedRdd
val sharedRdd: IgniteRDD[String, String] = igniteContext.fromCache("sharedRdd")
val sharedRddJson = sharedRdd.values
//convert sharedRdd to DF
val sharedJsonDF = spark.read.json(sharedRddJson)
sharedJsonDF.createOrReplaceTempView("sharedJsonDF")
//convert incremental dataset to DF
incrementalDF.createOrReplaceTempView("incrementalDF")
//perform the lookup using Join query
val sqlQuery = "SELECT * " +
"FROM incrementalDF a " +
"INNER JOIN sharedJsonDF b " +
"ON a.key = b.key " +
"AND a.date <= b.date "
val sqlDF = spark.sql(sqlQuery)
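The join condition above (equal key, incremental date no later than the shared date) can be sketched on plain Scala collections as follows. This only illustrates the semantics of the lookup, not how Spark executes the query. The `Rec` case class, the `JoinSketch` object, and the assumption that dates are ISO-8601 strings are hypothetical and not taken from the actual data.

```scala
// Hypothetical record shape: the two join fields plus an opaque payload.
// Dates are assumed to be ISO-8601 strings (yyyy-MM-dd), so that string
// order matches chronological order.
final case class Rec(key: String, date: String, payload: String)

object JoinSketch {
  // Pair each incremental record with every shared record that has the
  // same key and a date on or after the incremental record's date.
  def lookup(incremental: Seq[Rec], shared: Seq[Rec]): Seq[(Rec, Rec)] = {
    // Group the shared side by key once, so the equality part of the
    // condition becomes a hash lookup instead of a full scan.
    val sharedByKey: Map[String, Seq[Rec]] = shared.groupBy(_.key)
    for {
      a <- incremental
      b <- sharedByKey.getOrElse(a.key, Seq.empty)
      if a.date.compareTo(b.date) <= 0 // a.date <= b.date
    } yield (a, b)
  }
}
```

Only the key equality can be served by a hash lookup here; the date comparison is applied as a filter within each key.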
Below are the questions I have:
- Would adding an index on the join fields of the sharedRdd help to
improve performance? If so, what is the best way to add an index on the
JSON data? I see indexes being added in a config file in some places and in
code in others, but couldn't find a working example.
- Can the sharedRdd be joined directly against the incrementalDF using
"sql"? I couldn't get "sql" working directly on the sharedRdd using the
"_key" and "_val" fields. Do I need to add field info to the config file to
get the SQL working?
- I believe the above join query currently causes a shuffle when the two
DFs are joined. Is there a way to colocate the sharedRdd and the
incremental RDD on the two given keys, so that data movement is avoided and
performance improves?
- Does the "number of partitions" have any significance in the current
case?
- Is there anything else that can be done to make the above join /
lookup faster?
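For the first two questions, one possible direction is sketched below. It assumes the JSON is parsed into a typed value class before being saved, because Ignite SQL indexes are declared on fields of the cached key or value type, whereas a cache of plain strings only exposes "_key" and "_val". The `Event` class, its field names, the ISO-8601 string date, and the `IndexedSharedRdd` helpers are hypothetical. `setIndexedTypes`, `@QuerySqlField`, `savePairs`, and `IgniteRDD.sql` are existing Ignite APIs, but this sketch has not been run against the setup described here.

```scala
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

import scala.annotation.meta.field

// Hypothetical typed value stored instead of the raw JSON string.
// eventDate is assumed to be an ISO-8601 string (yyyy-MM-dd), so that
// string comparison matches chronological order.
case class Event(
  @(QuerySqlField @field)(index = true) joinKey: String,
  @(QuerySqlField @field)(index = true) eventDate: String,
  @(QuerySqlField @field) payload: String)

object IndexedSharedRdd {
  // Cache configuration that registers Event as an SQL table named
  // "Event", with the annotated fields as columns.
  def cacheConfig(): CacheConfiguration[String, Event] = {
    val cfg = new CacheConfiguration[String, Event]("sharedRdd")
    cfg.setIndexedTypes(classOf[String], classOf[Event])
    cfg
  }

  // App-1 side: save (cacheKey, Event) pairs instead of JSON strings.
  def save(ic: IgniteContext, events: RDD[(String, Event)]): Unit = {
    val sharedRdd: IgniteRDD[String, Event] =
      ic.fromCache[String, Event](cacheConfig())
    sharedRdd.savePairs(events)
  }

  // App-2 side: the query runs inside Ignite and the result comes back
  // as a Spark DataFrame.
  def lookupOne(ic: IgniteContext, key: String, fromDate: String): DataFrame = {
    val sharedRdd: IgniteRDD[String, Event] =
      ic.fromCache[String, Event](cacheConfig())
    sharedRdd.sql(
      "select joinKey, eventDate, payload from Event " +
        "where joinKey = ? and eventDate >= ?",
      key, fromDate)
  }
}
```

Whether this is faster than the DataFrame join would still need to be measured. The sketch only covers querying the shared RDD by key and date, not joining it against incrementalDF inside a single SQL statement.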
Thanks in advance for the time!
Re: spark lookup against shared RDD
Posted by roshan joe <im...@gmail.com>.
Any response to the questions in my original message about using an index
on the Spark shared RDD would be really appreciated.
Thank you.