Posted to user@spark.apache.org by Michael David Pedersen <mi...@googlemail.com> on 2016/10/31 10:06:52 UTC

Efficient filtering on Spark SQL dataframes with ordered keys

Hello,

I've got a Spark SQL dataframe containing a "key" column. The queries I
want to run start by filtering on the key range. My question in outline: is
it possible to sort the dataset by key so as to do efficient key range
filters, before subsequently running a more complex SQL query?

I'm aware that such efficient filtering is possible for key-value RDDs,
i.e. RDDs over Tuple2, using PairRDDFunctions. My workflow currently looks
as follows:

// Create a dataframe
val df: DataFrame = sqlContext.sql("SELECT * FROM ...")
val keyValRDD = df.rdd.map( (r: Row) => (r.getAs[String]("key"), r) )

// Sort by key - and cache.
val keyValRDDSorted = keyValRDD.sortByKey().cache

// Define a function to run SQL query on a range.
def queryRange(lower: String, upper: String, sql: String, tableName: String) = {
    val rangeRDD = keyValRDDSorted.filterByRange(lower, upper)
    val rangeDF = sqlContext.createDataFrame(rangeRDD.map{ _._2 }, df.schema)
    rangeDF.createTempView(tableName)
    sqlContext.sql(sql)
}

// Invoke multiple times.
queryRange(...)
queryRange(...)
...

This works, and is efficient in that only the partitions containing the
relevant key range are processed. However, I understand that Spark SQL uses
an optimised storage format as compared to plain RDDs. The above workflow
can't take advantage of this, as it is the key-value RDD that is cached.
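
For comparison, caching the DataFrame itself would presumably look something
like the sketch below (names are illustrative, assuming Spark 2.0), though I
don't see how a key range filter could then exploit the sort order:

// Sketch only: cache the sorted DataFrame itself, so Spark SQL's
// in-memory columnar format is used instead of a plain RDD cache.
val sortedDF = df.sort("key").cache()
sortedDF.createOrReplaceTempView("sortedTable")
sqlContext.sql("SELECT * FROM sortedTable WHERE key >= 'a' AND key <= 'b'")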

So, my specific question: Is there a more efficient way of achieving the
desired result?

Any pointers would be much appreciated.

Many thanks,
Michael

PS: This question was also asked on StackOverflow -
http://stackoverflow.com/questions/40129411/efficient-filtering-on-spark-sql-dataframes-with-ordered-keys.

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Michael David Pedersen <mi...@googlemail.com>.
Awesome, thank you Michael for the detailed example!

I'll look into whether I can use this approach for my use case. If so, I
could avoid the overhead of repeatedly registering a temp table for one-off
queries, instead registering the table once and relying on the injected
strategy. I don't know how much of an impact this overhead has in practice,
though.

Cheers,
Michael

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Michael Armbrust <mi...@databricks.com>.
registerTempTable is backed by an in-memory hash table that maps table name
(a string) to a logical query plan.  Fragments of that logical query plan
may or may not be cached (but calling register alone will not result in any
materialization of results).  In Spark 2.0 we renamed this function to
createOrReplaceTempView, since a traditional RDBMS view is a better analogy
here.
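
In code form, a minimal sketch (assuming Spark 2.0 and a SparkSession named
spark):

// Registers a name for the DataFrame's logical plan;
// nothing is materialized by this call.
df.createOrReplaceTempView("myTable")
spark.sql("SELECT * FROM myTable").show()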

If I were trying to augment the engine to make better use of HBase's
internal ordering, I'd probably use the experimental ability to inject
extra strategies into the query planner.  Essentially, you could look for
filters on top of BaseRelations (the internal class used to map DataSources
into the query plan) where there is a range filter on some prefix of the
table's key.  When this is detected, you could return an RDD that contains
the already filtered result talking directly to HBase, which would override
the default execution pathway.
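
Very roughly, such a strategy might look like the sketch below (Spark 2.0
APIs; hbaseRangeScan is a hypothetical stand-in for the code that talks
directly to HBase, and the match is simplified to a single conjunctive range
on a column named "key"):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, GreaterThanOrEqual, LessThanOrEqual, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation

object HBaseKeyRangeStrategy extends Strategy {

  // Hypothetical helper: scan HBase directly, returning only rows whose
  // key falls in [lower, upper], encoded as Spark's InternalRow.
  def hbaseRangeScan(lower: Any, upper: Any): RDD[InternalRow] = ???

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Look for a conjunctive range filter on "key" sitting directly
    // on top of the scanned relation.
    case Filter(And(GreaterThanOrEqual(k1: Attribute, Literal(lo, _)),
                    LessThanOrEqual(k2: Attribute, Literal(hi, _))),
                rel: LogicalRelation)
        if k1.name == "key" && k2.name == "key" =>
      // Return an already-filtered scan, overriding the default pathway.
      RDDScanExec(rel.output, hbaseRangeScan(lo, hi), "HBaseKeyRangeScan") :: Nil
    case _ => Nil // defer to Spark's built-in strategies
  }
}

// Inject the strategy into the query planner (once per session).
spark.experimental.extraStrategies ++= Seq(HBaseKeyRangeStrategy)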

I wrote up a (toy) example of using this API
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3897837307833148/2840265927289860/latest.html>,
which might be helpful.

On Tue, Nov 1, 2016 at 4:11 AM, Mich Talebzadeh <mi...@gmail.com> wrote:

> It would be great if we could establish this.
>
> I know in Hive these temporary tables "CREATE TEMPORARY TABLE ..." are
> private to the session and are put in a hidden staging directory as below
>
> /user/hive/warehouse/.hive-staging_hive_2016-07-10_22-58-47_319_5605745346163312826-10
>
> and removed when the session ends or the table is dropped.
>
> Not sure how Spark handles this.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> On 1 November 2016 at 10:50, Michael David Pedersen <michael.d.pedersen@googlemail.com> wrote:
>
>> Thanks for the link, I hadn't come across this.
>>
>>> According to https://forums.databricks.com/questions/400/what-is-the-difference-between-registertemptable-a.html
>>>
>>> and I quote
>>>
>>> "registerTempTable()
>>>
>>> registerTempTable() creates an in-memory table that is scoped to the
>>> cluster in which it was created. The data is stored using Hive's
>>> highly-optimized, in-memory columnar format."
>>>
>> But then the last post in the thread corrects this, saying:
>> "registerTempTable does not create a 'cached' in-memory table, but rather
>> an alias or a reference to the DataFrame. It's akin to a pointer in C/C++
>> or a reference in Java".
>>
>> So - probably need to dig into the sources to get more clarity on this.
>>
>> Cheers,
>> Michael
>>
>
>

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Mich Talebzadeh <mi...@gmail.com>.
It would be great if we could establish this.

I know in Hive these temporary tables "CREATE TEMPORARY TABLE ..." are
private to the session and are put in a hidden staging directory as below

/user/hive/warehouse/.hive-staging_hive_2016-07-10_22-58-47_319_5605745346163312826-10

and removed when the session ends or the table is dropped.

Not sure how Spark handles this.

HTH

Dr Mich Talebzadeh



On 1 November 2016 at 10:50, Michael David Pedersen <michael.d.pedersen@googlemail.com> wrote:

> Thanks for the link, I hadn't come across this.
>
>> According to https://forums.databricks.com/questions/400/what-is-the-difference-between-registertemptable-a.html
>>
>> and I quote
>>
>> "registerTempTable()
>>
>> registerTempTable() creates an in-memory table that is scoped to the
>> cluster in which it was created. The data is stored using Hive's
>> highly-optimized, in-memory columnar format."
>>
> But then the last post in the thread corrects this, saying:
> "registerTempTable does not create a 'cached' in-memory table, but rather
> an alias or a reference to the DataFrame. It's akin to a pointer in C/C++
> or a reference in Java".
>
> So - probably need to dig into the sources to get more clarity on this.
>
> Cheers,
> Michael
>

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Michael David Pedersen <mi...@googlemail.com>.
Thanks for the link, I hadn't come across this.

> According to https://forums.databricks.com/questions/400/what-is-the-difference-between-registertemptable-a.html
>
> and I quote
>
> "registerTempTable()
>
> registerTempTable() creates an in-memory table that is scoped to the
> cluster in which it was created. The data is stored using Hive's
> highly-optimized, in-memory columnar format."
>
But then the last post in the thread corrects this, saying:
"registerTempTable does not create a 'cached' in-memory table, but rather
an alias or a reference to the DataFrame. It's akin to a pointer in C/C++
or a reference in Java".

So - probably need to dig into the sources to get more clarity on this.

Cheers,
Michael

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Mich Talebzadeh <mi...@gmail.com>.
A bit of a gray area here, I'm afraid; I was trying to experiment with it.

According to
https://forums.databricks.com/questions/400/what-is-the-difference-between-registertemptable-a.html

and I quote

"registerTempTable()

registerTempTable() creates an in-memory table that is scoped to the
cluster in which it was created. The data is stored using Hive's
highly-optimized, in-memory columnar format."


So on the face of it, a tempTable is an in-memory table.

HTH

Dr Mich Talebzadeh



On 1 November 2016 at 10:01, Michael David Pedersen <michael.d.pedersen@googlemail.com> wrote:

> Hi again Mich,
>
> "But the thing is that I don't explicitly cache the tempTables ..".
>>
>> I believe tempTable is created in-memory and is already cached
>>
>
> That surprises me since there is a sqlContext.cacheTable method to
> explicitly cache a table in memory. Or am I missing something? This could
> explain why I'm seeing somewhat worse performance than I'd expect.
>
> Cheers,
> Michael
>

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Michael David Pedersen <mi...@googlemail.com>.
Hi again Mich,

"But the thing is that I don't explicitly cache the tempTables ..".
>
> I believe tempTable is created in-memory and is already cached
>

That surprises me since there is a sqlContext.cacheTable method to
explicitly cache a table in memory. Or am I missing something? This could
explain why I'm seeing somewhat worse performance than I'd expect.
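
For reference, the kind of explicit caching I have in mind, as a minimal
sketch using the SQLContext API (table name is illustrative):

// Opt-in, lazy caching of a registered table; it is materialized
// in the in-memory columnar format on first use.
sqlContext.cacheTable("myTable")
// ... run queries against myTable ...
sqlContext.uncacheTable("myTable") // free the memory afterwards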

Cheers,
Michael

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Mich Talebzadeh <mi...@gmail.com>.
Well, I suppose one can drop a tempTable as below:

scala> df.registerTempTable("tmp")

scala> spark.sql("select count(1) from tmp").show
+--------+
|count(1)|
+--------+
|  904180|
+--------+

scala> spark.sql("drop table if exists tmp")
res22: org.apache.spark.sql.DataFrame = []
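
There is also a catalog call for this (a sketch, using the Spark 2.0
catalog API on the same SparkSession):

// Drops only the temp view binding, not any underlying data.
spark.catalog.dropTempView("tmp")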

Also your point

"But the thing is that I don't explicitly cache the tempTables ..".

I believe tempTable is created in-memory and is already cached

HTH

Dr Mich Talebzadeh



On 31 October 2016 at 14:16, Michael David Pedersen <michael.d.pedersen@googlemail.com> wrote:

> Hi Mich,
>
> Thank you again for your reply.
>
>> As I see you are caching the table already sorted
>>
>> val keyValRDDSorted = keyValRDD.sortByKey().cache
>>
>> and the next stage is you are creating multiple tempTables (different
>> ranges) that cache a subset of rows already cached in RDD. The data stored
>> in tempTable is in Hive columnar format (I assume that means ORC format)
>>
>
> But the thing is that I don't explicitly cache the tempTables, and I don't
> really want to because I'll only run a single query on each tempTable. So I
> expect the SQL query processor to operate directly on the underlying
> key-value RDD, and my concern is that this may be inefficient.
>
>
>> Well that is all you can do.
>>
>
> Ok, thanks - that's really what I wanted to get confirmation of.
>
>
>> Bear in mind that these tempTables are immutable and I do not know any
>> way of dropping a tempTable to free more memory.
>>
>
> I'm assuming there won't be any (significant) memory overhead of
> registering the temp tables as long as I don't explicitly cache them. Am I
> wrong? In any case I'll be calling sqlContext.dropTempTable once the query
> has completed, which according to the documentation should also free up
> memory.
>
> Cheers,
> Michael
>

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Michael David Pedersen <mi...@googlemail.com>.
Hi Mich,

Thank you again for your reply.

> As I see you are caching the table already sorted
>
> val keyValRDDSorted = keyValRDD.sortByKey().cache
>
> and the next stage is you are creating multiple tempTables (different
> ranges) that cache a subset of rows already cached in RDD. The data stored
> in tempTable is in Hive columnar format (I assume that means ORC format)
>

But the thing is that I don't explicitly cache the tempTables, and I don't
really want to because I'll only run a single query on each tempTable. So I
expect the SQL query processor to operate directly on the underlying
key-value RDD, and my concern is that this may be inefficient.


> Well that is all you can do.
>

Ok, thanks - that's really what I wanted to get confirmation of.


> Bear in mind that these tempTables are immutable and I do not know any way
> of dropping a tempTable to free more memory.
>

I'm assuming there won't be any (significant) memory overhead of
registering the temp tables as long as I don't explicitly cache them. Am I
wrong? In any case I'll be calling sqlContext.dropTempTable once the query
has completed, which according to the documentation should also free up
memory.

Cheers,
Michael

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Michael,

As I see, you are caching the table already sorted:

val keyValRDDSorted = keyValRDD.sortByKey().cache

and the next stage is that you are creating multiple tempTables (different
ranges) that cache a subset of the rows already cached in the RDD. The data
stored in a tempTable is in Hive columnar format (I assume that means ORC
format).

Well, that is all you can do. Bear in mind that these tempTables are
immutable and I do not know any way of dropping a tempTable to free more
memory.

Depending on the size of the main table, caching the whole table may
require a lot of memory, but you can see this in the UI storage page. An
alternative is to use persist(StorageLevel.MEMORY_AND_DISK_SER) for a mix
of memory and disk.
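
In Scala that would be along these lines (a sketch; note there are no
parentheses on the storage level in the Scala API):

import org.apache.spark.storage.StorageLevel

// Serialized partitions in memory where they fit, spilling the rest to disk.
val keyValRDDSorted = keyValRDD.sortByKey()
  .persist(StorageLevel.MEMORY_AND_DISK_SER)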

HTH

Dr Mich Talebzadeh



On 31 October 2016 at 10:55, Michael David Pedersen <michael.d.pedersen@googlemail.com> wrote:

> Hi Mich,
>
> Thank you for your quick reply!
>
>> What type of table is the underlying table? Is it HBase, Hive ORC, or what?
>>
>
> It is a custom datasource, but ultimately backed by HBase.
>
>
>> By Key you mean a UNIQUE ID or something similar and then you do multiple
>> scans on the tempTable which stores data using in-memory columnar format.
>>
>
> The key is a unique ID, yes. But note that I don't actually do multiple
> scans on the same temp table: I create a new temp table for every query I
> want to run, because each query will be based on a different key range. The
> caching is at the level of the full key-value RDD.
>
> If I did instead cache the temp table, I don't see a way of exploiting key
> ordering for key range filters?
>
>
>> That is the optimisation of tempTable storage as far as I know.
>>
>
> So it seems to me that my current solution won't be using this
> optimisation, as I'm caching the RDD rather than the temp table.
>
>
>> Have you tried it using predicate push-down on the underlying table
>> itself?
>>
>
> No, because I essentially want to load the entire table into memory before
> doing any queries. At that point I have nothing to push down.
>
> Cheers,
> Michael
>

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Michael David Pedersen <mi...@googlemail.com>.
Hi Mich,

Thank you for your quick reply!

> What type of table is the underlying table? Is it HBase, Hive ORC, or what?
>

It is a custom datasource, but ultimately backed by HBase.


> By Key you mean a UNIQUE ID or something similar and then you do multiple
> scans on the tempTable which stores data using in-memory columnar format.
>

The key is a unique ID, yes. But note that I don't actually do multiple
scans on the same temp table: I create a new temp table for every query I
want to run, because each query will be based on a different key range. The
caching is at the level of the full key-value RDD.

If I did instead cache the temp table, I don't see a way of exploiting key
ordering for key range filters?


> That is the optimisation of tempTable storage as far as I know.
>

So it seems to me that my current solution won't be using this
optimisation, as I'm caching the RDD rather than the temp table.


> Have you tried it using predicate push-down on the underlying table itself?
>

No, because I essentially want to load the entire table into memory before
doing any queries. At that point I have nothing to push down.

Cheers,
Michael

Re: Efficient filtering on Spark SQL dataframes with ordered keys

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Michael.

What type of table is the underlying table? Is it HBase, Hive ORC, or what?

By key you mean a UNIQUE ID or something similar, and then you do multiple
scans on the tempTable, which stores data using an in-memory columnar
format.

That is the optimisation of tempTable storage as far as I know.

Have you tried it using predicate push-down on the underlying table itself?

HTH

Dr Mich Talebzadeh



On 31 October 2016 at 10:06, Michael David Pedersen <michael.d.pedersen@googlemail.com> wrote:

> Hello,
>
> I've got a Spark SQL dataframe containing a "key" column. The queries I
> want to run start by filtering on the key range. My question in outline: is
> it possible to sort the dataset by key so as to do efficient key range
> filters, before subsequently running a more complex SQL query?
>
> I'm aware that such efficient filtering is possible for key-value RDDs,
> i.e. RDDs over Tuple2, using PairRDDFunctions. My workflow currently looks
> as follows:
>
> // Create a dataframe
> val df: DataFrame = sqlContext.sql("SELECT * FROM ...")
> val keyValRDD = df.rdd.map( (r: Row) => (r.getAs[String]("key"), r) )
>
> // Sort by key - and cache.
> val keyValRDDSorted = keyValRDD.sortByKey().cache
>
> // Define a function to run SQL query on a range.
> def queryRange(lower: String, upper: String, sql: String, tableName: String) = {
>     val rangeRDD = keyValRDDSorted.filterByRange(lower, upper)
>     val rangeDF = sqlContext.createDataFrame(rangeRDD.map{ _._2 }, df.schema)
>     rangeDF.createTempView(tableName)
>     sqlContext.sql(sql)
> }
>
> // Invoke multiple times.
> queryRange(...)
> queryRange(...)
> ...
>
> This works, and is efficient in that only the partitions containing the
> relevant key range are processed. However, I understand that Spark SQL uses
> an optimised storage format as compared to plain RDDs. The above workflow
> can't take advantage of this, as it is the key-value RDD that is cached.
>
> So, my specific question: Is there a more efficient way of achieving the
> desired result?
>
> Any pointers would be much appreciated.
>
> Many thanks,
> Michael
>
> PS: This question was also asked on StackOverflow -
> http://stackoverflow.com/questions/40129411/efficient-filtering-on-spark-sql-dataframes-with-ordered-keys.
>