Posted to user@spark.apache.org by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk> on 2016/02/23 00:16:45 UTC

Using functional programming rather than SQL

 

Hi, 

I have data stored in Hive tables on which I want to do some simple
manipulation.

Currently in Spark I get the result set from the Hive tables using SQL
and register it as a temporary table in Spark (code below).

Ideally, I would get the result set into a DataFrame (DF) and work on
the DF to slice and dice the data using functional programming with
filter, map, split, etc.

I would like some ideas on how to go about it.

Thanks

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 

HiveContext.sql("use oraclehadoop")
val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s, times t, channels c
WHERE s.time_id = t.time_id
AND s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")

HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
from tmp
ORDER BY MONTH, CHANNEL
""").collect.foreach(println)
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
order by SALES DESC
""").collect.foreach(println) 

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential.
This message is for the designated recipient only, if you are not the
intended recipient, you should destroy it immediately. Any information
in this message shall not be understood as given or endorsed by Cloud
Technology Partners Ltd, its subsidiaries or their employees, unless
expressly so stated. It is the responsibility of the recipient to ensure
that this email is virus free, therefore neither Cloud Technology
partners Ltd, its subsidiaries nor their employees accept any
responsibility.

 

Re: Using functional programming rather than SQL

Posted by Koert Kuipers <ko...@tresata.com>.
However, to really enjoy functional programming, I assume you also want
to use lambdas in your map and filter, which means you need to convert
the DataFrame to a Dataset using df.as[SomeCaseClass]. Just be aware that
it's still somewhat early days for Dataset.
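
A minimal sketch of the conversion described above, assuming Spark 2.x or later (where SparkSession replaces HiveContext) and a local session. The case class MonthlySales and the sample rows are made up for illustration; in this thread the DataFrame would come from the Hive query instead.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class whose fields match the columns of the aggregated result
case class MonthlySales(calendar_month_desc: String, channel_desc: String, TotalSales: Long)

val spark = SparkSession.builder().master("local[*]").appName("dataset-sketch").getOrCreate()
import spark.implicits._

// Made-up rows standing in for the result of the Hive query
val df = Seq(
  ("1998-01", "Internet", 100L),
  ("1998-01", "Partners", 250L),
  ("1998-02", "Internet", 300L)
).toDF("calendar_month_desc", "channel_desc", "TotalSales")

// Convert the untyped DataFrame into a typed Dataset
val ds = df.as[MonthlySales]

// filter and map now take ordinary Scala lambdas over MonthlySales
val internetMonths = ds
  .filter(s => s.channel_desc == "Internet")
  .map(s => s.calendar_month_desc)
  .collect()
  .sorted
```

The lambdas are checked by the Scala compiler against the case class fields, which is the main difference from passing column names as strings.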

On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott <ke...@gmail.com>
wrote:

> In your example, the *rs* instance should be a DataFrame object. In other
> words, the result of *HiveContext.sql* is a DataFrame that you can
> manipulate using *filter, map, *etc.
>
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext

Re: Using functional programming rather than SQL

Posted by Michał Zieliński <zi...@gmail.com>.
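Your SQL query will look something like this in DataFrames (but as Ted
said, check the docs to see the signatures). This assumes smallsales,
times and channels are DataFrames for the three Hive tables.

import org.apache.spark.sql.functions.{col, sum}

// groupBy keeps the grouping columns in the result,
// so agg only needs the aggregate expression
smallsales
.join(times,"time_id")
.join(channels,"channel_id")
.groupBy("calendar_month_desc","channel_desc")
.agg(sum(col("amount_sold")).as("TotalSales"))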

On 23 February 2016 at 01:50, Ted Yu <yu...@gmail.com> wrote:

> Mich:
> Please refer to the following test suite for examples on various DataFrame
> operations:
>
> sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Re: Using functional programming rather than SQL

Posted by Ted Yu <yu...@gmail.com>.
Mich:
Please refer to the following test suite for examples of various DataFrame
operations:

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

On Mon, Feb 22, 2016 at 4:39 PM, Mich Talebzadeh <
mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:

> Thanks Dean.
>
> I gather if I wanted to get the whole thing through FP with little or no
> use of SQL, then for the first instance as I get the data set from Hive
> (i.e,
>
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND   s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
>
> I can even possibly use DF to do the above sql joins because what I am
> doing above can also be done in DF without SQL use? Is that possible? Or I
> have to use some form of SQL?
>
>
>
> The rest I can do simply using DFs.
>
>
>
> Thanks

Re: Using functional programming rather than SQL

Posted by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk>.
 

Thanks Dean. 

I gather that if I wanted to do the whole thing through FP with little
or no use of SQL, I could start from the first step, where I get the
data set from Hive, i.e.

val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s, times t, channels c
WHERE s.time_id = t.time_id
AND s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""") 

Could I use DataFrames to do the above SQL joins as well, since what I
am doing above can presumably be done in DF without SQL? Is that
possible, or do I have to use some form of SQL?

The rest I can do simply using DFs. 

Thanks 

On 23/02/2016 00:26, Dean Wampler wrote: 

> Kevin gave you the answer you need, but I'd like to comment on your subject line. SQL is a limited form of FP. Sure, there are no anonymous functions and other limitations, but it's declarative, like good FP programs should be, and it offers an important subset of the operators ("combinators") you want. 
> 
> Also, on a practical note, use the DataFrame API whenever you can, rather than dropping down to the RDD API, because the DataFrame API is far more performant. It's a classic case where restricting your options enables more aggressive optimizations behind the scenes. Michal Armbrust's talk at Spark Summit East nicely made this point. http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming [3] 
> 
> dean 
> 
> Dean Wampler, Ph.D. 
> Author: Programming Scala, 2nd Edition [4] (O'Reilly)
> 
> Typesafe [5]
> @deanwampler [6] 
> http://polyglotprogramming.com [7] 


Links:
------
[1]
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
[2] http://talebzadehmich.wordpress.com
[3]
http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming
[4] http://shop.oreilly.com/product/0636920033073.do
[5] http://typesafe.com
[6] http://twitter.com/deanwampler
[7] http://polyglotprogramming.com
[8]
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext

Re: Using functional programming rather than SQL

Posted by Dean Wampler <de...@gmail.com>.
Kevin gave you the answer you need, but I'd like to comment on your subject
line. SQL is a limited form of FP. Sure, there are no anonymous functions
and other limitations, but it's declarative, like good FP programs should
be, and it offers an important subset of the operators ("combinators") you
want.

Also, on a practical note, use the DataFrame API whenever you can, rather
than dropping down to the RDD API, because the DataFrame API is far more
performant. It's a classic case where restricting your options enables more
aggressive optimizations behind the scenes. Michael Armbrust's talk at Spark
Summit East nicely made this point.
http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com
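
The point above about SQL offering a subset of FP combinators can be sketched with plain Scala collections, with no Spark involved. The Sale case class and the rows are made up for illustration; each collection method is annotated with the SQL clause it roughly corresponds to.

```scala
// Hypothetical record type and made-up rows
case class Sale(month: String, channel: String, amount: Long)

val sales = List(
  Sale("1998-01", "Internet", 100L),
  Sale("1998-01", "Internet", 50L),
  Sale("1998-01", "Partners", 250L),
  Sale("1998-02", "Internet", 300L)
)

// Roughly: SELECT month, channel, SUM(amount) FROM sales
//          WHERE channel <> 'Partners'
//          GROUP BY month, channel ORDER BY month, channel
val totals: List[((String, String), Long)] = sales
  .filter(_.channel != "Partners")                            // WHERE
  .groupBy(s => (s.month, s.channel))                         // GROUP BY
  .map { case (key, rows) => key -> rows.map(_.amount).sum }  // SUM
  .toList
  .sortBy(_._1)                                               // ORDER BY

totals.foreach(println)
```

The DataFrame API exposes the same shape of pipeline, but because it only accepts column expressions rather than arbitrary functions, Spark can inspect and optimize the whole plan.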

On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott <ke...@gmail.com>
wrote:

> In your example, the *rs* instance should be a DataFrame object. In other
> words, the result of *HiveContext.sql* is a DataFrame that you can
> manipulate using *filter, map, *etc.
>
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext

Re: Using functional programming rather than SQL

Posted by Kevin Mellott <ke...@gmail.com>.
In your example, the *rs* instance should be a DataFrame object. In other
words, the result of *HiveContext.sql* is a DataFrame that you can
manipulate using *filter*, *map*, etc.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
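
As a rough sketch of what manipulating rs directly could look like, assuming Spark 2.x or later with a local SparkSession: the rows below are made up and stand in for the DataFrame returned by HiveContext.sql, so no temporary table is registered.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc, max}

val spark = SparkSession.builder().master("local[*]").appName("dataframe-sketch").getOrCreate()
import spark.implicits._

// Made-up rows standing in for rs, the result of the aggregation query
val rs = Seq(
  ("1998-01", "Internet", 100L),
  ("1998-01", "Partners", 250L),
  ("1998-02", "Internet", 300L)
).toDF("calendar_month_desc", "channel_desc", "TotalSales")

// filter with a column expression instead of a SQL WHERE clause
val large = rs.filter(col("TotalSales") > 200)

// MAX(TotalSales) per channel, highest first, without a temporary table
val bestPerChannel = rs
  .groupBy("channel_desc")
  .agg(max("TotalSales").as("SALES"))
  .orderBy(desc("SALES"))

bestPerChannel.collect().foreach(println)
```

Nothing is executed until an action such as collect or count is called; the filter, groupBy and orderBy calls only build up the query plan.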



Re: Using Spark functional programming rather than SQL, Spark on Hive tables

Posted by Sabarish Sasidharan <sa...@manthan.com>.
Spark has its own efficient in-memory columnar format, so it's not ORC.
It's just that the data has to be serialized and deserialized over the
network, and that is what is consuming the time.

Regards
Sab
On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" <
mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:

>
>
> *Hi,*
>
> *Tools*
>
> *Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive Database*
>
> *Objectives: Timing differences between running Spark using SQL and
> running Spark using functional programming (FP) (functional calls) on Hive
> tables*
>
> *Underlying tables: Three tables in Hive database using ORC format*
>
> The main differences in timings come from running the queries and fetching
> data. If you look at the transformation part, that is
>
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>
> it takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds
> compared to just under 4 minutes for functional programming.
>
> The second query using SQL takes 18 seconds. Using FP it takes around 4
> minutes.
>
> These are my assumptions.
>
>    1. When running SQL, the full query is executed in Hive, which means
>    that Hive can take advantage of ORC optimizations, the storage index, etc.?
>    2. Running FP requires that the data is fetched from the underlying
>    tables in Hive and brought back to the Spark cluster (standalone here),
>    and the joins etc. are done there.
>
> The next step for me would be to:
>
>    1. Look at the query plans in Spark
>    2. Run the same code on Hive alone and compare results
>
>
>
> Any other suggestions are welcome.
>
> *Standard SQL code*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> println ("\ncreating data set at "); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val rs = HiveContext.sql(
> """
> SELECT    t.calendar_month_desc
>         , c.channel_desc
>         , SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 09:00:50.50]
> res1: org.apache.spark.sql.DataFrame = [result: string]
>
> creating data set at [24/02/2016 09:00:53.53]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query at [24/02/2016 09:00:54.54]
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
> second query at [24/02/2016 09:01:13.13]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at [24/02/2016 09:01:31.31]
>
> *Code using functional programming*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> // Note: this reads the sales table, whereas the SQL version reads smallsales
> var s = HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 08:52:27.27]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
>
> creating data set at [24/02/2016 08:52:30.30]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query at [24/02/2016 08:52:31.31]
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query at [24/02/2016 08:56:17.17]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [24/02/2016 09:00:14.14]
>
>
>
> On 24/02/2016 06:27, Sabarish Sasidharan wrote:
>
> When using SQL, your full query, including the joins, was executed in
> Hive (or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
>
>
>

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

Posted by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk>.
 

Well spotted Sab. You are correct. An oversight by me. They should both
use "sales". 

The results are now comparable.

The following statement:

"On the other hand using SQL the query 1 takes 19 seconds compared to
just under 4 minutes for functional programming

The second query using SQL takes 28 seconds. Using FP it takes around 4
minutes."

should be amended to:

"Using SQL, query 1 takes 3 min, 39 sec compared to 3 min, 44 sec using
FP

Using SQL, query 2 takes 3 min, 36 sec compared to 3 min, 53 sec using
FP"

FP lags slightly behind SQL, but not by any significant margin.
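
To put a number on "slightly behind", the amended timings can be converted to seconds and compared. This is only a sketch of the arithmetic in plain Scala; the object and method names (`TimingGap`, `toSeconds`, `slowdownPct`) are made up for illustration.

```scala
object TimingGap {
  // Convert a "3 min, 39 sec" style figure into seconds.
  def toSeconds(min: Int, sec: Int): Int = min * 60 + sec

  // How much longer FP took than SQL, as a percentage of the SQL time.
  def slowdownPct(sqlSec: Int, fpSec: Int): Double =
    (fpSec - sqlSec) * 100.0 / sqlSec

  def main(args: Array[String]): Unit = {
    // Amended timings quoted in the message above.
    val q1 = slowdownPct(toSeconds(3, 39), toSeconds(3, 44))
    val q2 = slowdownPct(toSeconds(3, 36), toSeconds(3, 53))
    println(f"query 1: FP slower by $q1%.1f%%") // 5 s on 219 s, about 2.3%
    println(f"query 2: FP slower by $q2%.1f%%") // 17 s on 216 s, about 7.9%
  }
}
```

These are single runs, so differences of this size could easily be run-to-run noise.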

Thanks 

On 24/02/2016 18:20, Sabarish Sasidharan wrote: 

> One more, you are referring to 2 different sales tables. That might account for the difference in numbers. 
> 
> Regards
> Sab 


 

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

Posted by Sabarish Sasidharan <sa...@manthan.com>.
One more, you are referring to 2 different sales tables. That might account
for the difference in numbers.

Regards
Sab
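
One way to pin down where the two runs disagree is to line up the printed totals by (month, channel). The sketch below does this in plain Scala, using the five "first query" rows that this thread shows for each run. The names `CompareRuns` and `differing` are made up for illustration and are not part of Spark.

```scala
object CompareRuns {
  type Key = (String, String) // (calendar_month_desc, channel_desc)

  // Rows printed by the SQL run (which read from "smallsales").
  val sqlRun: Map[Key, Long] = Map(
    ("1998-01", "Direct Sales") -> 9161730L,
    ("1998-01", "Internet")     -> 1248581L,
    ("1998-01", "Partners")     -> 2409776L,
    ("1998-02", "Direct Sales") -> 9161840L,
    ("1998-02", "Internet")     -> 1533193L
  )

  // Rows printed by the FP run (which read from "sales").
  val fpRun: Map[Key, Long] = Map(
    ("1998-01", "Direct Sales") -> 9086830L,
    ("1998-01", "Internet")     -> 1247641L,
    ("1998-01", "Partners")     -> 2393567L,
    ("1998-02", "Direct Sales") -> 9161840L,
    ("1998-02", "Internet")     -> 1533193L
  )

  // Keys present in both runs whose totals differ, with both values.
  def differing(a: Map[Key, Long], b: Map[Key, Long]): Seq[(Key, Long, Long)] =
    (a.keySet intersect b.keySet).toSeq.sorted
      .collect { case k if a(k) != b(k) => (k, a(k), b(k)) }

  def main(args: Array[String]): Unit =
    differing(sqlRun, fpRun).foreach { case ((month, channel), sql, fp) =>
      println(s"$month $channel: SQL=$sql FP=$fp diff=${sql - fp}")
    }
}
```

With the two result DataFrames still in the shell, `DataFrame.except` would give a similar check directly on the data.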
On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" <
mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:

>
>
> *Hi,*
>
> *Tools*
>
> *Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive Database*
>
> *Objectives: Timing differences between running Spark using SQL and
> running Spark using functional programming (FP) (functional calls) on Hive
> tables*
>
> *Underlying tables: Three tables in Hive database using ORC format*
>
> The main differences in timings come from running the queries and fetching
> data. If you look at the transformation part, that is
>
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>
> it takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds
> compared to just under 4 minutes for functional programming.
>
> The second query using SQL takes 28 seconds. Using FP it takes around 4
> minutes.
>
> These are my assumptions.
>
>    1. Running SQL the full query is executed in Hive which means that
>    Hive can take advantage of ORC optimization/storage index etc?
>    2. Running FP requires that data is fetched from the underlying tables
>    in Hive and brought back to Spark cluster (standalone here) and the joins
>    etc are done there
>
> The next step for me would be to:
>
>    1. Look at the query plans in Spark
>    2. Run the same code on Hive alone and compare results
>
>
>
> Any other suggestions are welcome.
>
> *Standard SQL code*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> println ("\ncreating data set at "); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs = HiveContext.sql(
> """
> SELECT    t.calendar_month_desc
>         , c.channel_desc
>         , SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 09:00:50.50]
> res1: org.apache.spark.sql.DataFrame = [result: string]
>
> creating data set at [24/02/2016 09:00:53.53]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)
>
> First query at [24/02/2016 09:00:54.54]
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
> second query at [24/02/2016 09:01:13.13]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at [24/02/2016 09:01:31.31
>
> *Code using functional programming*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s =
> HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 08:52:27.27]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
>
> creating data set at [24/02/2016 08:52:30.30]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query at [24/02/2016 08:52:31.31]
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query at [24/02/2016 08:56:17.17]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [24/02/2016 09:00:14.14]
>
>
>
> On 24/02/2016 06:27, Sabarish Sasidharan wrote:
>
> When using SQL, your full query, including the joins, was executed in
> Hive (or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
>
>
>

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

Posted by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk>.
 

Hi,

Tools

Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive Database

Objectives: Timing differences between running Spark using SQL and
running Spark using functional programming (FP) (functional calls) on
Hive tables

Underlying tables: Three tables in Hive database using ORC format

The main differences in timings come from running the queries and
fetching data. If you look at the transformation part, that is

val rs =
s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))


it takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds
compared to just under 4 minutes for functional programming.

The second query using SQL takes 28 seconds. Using FP it takes around 4
minutes.

These are my assumptions.

 	* Running SQL, the full query is executed in Hive, which means that Hive
can take advantage of ORC optimization/storage index etc?
 	* Running FP requires that data is fetched from the underlying tables
in Hive and brought back to the Spark cluster (standalone here), and the
joins etc. are done there

The next step for me would be to:

 	* Look at the query plans in Spark
 	* Run the same code on Hive alone and compare results

Any other suggestions are welcome.
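
A note on measuring: DataFrame transformations such as join, groupBy and agg are lazy, so building `rs` only constructs a plan; the work happens when an action such as `take` or `collect` runs. A small wall-clock helper wrapped around each action measures that directly, without going through a `SELECT FROM_unixtime(...)` round trip. The sketch below is plain Scala with a stand-in workload; `Timed` and `timed` are hypothetical names, not a Spark API.

```scala
object Timed {
  // Runs `body`, prints the elapsed wall-clock time, and returns
  // the result together with the elapsed milliseconds.
  def timed[T](label: String)(body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    println(s"$label took $elapsedMs ms")
    (result, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload. In spark-shell the block would hold an action, e.g.
    //   rs.orderBy("calendar_month_desc", "channel_desc").take(5)
    val (total, _) = timed("sum 1 to 1000000") { (1L to 1000000L).sum }
    println(total)
  }
}
```

For the first next step listed above, `rs.explain(true)` prints the logical and physical plans for a DataFrame, whether it was built through `HiveContext.sql` or through the DataFrame calls, so the two variants can be compared side by side.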

Standard SQL code

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
println ("\ncreating data set at "); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc
 , c.channel_desc
 , SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
from tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
order by SALES DESC LIMIT 5
""").collect.foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit

Results

Started at [24/02/2016 09:00:50.50]
res1: org.apache.spark.sql.DataFrame = [result: string]

creating data set at [24/02/2016 09:00:53.53]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)]

First query at [24/02/2016 09:00:54.54]
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]

second query at [24/02/2016 09:01:13.13]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]

Finished at [24/02/2016 09:01:31.31]

Code using functional programming

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
val s = HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
println ("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit

Results

Started at [24/02/2016 08:52:27.27]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
CALENDAR_MONTH_DESC: string] 

creating data set at [24/02/2016 08:52:30.30]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query at [24/02/2016 08:52:31.31]
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = () 

second query at [24/02/2016 08:56:17.17]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = () 

Finished at
[24/02/2016 09:00:14.14] 
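
The per-query durations can be read off the logged markers by subtracting consecutive timestamps. The sketch below does that for the two result listings above, in plain Scala with `java.time`; the object name `ElapsedFromLog` and its methods are made up for illustration. Taking the logged timestamps at face value, it gives 19 and 18 seconds for the two SQL queries, and 3 min 46 sec and 3 min 57 sec for the two FP queries.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object ElapsedFromLog {
  // The markers were printed with 'dd/MM/yyyy HH:mm:ss.ss', which repeats
  // the seconds after the dot ("09:00:54.54"). That suffix carries no extra
  // information, so it is dropped before parsing.
  private val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss")

  def parse(stamp: String): LocalDateTime =
    LocalDateTime.parse(stamp.takeWhile(_ != '.'), fmt)

  // Whole seconds between two logged markers.
  def secondsBetween(start: String, end: String): Long =
    Duration.between(parse(start), parse(end)).getSeconds

  def main(args: Array[String]): Unit = {
    // (label, marker before the query, marker after the query)
    val runs = Seq(
      ("SQL query 1", "24/02/2016 09:00:54.54", "24/02/2016 09:01:13.13"),
      ("SQL query 2", "24/02/2016 09:01:13.13", "24/02/2016 09:01:31.31"),
      ("FP query 1",  "24/02/2016 08:52:31.31", "24/02/2016 08:56:17.17"),
      ("FP query 2",  "24/02/2016 08:56:17.17", "24/02/2016 09:00:14.14")
    )
    for ((label, start, end) <- runs) {
      val s = secondsBetween(start, end)
      println(s"$label: ${s / 60} min ${s % 60} sec")
    }
  }
}
```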

On 24/02/2016 06:27, Sabarish Sasidharan wrote: 

> When using SQL, your full query, including the joins, was executed in Hive (or RDBMS) and only the results were brought into the Spark cluster. In the FP case, the data for the 3 tables is first pulled into the Spark cluster and then the join is executed.
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab
 

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

Posted by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk>.
 

Hi, 

TOOLS 

Spark 1.5.2, Hadoop 2.6, Hive 2.0, spark-shell, Hive database 

OBJECTIVES: Timing differences between running Spark using SQL and
running Spark using functional programming (FP) (functional calls) on
Hive tables 

UNDERLYING TABLES: Three tables in a Hive database using ORC format 

The main differences in timings come from running the queries and
fetching data. If you look at the transformation part, that is 

val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))


it takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds
compared to just under 4 minutes for functional programming. 

The second query using SQL takes 18 seconds (09:01:13 to 09:01:31 in the
results below). Using FP it takes around 4 minutes. 
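For reference, the elapsed times can be recomputed from the timestamps printed in the RESULTS sections of this message. The following is only a sketch: the helper name secondsBetween is made up for illustration, the timestamps are copied by hand from the logs with the duplicated ".ss" suffix dropped, and java.time needs Java 8.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Timestamps are copied from the RESULTS sections of this message.
val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss")

// Hypothetical helper: whole seconds between two logged timestamps.
def secondsBetween(start: String, end: String): Long =
  Duration.between(LocalDateTime.parse(start, fmt), LocalDateTime.parse(end, fmt)).getSeconds

// FP run: "creating data set" -> "first query" -> "second query" -> "Finished".
val fpBuild  = secondsBetween("24/02/2016 08:52:30", "24/02/2016 08:52:31")
val fpFirst  = secondsBetween("24/02/2016 08:52:31", "24/02/2016 08:56:17")
val fpSecond = secondsBetween("24/02/2016 08:56:17", "24/02/2016 09:00:14")

// SQL run: "first query" -> "second query" -> "Finished".
val sqlFirst  = secondsBetween("24/02/2016 09:00:54", "24/02/2016 09:01:13")
val sqlSecond = secondsBetween("24/02/2016 09:01:13", "24/02/2016 09:01:31")

println(s"FP : build=${fpBuild}s first=${fpFirst}s second=${fpSecond}s")
println(s"SQL: first=${sqlFirst}s second=${sqlSecond}s")
```

With the logged timestamps this gives 1 s for building rs, 226 s and 237 s for the two FP queries, and 19 s and 18 s for the two SQL queries.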

These are my assumptions. 

 	* When running SQL, the full query is executed in Hive, which means that
Hive can take advantage of ORC optimizations, storage indexes, etc.?
 	* Running FP requires that data is fetched from the underlying tables
in Hive and brought back to the Spark cluster (standalone here), and the
joins etc. are done there

The next step for me would be to: 

 	* Look at the query plans in Spark
 	* Run the same code on Hive alone and compare results
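A minimal sketch of the first step, looking at the query plans, assuming a Spark 1.5.x spark-shell session (so sc is predefined) and the oraclehadoop tables used in this thread. The names hc, viaSql and viaDf are chosen here for illustration, and both formulations deliberately read the same sales table so that the plans are comparable.

```scala
// Assumes spark-shell (Spark 1.5.x) with `sc` available and the Hive tables from this thread.
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
hc.sql("use oraclehadoop")

// SQL formulation of the aggregate.
val viaSql = hc.sql("""
  SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM sales s
  INNER JOIN times t ON s.time_id = t.time_id
  INNER JOIN channels c ON s.channel_id = c.channel_id
  GROUP BY t.calendar_month_desc, c.channel_desc
""")

// DataFrame (FP) formulation against the same three tables.
val viaDf = hc.table("sales")
  .join(hc.table("times"), "time_id")
  .join(hc.table("channels"), "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))

// explain(true) prints the parsed, analyzed, optimized and physical plans.
viaSql.explain(true)
viaDf.explain(true)
```

Comparing the two physical plans side by side should show whether the scans, joins and aggregation are planned differently for the two formulations.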

Any other suggestions are welcome. 

STANDARD SQL CODE 

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
println ("\ncreating data set at "); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc
 , c.channel_desc
 , SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
from tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
order by SALES DESC LIMIT 5
""").collect.foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit 

RESULTS 

Started at [24/02/2016 09:00:50.50]
res1: org.apache.spark.sql.DataFrame = [result: string] 

creating data set at [24/02/2016 09:00:53.53]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

First query at [24/02/2016 09:00:54.54]
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193] 

second query at [24/02/2016 09:01:13.13]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760] 

Finished at [24/02/2016 09:01:31.31] 

CODE USING FUNCTIONAL PROGRAMMING 

// sum, max and desc live in this package (already in scope in spark-shell)
import org.apache.spark.sql.functions._
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
// Only the columns needed by the join and the aggregation are selected
val s = HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
println ("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).sort(desc("SALES")).take(5).foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit 
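DataFrame transformations such as join, groupBy and agg are lazy in Spark: defining rs only builds a plan, and the work is done when an action such as take or collect runs. That is worth keeping in mind when reading timestamps printed between statements. As an alternative to querying Hive for the current time, each step could be timed on the driver. The helper below is a hypothetical sketch (timed is not a Spark function), shown with a stand-in workload so that it runs in any Scala REPL.

```scala
// Hypothetical helper, not part of Spark: runs a block, prints the elapsed
// wall-clock time on the driver and returns the block's result.
def timed[A](label: String)(block: => A): A = {
  val start = System.nanoTime()
  val result = block
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"$label took $elapsedMs ms")
  result
}

// Stand-in workload so the sketch is self-contained. In spark-shell one would
// wrap an action instead, for example:
//   timed("first query") { rs.orderBy("calendar_month_desc", "channel_desc").take(5) }
val total = timed("sum of 1 to 1000000") { (1L to 1000000L).sum }
```

Wrapping the definition of rs and each take(5) call separately would show how much of the time goes into building the plan and how much into executing it.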

RESULTS 

Started at [24/02/2016 08:52:27.27]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
CALENDAR_MONTH_DESC: string] 

creating data set at [24/02/2016 08:52:30.30]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query at [24/02/2016 08:52:31.31]
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = () 

second query at [24/02/2016 08:56:17.17]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = () 

Finished at
[24/02/2016 09:00:14.14] 

On 24/02/2016 06:27, Sabarish Sasidharan wrote: 

> When using SQL your full query, including the joins, were executed in Hive(or RDBMS) and only the results were brought into the Spark cluster. In the FP case, the data for the 3 tables is first pulled into the Spark cluster and then the join is executed. 
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab
 

Re: Using functional programming rather than SQL

Posted by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk>.
 

Hi Koert, 

My bad. The SQL version ran against a smaller "sales" table (smallsales),
whereas the FP version used the full "sales" table. Kindly see my new figures. 

On 24/02/2016 20:05, Koert Kuipers wrote: 

> my assumption, which is apparently incorrect, was that the SQL gets translated into a catalyst plan that is executed in spark. the dataframe operations (referred to by Mich as the FP results) also get translated into a catalyst plan that is executed on the exact same spark platform. so unless the SQL gets translated into a much better plan (perhaps thanks to some pushdown into ORC?), i dont see why it can be much faster.
> 
> On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers <ko...@tresata.com> wrote:
> 
> i am still missing something. if it is executed in the source database, which is hive in this case, then it does need hive, no? how can you execute in hive without needing hive? 
> 
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan <sa...@manthan.com> wrote:
> 
> I never said it needs one. All I said is that when calling context.sql() the sql is executed in the source database (assuming datasource is Hive or some RDBMS) 
> 
> Regards
> Sab 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 11:49 pm, "Mohannad Ali" <ma...@gmail.com> wrote:
> 
> That is incorrect HiveContext does not need a hive instance to run. 
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <sa...@manthan.com> wrote:
> 
> Yes 
> 
> Regards
> Sab 
> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
> 
> are you saying that HiveContext.sql(...) runs on hive, and not on spark sql?
> 
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <sa...@manthan.com> wrote:
> 
> When using SQL your full query, including the joins, were executed in Hive(or RDBMS) and only the results were brought into the Spark cluster. In the FP case, the data for the 3 tables is first pulled into the Spark cluster and then the join is executed. 
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <mi...@cloudtechnologypartners.co.uk> wrote:
> 
> Hi, 
> 
> First thanks everyone for their suggestions. Much appreciated. 
> 
> This was the original queries written in SQL and run against Spark-shell 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop") 
> 
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> sys.exit 
> 
> The second queries were written in FP as much as I could as below 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2 =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> sys.exit 
> 
> However The first query results are slightly different in SQL and FP (may be the first query code in FP is not exactly correct?) and more importantly the FP takes order of magnitude longer compared to SQL (8 minutes compared to less than a minute). I am not surprised as I expected Functional Programming has to flatten up all those method calls and convert them to SQL? 
> 
> THE STANDARD SQL RESULTS 
> 
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)] 
> 
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193] 
> 
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760] 
> 
> Finished at
> [23/02/2016 23:56:11.11] 
> 
> THE FP RESULTS 
> 
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)] 
> 
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = () 
> 
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = () 
> 
> Finished at
> [23/02/2016 23:53:42.42] 
> 
> On 22/02/2016 23:16, Mich Talebzadeh wrote: 
> 
> Hi, 
> 
> I have data stored in Hive tables that I want to do simple manipulation. 
> 
> Currently in Spark I perform the following with getting the result set using SQL from Hive tables, registering as a temporary table in Spark 
> 
> Now Ideally I can get the result set into a DF and work on DF to slice and dice the data using functional programming with filter, map. split etc. 
> 
> I wanted to get some ideas on how to go about it. 
> 
> thanks 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
> 
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp") 
> 
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC
> """).collect.foreach(println) 
> 

Re: Using functional programming rather than SQL

Posted by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk>.
 

This is a point that I would like to clarify, please. 

These are my assumptions: 

 	* Data resides in Hive tables in a Hive database
 	* Data has to be extracted from these tables. Tables are ORC so they
have ORC optimizations (storage indexes at file, stripe (64MB chunks of
data) and row group (10K rows) level that contain min, max, sum for each
column at these three levels)
 	* HiveContext means Hive's internal optimizations are used as well, right?
 	* Spark contacts Hive and instructs it to get the data from Hive. Hive
does all that
 	* Spark takes that data into its memory space and does the queries

Does that make sense? 

On 24/02/2016 20:05, Koert Kuipers wrote: 

> my assumption, which is apparently incorrect, was that the SQL gets translated into a catalyst plan that is executed in spark. the dataframe operations (referred to by Mich as the FP results) also get translated into a catalyst plan that is executed on the exact same spark platform. so unless the SQL gets translated into a much better plan (perhaps thanks to some pushdown into ORC?), i dont see why it can be much faster.

Re: Using functional programming rather than SQL

Posted by Koert Kuipers <ko...@tresata.com>.
my assumption, which is apparently incorrect, was that the SQL gets
translated into a catalyst plan that is executed in spark. the dataframe
operations (referred to by Mich as the FP results) also get translated into
a catalyst plan that is executed on the exact same spark platform. so
unless the SQL gets translated into a much better plan (perhaps thanks to
some pushdown into ORC?), i dont see why it can be much faster.
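One way to look at this without any Hive tables is to run the same aggregate both ways on a few in-memory rows and compare the plans and results. This is only a sketch: it assumes a Spark 1.5.x spark-shell where sqlContext is predefined, and the table names demo_sales and demo_channels and their rows are made up for illustration.

```scala
// Assumes spark-shell (Spark 1.5.x), where `sqlContext` is predefined.
import org.apache.spark.sql.functions.sum
import sqlContext.implicits._

// Made-up rows standing in for the sales and channels tables.
val demoSales = Seq((1L, 10), (1L, 20), (2L, 5)).toDF("channel_id", "amount_sold")
val demoChannels = Seq((1L, "Direct Sales"), (2L, "Internet")).toDF("channel_id", "channel_desc")
demoSales.registerTempTable("demo_sales")
demoChannels.registerTempTable("demo_channels")

// The aggregate written as SQL ...
val demoSql = sqlContext.sql("""
  SELECT c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM demo_sales s
  INNER JOIN demo_channels c ON s.channel_id = c.channel_id
  GROUP BY c.channel_desc
""")

// ... and as DataFrame calls.
val demoDf = demoSales.join(demoChannels, "channel_id")
  .groupBy("channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))

// Both print a Spark physical plan.
demoSql.explain()
demoDf.explain()

// Compare the results independently of row order.
val sqlRows = demoSql.collect().map(_.toString).toSet
val dfRows  = demoDf.collect().map(_.toString).toSet
println(sqlRows == dfRows)
```

If both formulations print equivalent physical plans and return the same rows here, a large timing gap on the real tables is more likely to come from the inputs than from the choice between SQL and DataFrame calls.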




On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i am still missing something. if it is executed in the source database,
> which is hive in this case, then it does need hive, no? how can you execute
> in hive without needing hive?
>
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan <
> sabarish.sasidharan@manthan.com> wrote:
>
>> I never said it needs one. All I said is that when calling context.sql()
>> the sql is executed in the source database (assuming datasource is Hive or
>> some RDBMS)
>>
>> Regards
>> Sab
>>
>> Regards
>> Sab
>> On 24-Feb-2016 11:49 pm, "Mohannad Ali" <ma...@gmail.com> wrote:
>>
>>> That is incorrect HiveContext does not need a hive instance to run.
>>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>>> sabarish.sasidharan@manthan.com> wrote:
>>>
>>>> Yes
>>>>
>>>> Regards
>>>> Sab
>>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
>>>>
>>>>> are you saying that HiveContext.sql(...) runs on hive, and not on
>>>>> spark sql?
>>>>>
>>>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>>>> sabarish.sasidharan@manthan.com> wrote:
>>>>>
>>>>>> When using SQL your full query, including the joins, were executed in
>>>>>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>>>>>> the FP case, the data for the 3 tables is first pulled into the Spark
>>>>>> cluster and then the join is executed.
>>>>>>
>>>>>> Thus the time difference.
>>>>>>
>>>>>> It's not immediately obvious why the results are different.
>>>>>>
>>>>>> Regards
>>>>>> Sab
>>>>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>>>>> mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> First thanks everyone for their suggestions. Much appreciated.
>>>>>>>
>>>>>>> This was the original queries written in SQL and run against
>>>>>>> Spark-shell
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>>
>>>>>>> val rs = HiveContext.sql(
>>>>>>> """
>>>>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>>>>>> TotalSales
>>>>>>> FROM smallsales s
>>>>>>> INNER JOIN times t
>>>>>>> ON s.time_id = t.time_id
>>>>>>> INNER JOIN channels c
>>>>>>> ON s.channel_id = c.channel_id
>>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>>> """)
>>>>>>> rs.registerTempTable("tmp")
>>>>>>> println ("\nfirst query")
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>>> TotalSales
>>>>>>> from tmp
>>>>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>>>>> """).collect.foreach(println)
>>>>>>> println ("\nsecond query")
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>>> FROM tmp
>>>>>>> GROUP BY channel_desc
>>>>>>> order by SALES DESC LIMIT 5
>>>>>>> """).collect.foreach(println)
>>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> sys.exit
>>>>>>>
>>>>>>> The second queries were written in FP as much as I could as below
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID
>>>>>>> FROM sales")
>>>>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>>>>>>> channels")
>>>>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>>>>>>> times")
>>>>>>> val rs =
>>>>>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>>>>>> println ("\nfirst query")
>>>>>>> val rs1 =
>>>>>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>>>>>> println ("\nsecond query")
>>>>>>> val rs2
>>>>>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> sys.exit
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> However The first query results are slightly different in SQL and FP
>>>>>>> (may be the first query code in FP is not exactly correct?) and more
>>>>>>> importantly the FP takes order of magnitude longer compared to SQL (8
>>>>>>> minutes compared to less than a minute). I am not surprised as I expected
>>>>>>> Functional Programming has to flatten up all those method calls and convert
>>>>>>> them to SQL?
>>>>>>>
>>>>>>> *The standard SQL results*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Started at
>>>>>>> [23/02/2016 23:55:30.30]
>>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>>
>>>>>>> first query
>>>>>>> [1998-01,Direct Sales,9161730]
>>>>>>> [1998-01,Internet,1248581]
>>>>>>> [1998-01,Partners,2409776]
>>>>>>> [1998-02,Direct Sales,9161840]
>>>>>>> [1998-02,Internet,1533193]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> second query
>>>>>>> [Direct Sales,9161840]
>>>>>>> [Internet,3977374]
>>>>>>> [Partners,3976291]
>>>>>>> [Tele Sales,328760]
>>>>>>>
>>>>>>> Finished at
>>>>>>> [23/02/2016 23:56:11.11]
>>>>>>>
>>>>>>> *The FP results*
>>>>>>>
>>>>>>> Started at
>>>>>>> [23/02/2016 23:45:58.58]
>>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>>>>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>>>>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double,
>>>>>>> CHANNEL_DESC: string]
>>>>>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>>>>>> CALENDAR_MONTH_DESC: string]
>>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>>
>>>>>>> first query
>>>>>>> [1998-01,Direct Sales,9086830]
>>>>>>> [1998-01,Internet,1247641]
>>>>>>> [1998-01,Partners,2393567]
>>>>>>> [1998-02,Direct Sales,9161840]
>>>>>>> [1998-02,Internet,1533193]
>>>>>>> rs1: Unit = ()
>>>>>>>
>>>>>>> second query
>>>>>>> [Direct Sales,9161840]
>>>>>>> [Internet,3977374]
>>>>>>> [Partners,3976291]
>>>>>>> [Tele Sales,328760]
>>>>>>> rs2: Unit = ()
>>>>>>>
>>>>>>> Finished at
>>>>>>> [23/02/2016 23:53:42.42]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have data stored in Hive tables that I want to do simple
>>>>>>> manipulation.
>>>>>>>
>>>>>>> Currently in Spark I perform the following with getting the result
>>>>>>> set using SQL from Hive tables, registering as a temporary table in Spark
>>>>>>>
>>>>>>> Now Ideally I can get the result set into a DF and work on DF to
>>>>>>> slice and dice the data using functional programming with filter, map.
>>>>>>> split etc.
>>>>>>>
>>>>>>> I wanted to get some ideas on how to go about it.
>>>>>>>
>>>>>>> thanks
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>>
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>>>>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>>>>>> FROM smallsales s, times t, channels c
>>>>>>> WHERE s.time_id = t.time_id
>>>>>>> AND   s.channel_id = c.channel_id
>>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>>> """)
>>>>>>> *rs.registerTempTable("tmp")*
>>>>>>>
>>>>>>>
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>>> TotalSales
>>>>>>> from tmp
>>>>>>> ORDER BY MONTH, CHANNEL
>>>>>>> """).collect.foreach(println)
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>>> FROM tmp
>>>>>>> GROUP BY channel_desc
>>>>>>> order by SALES DESC
>>>>>>> """).collect.foreach(println)
>>>>>>>
>>>>>>>
>>>>>
>

Re: Using functional programming rather than SQL

Posted by Koert Kuipers <ko...@tresata.com>.
i am still missing something. if it is executed in the source database,
which is hive in this case, then it does need hive, no? how can you execute
in hive without needing hive?
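
One way to look into this is to print the plan Spark builds for each form of the query. The sketch below is only an illustration, assuming Spark 1.x with the spark-hive module on the classpath and local mode. The three tiny in-memory tables and their rows are made up; only the column names follow the quoted code. It runs the same aggregation through sql() and through the DataFrame API, prints both plans with explain(), and returns the rows so they can be compared. It may also be worth noting that the quoted SQL version reads from smallsales while the quoted DataFrame version reads from sales, so the two timed runs may not have been scanning the same data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.hive.HiveContext

object SqlVersusDataFrame {

  // Returns the rows produced by the SQL form and by the DataFrame form,
  // rendered as sorted strings so the two can be compared directly.
  def compare(hc: HiveContext): (Array[String], Array[String]) = {
    import hc.implicits._

    // Hypothetical stand-ins for the thread's tables (rows are invented).
    val s = Seq((100L, 1, 3), (50L, 1, 4), (70L, 2, 3))
      .toDF("amount_sold", "time_id", "channel_id")
    val t = Seq((1, "1998-01"), (2, "1998-02"))
      .toDF("time_id", "calendar_month_desc")
    val c = Seq((3, "Direct Sales"), (4, "Internet"))
      .toDF("channel_id", "channel_desc")
    s.registerTempTable("sales")
    t.registerTempTable("times")
    c.registerTempTable("channels")

    // The aggregation written as SQL.
    val viaSql = hc.sql(
      """SELECT t.calendar_month_desc, c.channel_desc,
        |       SUM(s.amount_sold) AS TotalSales
        |FROM sales s
        |INNER JOIN times t ON s.time_id = t.time_id
        |INNER JOIN channels c ON s.channel_id = c.channel_id
        |GROUP BY t.calendar_month_desc, c.channel_desc""".stripMargin)

    // The same aggregation written with DataFrame methods.
    val viaApi = s.join(t, "time_id").join(c, "channel_id")
      .groupBy("calendar_month_desc", "channel_desc")
      .agg(sum("amount_sold").as("TotalSales"))

    // Print the physical plan Spark chose for each form.
    viaSql.explain()
    viaApi.explain()

    (viaSql.collect().map(_.toString).sorted,
     viaApi.collect().map(_.toString).sorted)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sql-vs-df").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val (a, b) = compare(new HiveContext(sc))
      println(s"same rows: ${a.sameElements(b)}")
    } finally sc.stop()
  }
}
```

Running the same two explain() calls against the real tables in spark-shell would show whether the two plans actually differ on that cluster.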

On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan <
sabarish.sasidharan@manthan.com> wrote:

> I never said it needs one. All I said is that when calling context.sql()
> the sql is executed in the source database (assuming datasource is Hive or
> some RDBMS)
>
> Regards
> Sab
>
> On 24-Feb-2016 11:49 pm, "Mohannad Ali" <ma...@gmail.com> wrote:
>
>> That is incorrect HiveContext does not need a hive instance to run.
>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>> sabarish.sasidharan@manthan.com> wrote:
>>
>>> Yes
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
>>>
>>>> are you saying that HiveContext.sql(...) runs on hive, and not on
>>>> spark sql?
>>>>
>>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>>> sabarish.sasidharan@manthan.com> wrote:
>>>>
>>>>> When using SQL your full query, including the joins, were executed in
>>>>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>>>>> the FP case, the data for the 3 tables is first pulled into the Spark
>>>>> cluster and then the join is executed.
>>>>>
>>>>> Thus the time difference.
>>>>>
>>>>> It's not immediately obvious why the results are different.
>>>>>
>>>>> Regards
>>>>> Sab
>>>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>>>> mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> First thanks everyone for their suggestions. Much appreciated.
>>>>>>
>>>>>> This was the original queries written in SQL and run against
>>>>>> Spark-shell
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>> ").collect.foreach(println)
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>
>>>>>> val rs = HiveContext.sql(
>>>>>> """
>>>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>>>>> TotalSales
>>>>>> FROM smallsales s
>>>>>> INNER JOIN times t
>>>>>> ON s.time_id = t.time_id
>>>>>> INNER JOIN channels c
>>>>>> ON s.channel_id = c.channel_id
>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>> """)
>>>>>> rs.registerTempTable("tmp")
>>>>>> println ("\nfirst query")
>>>>>> HiveContext.sql("""
>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>> TotalSales
>>>>>> from tmp
>>>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>>>> """).collect.foreach(println)
>>>>>> println ("\nsecond query")
>>>>>> HiveContext.sql("""
>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>> FROM tmp
>>>>>> GROUP BY channel_desc
>>>>>> order by SALES DESC LIMIT 5
>>>>>> """).collect.foreach(println)
>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>> ").collect.foreach(println)
>>>>>> sys.exit
>>>>>>
>>>>>> The second queries were written in FP as much as I could as below
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>> ").collect.foreach(println)
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>>>>> sales")
>>>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>>>>>> channels")
>>>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>>>>>> times")
>>>>>> val rs =
>>>>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>>>>> println ("\nfirst query")
>>>>>> val rs1 =
>>>>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>>>>> println ("\nsecond query")
>>>>>> val rs2
>>>>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>> ").collect.foreach(println)
>>>>>> sys.exit
>>>>>>
>>>>>>
>>>>>>
>>>>>> However The first query results are slightly different in SQL and FP
>>>>>> (may be the first query code in FP is not exactly correct?) and more
>>>>>> importantly the FP takes order of magnitude longer compared to SQL (8
>>>>>> minutes compared to less than a minute). I am not surprised as I expected
>>>>>> Functional Programming has to flatten up all those method calls and convert
>>>>>> them to SQL?
>>>>>>
>>>>>> *The standard SQL results*
>>>>>>
>>>>>>
>>>>>>
>>>>>> Started at
>>>>>> [23/02/2016 23:55:30.30]
>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>
>>>>>> first query
>>>>>> [1998-01,Direct Sales,9161730]
>>>>>> [1998-01,Internet,1248581]
>>>>>> [1998-01,Partners,2409776]
>>>>>> [1998-02,Direct Sales,9161840]
>>>>>> [1998-02,Internet,1533193]
>>>>>>
>>>>>>
>>>>>>
>>>>>> second query
>>>>>> [Direct Sales,9161840]
>>>>>> [Internet,3977374]
>>>>>> [Partners,3976291]
>>>>>> [Tele Sales,328760]
>>>>>>
>>>>>> Finished at
>>>>>> [23/02/2016 23:56:11.11]
>>>>>>
>>>>>> *The FP results*
>>>>>>
>>>>>> Started at
>>>>>> [23/02/2016 23:45:58.58]
>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>>>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>>>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double,
>>>>>> CHANNEL_DESC: string]
>>>>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>>>>> CALENDAR_MONTH_DESC: string]
>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>
>>>>>> first query
>>>>>> [1998-01,Direct Sales,9086830]
>>>>>> [1998-01,Internet,1247641]
>>>>>> [1998-01,Partners,2393567]
>>>>>> [1998-02,Direct Sales,9161840]
>>>>>> [1998-02,Internet,1533193]
>>>>>> rs1: Unit = ()
>>>>>>
>>>>>> second query
>>>>>> [Direct Sales,9161840]
>>>>>> [Internet,3977374]
>>>>>> [Partners,3976291]
>>>>>> [Tele Sales,328760]
>>>>>> rs2: Unit = ()
>>>>>>
>>>>>> Finished at
>>>>>> [23/02/2016 23:53:42.42]
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have data stored in Hive tables that I want to do simple
>>>>>> manipulation.
>>>>>>
>>>>>> Currently in Spark I perform the following with getting the result
>>>>>> set using SQL from Hive tables, registering as a temporary table in Spark
>>>>>>
>>>>>> Now Ideally I can get the result set into a DF and work on DF to
>>>>>> slice and dice the data using functional programming with filter, map.
>>>>>> split etc.
>>>>>>
>>>>>> I wanted to get some ideas on how to go about it.
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>>>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>>>>> FROM smallsales s, times t, channels c
>>>>>> WHERE s.time_id = t.time_id
>>>>>> AND   s.channel_id = c.channel_id
>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>> """)
>>>>>> *rs.registerTempTable("tmp")*
>>>>>>
>>>>>>
>>>>>> HiveContext.sql("""
>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>> TotalSales
>>>>>> from tmp
>>>>>> ORDER BY MONTH, CHANNEL
>>>>>> """).collect.foreach(println)
>>>>>> HiveContext.sql("""
>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>> FROM tmp
>>>>>> GROUP BY channel_desc
>>>>>> order by SALES DESC
>>>>>> """).collect.foreach(println)
>>>>>>
>>>>>>
>>>>

Re: Using functional programming rather than SQL

Posted by Mohannad Ali <ma...@gmail.com>.
My apologies, I definitely misunderstood. You are 100% correct.
On Feb 24, 2016 19:25, "Sabarish Sasidharan" <
sabarish.sasidharan@manthan.com> wrote:

> I never said it needs one. All I said is that when calling context.sql()
> the sql is executed in the source database (assuming datasource is Hive or
> some RDBMS)
>
> Regards
> Sab
>
> On 24-Feb-2016 11:49 pm, "Mohannad Ali" <ma...@gmail.com> wrote:
>
>> That is incorrect HiveContext does not need a hive instance to run.
>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>> sabarish.sasidharan@manthan.com> wrote:
>>
>>> Yes
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
>>>
>>>> are you saying that HiveContext.sql(...) runs on hive, and not on
>>>> spark sql?
>>>>
>>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>>> sabarish.sasidharan@manthan.com> wrote:
>>>>
>>>>> When using SQL your full query, including the joins, were executed in
>>>>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>>>>> the FP case, the data for the 3 tables is first pulled into the Spark
>>>>> cluster and then the join is executed.
>>>>>
>>>>> Thus the time difference.
>>>>>
>>>>> It's not immediately obvious why the results are different.
>>>>>
>>>>> Regards
>>>>> Sab
>>>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>>>> mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> First thanks everyone for their suggestions. Much appreciated.
>>>>>>
>>>>>> This was the original queries written in SQL and run against
>>>>>> Spark-shell
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>> ").collect.foreach(println)
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>
>>>>>> val rs = HiveContext.sql(
>>>>>> """
>>>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>>>>> TotalSales
>>>>>> FROM smallsales s
>>>>>> INNER JOIN times t
>>>>>> ON s.time_id = t.time_id
>>>>>> INNER JOIN channels c
>>>>>> ON s.channel_id = c.channel_id
>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>> """)
>>>>>> rs.registerTempTable("tmp")
>>>>>> println ("\nfirst query")
>>>>>> HiveContext.sql("""
>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>> TotalSales
>>>>>> from tmp
>>>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>>>> """).collect.foreach(println)
>>>>>> println ("\nsecond query")
>>>>>> HiveContext.sql("""
>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>> FROM tmp
>>>>>> GROUP BY channel_desc
>>>>>> order by SALES DESC LIMIT 5
>>>>>> """).collect.foreach(println)
>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>> ").collect.foreach(println)
>>>>>> sys.exit
>>>>>>
>>>>>> The second queries were written in FP as much as I could as below
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>> ").collect.foreach(println)
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>>>>> sales")
>>>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>>>>>> channels")
>>>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>>>>>> times")
>>>>>> val rs =
>>>>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>>>>> println ("\nfirst query")
>>>>>> val rs1 =
>>>>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>>>>> println ("\nsecond query")
>>>>>> val rs2
>>>>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>> ").collect.foreach(println)
>>>>>> sys.exit
>>>>>>
>>>>>>
>>>>>>
>>>>>> However The first query results are slightly different in SQL and FP
>>>>>> (may be the first query code in FP is not exactly correct?) and more
>>>>>> importantly the FP takes order of magnitude longer compared to SQL (8
>>>>>> minutes compared to less than a minute). I am not surprised as I expected
>>>>>> Functional Programming has to flatten up all those method calls and convert
>>>>>> them to SQL?
>>>>>>
>>>>>> *The standard SQL results*
>>>>>>
>>>>>>
>>>>>>
>>>>>> Started at
>>>>>> [23/02/2016 23:55:30.30]
>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>
>>>>>> first query
>>>>>> [1998-01,Direct Sales,9161730]
>>>>>> [1998-01,Internet,1248581]
>>>>>> [1998-01,Partners,2409776]
>>>>>> [1998-02,Direct Sales,9161840]
>>>>>> [1998-02,Internet,1533193]
>>>>>>
>>>>>>
>>>>>>
>>>>>> second query
>>>>>> [Direct Sales,9161840]
>>>>>> [Internet,3977374]
>>>>>> [Partners,3976291]
>>>>>> [Tele Sales,328760]
>>>>>>
>>>>>> Finished at
>>>>>> [23/02/2016 23:56:11.11]
>>>>>>
>>>>>> *The FP results*
>>>>>>
>>>>>> Started at
>>>>>> [23/02/2016 23:45:58.58]
>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>>>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>>>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double,
>>>>>> CHANNEL_DESC: string]
>>>>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>>>>> CALENDAR_MONTH_DESC: string]
>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>
>>>>>> first query
>>>>>> [1998-01,Direct Sales,9086830]
>>>>>> [1998-01,Internet,1247641]
>>>>>> [1998-01,Partners,2393567]
>>>>>> [1998-02,Direct Sales,9161840]
>>>>>> [1998-02,Internet,1533193]
>>>>>> rs1: Unit = ()
>>>>>>
>>>>>> second query
>>>>>> [Direct Sales,9161840]
>>>>>> [Internet,3977374]
>>>>>> [Partners,3976291]
>>>>>> [Tele Sales,328760]
>>>>>> rs2: Unit = ()
>>>>>>
>>>>>> Finished at
>>>>>> [23/02/2016 23:53:42.42]
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have data stored in Hive tables that I want to do simple
>>>>>> manipulation.
>>>>>>
>>>>>> Currently in Spark I perform the following with getting the result
>>>>>> set using SQL from Hive tables, registering as a temporary table in Spark
>>>>>>
>>>>>> Now Ideally I can get the result set into a DF and work on DF to
>>>>>> slice and dice the data using functional programming with filter, map.
>>>>>> split etc.
>>>>>>
>>>>>> I wanted to get some ideas on how to go about it.
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>>>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>>>>> FROM smallsales s, times t, channels c
>>>>>> WHERE s.time_id = t.time_id
>>>>>> AND   s.channel_id = c.channel_id
>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>> """)
>>>>>> *rs.registerTempTable("tmp")*
>>>>>>
>>>>>>
>>>>>> HiveContext.sql("""
>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>> TotalSales
>>>>>> from tmp
>>>>>> ORDER BY MONTH, CHANNEL
>>>>>> """).collect.foreach(println)
>>>>>> HiveContext.sql("""
>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>> FROM tmp
>>>>>> GROUP BY channel_desc
>>>>>> order by SALES DESC
>>>>>> """).collect.foreach(println)
>>>>>>
>>>>>>
>>>>

Re: Using functional programming rather than SQL

Posted by Sabarish Sasidharan <sa...@manthan.com>.
I never said it needs one. All I said is that when calling context.sql(),
the SQL is executed in the source database (assuming the datasource is Hive or
some RDBMS).

Regards
Sab

On 24-Feb-2016 11:49 pm, "Mohannad Ali" <ma...@gmail.com> wrote:

> That is incorrect HiveContext does not need a hive instance to run.
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
> sabarish.sasidharan@manthan.com> wrote:
>
>> Yes
>>
>> Regards
>> Sab
>> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
>>
>>> are you saying that HiveContext.sql(...) runs on hive, and not on spark
>>> sql?
>>>
>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>> sabarish.sasidharan@manthan.com> wrote:
>>>
>>>> When using SQL your full query, including the joins, were executed in
>>>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>>>> the FP case, the data for the 3 tables is first pulled into the Spark
>>>> cluster and then the join is executed.
>>>>
>>>> Thus the time difference.
>>>>
>>>> It's not immediately obvious why the results are different.
>>>>
>>>> Regards
>>>> Sab
>>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>>> mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:
>>>>
>>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> First thanks everyone for their suggestions. Much appreciated.
>>>>>
>>>>> This was the original queries written in SQL and run against
>>>>> Spark-shell
>>>>>
>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>> HiveContext.sql("use oraclehadoop")
>>>>>
>>>>> val rs = HiveContext.sql(
>>>>> """
>>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>>>> TotalSales
>>>>> FROM smallsales s
>>>>> INNER JOIN times t
>>>>> ON s.time_id = t.time_id
>>>>> INNER JOIN channels c
>>>>> ON s.channel_id = c.channel_id
>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>> """)
>>>>> rs.registerTempTable("tmp")
>>>>> println ("\nfirst query")
>>>>> HiveContext.sql("""
>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>> TotalSales
>>>>> from tmp
>>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>>> """).collect.foreach(println)
>>>>> println ("\nsecond query")
>>>>> HiveContext.sql("""
>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>> FROM tmp
>>>>> GROUP BY channel_desc
>>>>> order by SALES DESC LIMIT 5
>>>>> """).collect.foreach(println)
>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>> sys.exit
>>>>>
>>>>> The second queries were written in FP as much as I could as below
>>>>>
>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>> HiveContext.sql("use oraclehadoop")
>>>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>>>> sales")
>>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>>>>> channels")
>>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>>>>> times")
>>>>> val rs =
>>>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>>>> println ("\nfirst query")
>>>>> val rs1 =
>>>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>>>> println ("\nsecond query")
>>>>> val rs2
>>>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>> sys.exit
>>>>>
>>>>>
>>>>>
>>>>> However The first query results are slightly different in SQL and FP
>>>>> (may be the first query code in FP is not exactly correct?) and more
>>>>> importantly the FP takes order of magnitude longer compared to SQL (8
>>>>> minutes compared to less than a minute). I am not surprised as I expected
>>>>> Functional Programming has to flatten up all those method calls and convert
>>>>> them to SQL?
>>>>>
>>>>> *The standard SQL results*
>>>>>
>>>>>
>>>>>
>>>>> Started at
>>>>> [23/02/2016 23:55:30.30]
>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>
>>>>> first query
>>>>> [1998-01,Direct Sales,9161730]
>>>>> [1998-01,Internet,1248581]
>>>>> [1998-01,Partners,2409776]
>>>>> [1998-02,Direct Sales,9161840]
>>>>> [1998-02,Internet,1533193]
>>>>>
>>>>>
>>>>>
>>>>> second query
>>>>> [Direct Sales,9161840]
>>>>> [Internet,3977374]
>>>>> [Partners,3976291]
>>>>> [Tele Sales,328760]
>>>>>
>>>>> Finished at
>>>>> [23/02/2016 23:56:11.11]
>>>>>
>>>>> *The FP results*
>>>>>
>>>>> Started at
>>>>> [23/02/2016 23:45:58.58]
>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>>>>> string]
>>>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>>>> CALENDAR_MONTH_DESC: string]
>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>
>>>>> first query
>>>>> [1998-01,Direct Sales,9086830]
>>>>> [1998-01,Internet,1247641]
>>>>> [1998-01,Partners,2393567]
>>>>> [1998-02,Direct Sales,9161840]
>>>>> [1998-02,Internet,1533193]
>>>>> rs1: Unit = ()
>>>>>
>>>>> second query
>>>>> [Direct Sales,9161840]
>>>>> [Internet,3977374]
>>>>> [Partners,3976291]
>>>>> [Tele Sales,328760]
>>>>> rs2: Unit = ()
>>>>>
>>>>> Finished at
>>>>> [23/02/2016 23:53:42.42]
>>>>>
>>>>>
>>>>>
>>>

Re: Using functional programming rather than SQL

Posted by Mohannad Ali <ma...@gmail.com>.
That is incorrect. HiveContext does not need a Hive instance to run.
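
One way to check where the query actually runs is to print its plan from spark-shell. The lines below are only a sketch: they assume a Spark 1.x spark-shell (so `sc` already exists) and the oraclehadoop tables quoted further down; the name hiveContext is an illustrative choice, not something from the thread.

```scala
// Sketch for spark-shell (Spark 1.x); `sc` is the SparkContext the shell provides.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("use oraclehadoop")

// Same join and aggregation as the SQL version quoted in this thread.
val rs = hiveContext.sql("""
  SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM smallsales s
  INNER JOIN times t ON s.time_id = t.time_id
  INNER JOIN channels c ON s.channel_id = c.channel_id
  GROUP BY t.calendar_month_desc, c.channel_desc
""")

// Prints the logical plans and the physical plan Spark intends to execute.
rs.explain(true)
```

If the joins and the aggregation show up as operators in the printed physical plan, they are being planned and executed by Spark SQL, with Hive supplying the table metadata and storage.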
On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
sabarish.sasidharan@manthan.com> wrote:

> Yes
>
> Regards
> Sab
> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
>
>> Are you saying that HiveContext.sql(...) runs on Hive, and not on Spark
>> SQL?
>>
>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>> sabarish.sasidharan@manthan.com> wrote:
>>
>>> When using SQL, your full query, including the joins, was executed in
>>> Hive (or RDBMS) and only the results were brought into the Spark cluster. In
>>> the FP case, the data for the 3 tables is first pulled into the Spark
>>> cluster and then the join is executed.
>>>
>>> Thus the time difference.
>>>
>>> It's not immediately obvious why the results are different.
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>> mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:
>>>
>>>>
>>>>
>>>> Hi,
>>>>
>>>> First thanks everyone for their suggestions. Much appreciated.
>>>>
>>>> These were the original queries, written in SQL and run in spark-shell:
>>>>
>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>> ").collect.foreach(println)
>>>> HiveContext.sql("use oraclehadoop")
>>>>
>>>> val rs = HiveContext.sql(
>>>> """
>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>>> TotalSales
>>>> FROM smallsales s
>>>> INNER JOIN times t
>>>> ON s.time_id = t.time_id
>>>> INNER JOIN channels c
>>>> ON s.channel_id = c.channel_id
>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>> """)
>>>> rs.registerTempTable("tmp")
>>>> println ("\nfirst query")
>>>> HiveContext.sql("""
>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>>> from tmp
>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>> """).collect.foreach(println)
>>>> println ("\nsecond query")
>>>> HiveContext.sql("""
>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>> FROM tmp
>>>> GROUP BY channel_desc
>>>> order by SALES DESC LIMIT 5
>>>> """).collect.foreach(println)
>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>> ").collect.foreach(println)
>>>> sys.exit
>>>>
>>>> The second set of queries was written in FP style, as far as I could manage:
>>>>
>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>> ").collect.foreach(println)
>>>> HiveContext.sql("use oraclehadoop")
>>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>>> sales")
>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>>>> times")
>>>> val rs =
>>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>>> println ("\nfirst query")
>>>> val rs1 =
>>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>>> println ("\nsecond query")
>>>> val rs2
>>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>> ").collect.foreach(println)
>>>> sys.exit
>>>>
>>>>
>>>>
>>>> However, the first query results are slightly different in SQL and FP
>>>> (maybe the first query code in FP is not exactly correct?) and, more
>>>> importantly, the FP version takes an order of magnitude longer than SQL (8
>>>> minutes compared to less than a minute). I am not surprised, as I expected
>>>> that functional programming has to flatten all those method calls and
>>>> convert them to SQL?
>>>>
>>>> *The standard SQL results*
>>>>
>>>>
>>>>
>>>> Started at
>>>> [23/02/2016 23:55:30.30]
>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>
>>>> first query
>>>> [1998-01,Direct Sales,9161730]
>>>> [1998-01,Internet,1248581]
>>>> [1998-01,Partners,2409776]
>>>> [1998-02,Direct Sales,9161840]
>>>> [1998-02,Internet,1533193]
>>>>
>>>>
>>>>
>>>> second query
>>>> [Direct Sales,9161840]
>>>> [Internet,3977374]
>>>> [Partners,3976291]
>>>> [Tele Sales,328760]
>>>>
>>>> Finished at
>>>> [23/02/2016 23:56:11.11]
>>>>
>>>> *The FP results*
>>>>
>>>> Started at
>>>> [23/02/2016 23:45:58.58]
>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>>>> string]
>>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>>> CALENDAR_MONTH_DESC: string]
>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>
>>>> first query
>>>> [1998-01,Direct Sales,9086830]
>>>> [1998-01,Internet,1247641]
>>>> [1998-01,Partners,2393567]
>>>> [1998-02,Direct Sales,9161840]
>>>> [1998-02,Internet,1533193]
>>>> rs1: Unit = ()
>>>>
>>>> second query
>>>> [Direct Sales,9161840]
>>>> [Internet,3977374]
>>>> [Partners,3976291]
>>>> [Tele Sales,328760]
>>>> rs2: Unit = ()
>>>>
>>>> Finished at
>>>> [23/02/2016 23:53:42.42]
>>>>
>>>>
>>>>
>>

Re: Using functional programming rather than SQL

Posted by Sabarish Sasidharan <sa...@manthan.com>.
Yes

Regards
Sab
On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:

> Are you saying that HiveContext.sql(...) runs on Hive, and not on Spark
> SQL?
>
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
> sabarish.sasidharan@manthan.com> wrote:
>
>> When using SQL, your full query, including the joins, was executed in
>> Hive (or RDBMS) and only the results were brought into the Spark cluster. In
>> the FP case, the data for the 3 tables is first pulled into the Spark
>> cluster and then the join is executed.
>>
>> Thus the time difference.
>>
>> It's not immediately obvious why the results are different.
>>
>> Regards
>> Sab
>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>> mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> First thanks everyone for their suggestions. Much appreciated.
>>>
>>> These were the original queries, written in SQL and run in spark-shell:
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>>
>>> val rs = HiveContext.sql(
>>> """
>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>> TotalSales
>>> FROM smallsales s
>>> INNER JOIN times t
>>> ON s.time_id = t.time_id
>>> INNER JOIN channels c
>>> ON s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> rs.registerTempTable("tmp")
>>> println ("\nfirst query")
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nsecond query")
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> The second set of queries was written in FP style, as far as I could manage:
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>> sales")
>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>> val rs =
>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>> println ("\nfirst query")
>>> val rs1 =
>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>> println ("\nsecond query")
>>> val rs2
>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>>
>>>
>>> However, the first query results are slightly different in SQL and FP
>>> (maybe the first query code in FP is not exactly correct?) and, more
>>> importantly, the FP version takes an order of magnitude longer than SQL (8
>>> minutes compared to less than a minute). I am not surprised, as I expected
>>> that functional programming has to flatten all those method calls and
>>> convert them to SQL?
>>>
>>> *The standard SQL results*
>>>
>>>
>>>
>>> Started at
>>> [23/02/2016 23:55:30.30]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9161730]
>>> [1998-01,Internet,1248581]
>>> [1998-01,Partners,2409776]
>>> [1998-02,Direct Sales,9161840]
>>> [1998-02,Internet,1533193]
>>>
>>>
>>>
>>> second query
>>> [Direct Sales,9161840]
>>> [Internet,3977374]
>>> [Partners,3976291]
>>> [Tele Sales,328760]
>>>
>>> Finished at
>>> [23/02/2016 23:56:11.11]
>>>
>>> *The FP results*
>>>
>>> Started at
>>> [23/02/2016 23:45:58.58]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>>> string]
>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>> CALENDAR_MONTH_DESC: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9086830]
>>> [1998-01,Internet,1247641]
>>> [1998-01,Partners,2393567]
>>> [1998-02,Direct Sales,9161840]
>>> [1998-02,Internet,1533193]
>>> rs1: Unit = ()
>>>
>>> second query
>>> [Direct Sales,9161840]
>>> [Internet,3977374]
>>> [Partners,3976291]
>>> [Tele Sales,328760]
>>> rs2: Unit = ()
>>>
>>> Finished at
>>> [23/02/2016 23:53:42.42]
>>>
>>>
>>>
>

Re: Using functional programming rather than SQL

Posted by Koert Kuipers <ko...@tresata.com>.
Are you saying that HiveContext.sql(...) runs on Hive, and not on Spark SQL?

On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
sabarish.sasidharan@manthan.com> wrote:

> When using SQL, your full query, including the joins, was executed in
> Hive (or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
> mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>> First thanks everyone for their suggestions. Much appreciated.
>>
>> These were the original queries, written in SQL and run in spark-shell:
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>> """
>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>> TotalSales
>> FROM smallsales s
>> INNER JOIN times t
>> ON s.time_id = t.time_id
>> INNER JOIN channels c
>> ON s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>> println ("\nfirst query")
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL LIMIT 5
>> """).collect.foreach(println)
>> println ("\nsecond query")
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC LIMIT 5
>> """).collect.foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>> The second set of queries was written in FP style, as far as I could manage:
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>> val rs =
>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>> println ("\nfirst query")
>> val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>> println ("\nsecond query")
>> val rs2
>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>>
>>
>> However, the first query results are slightly different in SQL and FP (maybe
>> the first query code in FP is not exactly correct?) and, more importantly,
>> the FP version takes an order of magnitude longer than SQL (8 minutes
>> compared to less than a minute). I am not surprised, as I expected that
>> functional programming has to flatten all those method calls and convert
>> them to SQL?
>>
>> *The standard SQL results*
>>
>>
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9161730]
>> [1998-01,Internet,1248581]
>> [1998-01,Partners,2409776]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>>
>>
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>>
>> Finished at
>> [23/02/2016 23:56:11.11]
>>
>> *The FP results*
>>
>> Started at
>> [23/02/2016 23:45:58.58]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
>> timestamp, CHANNEL_ID: bigint]
>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>> string]
>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>> CALENDAR_MONTH_DESC: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9086830]
>> [1998-01,Internet,1247641]
>> [1998-01,Partners,2393567]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>> rs1: Unit = ()
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>> rs2: Unit = ()
>>
>> Finished at
>> [23/02/2016 23:53:42.42]
>>
>>
>>

Re: Using functional programming rather than SQL

Posted by Sabarish Sasidharan <sa...@manthan.com>.
When using SQL, your full query, including the joins, was executed in
Hive (or RDBMS) and only the results were brought into the Spark cluster. In
the FP case, the data for the 3 tables is first pulled into the Spark
cluster and then the join is executed.

Thus the time difference.

It's not immediately obvious why the results are different.
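
One detail visible in the quoted code below: the SQL version reads from smallsales, while the FP version reads from sales. Whether that explains the mismatch is an assumption, since the thread does not say how the two tables relate. The toy sketch below uses plain Scala collections (no Spark) and made-up rows to show that the same join, group and sum pipeline gives different totals only for the groups whose input rows differ.

```scala
// Toy model of the join -> groupBy -> sum pipeline, using plain Scala collections.
// All rows below are made-up illustration data, not values from the thread.
object TableMismatchSketch {
  final case class Sale(amountSold: BigDecimal, timeId: Int, channelId: Int)

  // Dimension tables as simple lookups: time_id -> month, channel_id -> channel.
  val times: Map[Int, String] = Map(1 -> "1998-01", 2 -> "1998-02")
  val channels: Map[Int, String] = Map(3 -> "Direct Sales", 4 -> "Internet")

  // Inner join on both keys, group by (month, channel), sum the amounts.
  def totalSales(sales: Seq[Sale]): Map[(String, String), BigDecimal] = {
    val joined = for {
      s <- sales
      month <- times.get(s.timeId).toList
      channel <- channels.get(s.channelId).toList
    } yield ((month, channel), s.amountSold)
    joined.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2).sum }
  }

  // Hypothetical stand-ins for the two fact tables; they differ in one row only.
  val smallsales: Seq[Sale] = Seq(Sale(100, 1, 3), Sale(50, 1, 4), Sale(70, 2, 3))
  val sales: Seq[Sale] = Seq(Sale(90, 1, 3), Sale(50, 1, 4), Sale(70, 2, 3))

  def main(args: Array[String]): Unit = {
    println(totalSales(smallsales))
    println(totalSales(sales))
  }
}
```

Pointing both versions at the same table would be a quick way to rule this out before looking for other causes.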

Regards
Sab
On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> These were the original queries, written in SQL and run in spark-shell:
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println("\nStarted at")
> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println("\nFinished at")
> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> sys.exit
>
> The second set of queries was written in FP style, as far as I could manage:
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println("\nStarted at")
> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> // Load only the columns needed from each table
> val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> // Join the three DataFrames, then total the sales per month and channel
> val rs = s.join(t, "time_id").join(c, "channel_id").
>   groupBy("calendar_month_desc", "channel_desc").
>   agg(sum("amount_sold").as("TotalSales"))
> println("\nfirst query")
> val rs1 = rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)
> println("\nsecond query")
> val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).
>   orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println("\nFinished at")
> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> sys.exit
>
>
>
> However, the first query results are slightly different in SQL and FP (maybe
> the first query code in FP is not exactly correct?) and, more importantly,
> the FP version takes an order of magnitude longer than SQL (8 minutes
> compared to less than a minute). I am not surprised, as I expected that
> functional programming has to flatten all those method calls and convert
> them to SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at
> [23/02/2016 23:56:11.11]
>
> *The FP results*
>
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [23/02/2016 23:53:42.42]
>
>
>

Re: Using functional programming rather than SQL

Posted by Koert Kuipers <ko...@tresata.com>.
Instead of:
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
you should be able to do something like:
val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")

It's not obvious to me why the DataFrame (aka FP) version would be
significantly slower; they should translate into similar or identical
execution plans. The explain method on DataFrame should show you the plans.
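
A minimal sketch of that comparison, assuming spark-shell on Spark 1.x (where sc is predefined) and the oraclehadoop.sales table from this thread. The names hiveContext, viaSql and viaApi are illustrative only:

```scala
// Assumes spark-shell (Spark 1.x), where `sc` already exists, and that the
// Hive table oraclehadoop.sales from this thread is available.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("use oraclehadoop")

// The same projection expressed as SQL and through the DataFrame API.
val viaSql = hiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
val viaApi = hiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")

// explain(true) prints the logical plans as well as the physical plan,
// so the two outputs can be compared side by side.
viaSql.explain(true)
viaApi.explain(true)
```

If the two physical plans match, the difference in run time is unlikely to come from the choice of API.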



On Tue, Feb 23, 2016 at 7:09 PM, Mich Talebzadeh <
mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> This was the original queries written in SQL and run against Spark-shell
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second queries were written in FP as much as I could as below
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
>
>
> However The first query results are slightly different in SQL and FP (may
> be the first query code in FP is not exactly correct?) and more importantly
> the FP takes order of magnitude longer compared to SQL (8 minutes compared
> to less than a minute). I am not surprised as I expected Functional
> Programming has to flatten up all those method calls and convert them to
> SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at
> [23/02/2016 23:56:11.11]
>
> *The FP results*
>
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [23/02/2016 23:53:42.42]
>
>
>
> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>
> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the result set
> using SQL from Hive tables, registering as a temporary table in Spark
>
> Now Ideally I can get the result set into a DF and work on DF to slice and
> dice the data using functional programming with filter, map. split etc.
>
> I wanted to get some ideas on how to go about it.
>
> thanks
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND   s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> *rs.registerTempTable("tmp")*
>
>
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC
> """).collect.foreach(println)
>
>

Re: Using functional programming rather than SQL

Posted by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk>.
 

Hi, 

First, thanks to everyone for their suggestions. Much appreciated.

These were the original queries, written in SQL and run in spark-shell:


val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("use oraclehadoop")

val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println ("\nfirst query")
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
from tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println ("\nsecond query")
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
order by SALES DESC LIMIT 5
""").collect.foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit

The second set of queries was written in FP style, as far as I could manage, as below:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
println ("\nfirst query")
val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
println ("\nsecond query")
val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit

However, the first query's results differ slightly between SQL and FP
(maybe the first query in the FP code is not exactly correct?) and, more
importantly, the FP version takes an order of magnitude longer than SQL
(about 8 minutes compared to less than a minute). I am not surprised, as
I expected that functional programming has to flatten all those method
calls and convert them to SQL?
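
One thing that may be worth checking: the SQL script reads from smallsales, while the FP script reads from sales. The thread does not say whether these hold the same rows; if they do not, that alone could account for both the different totals and the longer run time. Below is a hedged sketch of a like-for-like DataFrame version, assuming spark-shell on Spark 1.x (sc predefined) and the same oraclehadoop tables. The name hiveContext is illustrative, and the redundant orderBy("SALES") before sort(desc("SALES")) is collapsed into a single descending sort:

```scala
// Assumes spark-shell (Spark 1.x) with `sc` predefined and the Hive tables
// smallsales, times and channels in the oraclehadoop database.
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{sum, max, desc}

val hiveContext = new HiveContext(sc)
hiveContext.sql("use oraclehadoop")

// Read the same fact table as the SQL script (smallsales, not sales).
val s = hiveContext.table("smallsales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = hiveContext.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = hiveContext.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")

// Trailing dots keep the REPL reading the expression across lines.
val rs = s.join(t, "time_id").join(c, "channel_id").
  groupBy("calendar_month_desc", "channel_desc").
  agg(sum("amount_sold").as("TotalSales"))

// Print the plans so they can be compared with the SQL version's plans.
rs.explain(true)

// First query: top 5 rows by month and channel.
rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)

// Second query: best monthly total per channel, highest first.
rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).
  orderBy(desc("SALES")).take(5).foreach(println)
```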

THE STANDARD SQL RESULTS 

Started at
[23/02/2016 23:55:30.30]
res1: org.apache.spark.sql.DataFrame = [result: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193] 

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760] 

Finished at
[23/02/2016 23:56:11.11] 

THE FP RESULTS 

Started at
[23/02/2016 23:45:58.58]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
CALENDAR_MONTH_DESC: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = () 

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = () 

Finished at
[23/02/2016 23:53:42.42] 
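
A note on the Started/Finished stamps: the pattern 'HH:mm:ss.ss' prints the seconds field twice, which is why each stamp ends in values such as 30.30 or 42.42 rather than fractions of a second. A minimal sketch of measuring elapsed time directly in the shell instead is given here; the helper name timed is hypothetical and the workload is a stand-in for a Spark action such as rs.take(5):

```scala
// Runs `body` once and returns its result together with the elapsed
// wall-clock time in seconds.
def timed[A](label: String)(body: => A): (A, Double) = {
  val start = System.nanoTime()
  val result = body
  val seconds = (System.nanoTime() - start) / 1e9
  println(f"$label: $seconds%.3f s")
  (result, seconds)
}

// Stand-in workload; in spark-shell this would wrap an action, for example
// timed("first query") { rs.take(5) }.
val (total, elapsed) = timed("sum 1 to 1000")((1 to 1000).sum)
```

Wrapping each action separately shows which step accounts for the time, something a single pair of start and finish stamps cannot show.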

On 22/02/2016 23:16, Mich Talebzadeh wrote: 

> Hi, 
> 
> I have data stored in Hive tables that I want to do simple manipulation. 
> 
> Currently in Spark I perform the following with getting the result set using SQL from Hive tables, registering as a temporary table in Spark 
> 
> Now Ideally I can get the result set into a DF and work on DF to slice and dice the data using functional programming with filter, map. split etc. 
> 
> I wanted to get some ideas on how to go about it. 
> 
> thanks 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
> 
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> 
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC
> """).collect.foreach(println) 
> 
