Posted to user@spark.apache.org by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk> on 2016/02/24 17:20:22 UTC

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

 

Hi,

TOOLS

Spark 1.5.2, Hadoop 2.6, Hive 2.0, spark-shell, Hive database

OBJECTIVES: Timing differences between running Spark using SQL and
running Spark using functional programming (FP, i.e. functional calls)
on Hive tables

UNDERLYING TABLES: Three tables in a Hive database using ORC format

The main differences in timings come from running the queries and
fetching data. If you look at the transformation part, that is

val rs =
s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))

it takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds
compared to just under 4 minutes for functional programming.

The second query using SQL takes 28 seconds. Using FP it takes around 4
minutes.
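
A note on the 1-second figure: DataFrame transformations such as join, groupBy and agg are lazy, so defining rs only builds a plan, and the real work happens when an action such as collect or take runs. Below is a minimal sketch of a helper that times the action itself instead of printing timestamps around it. The name `timed` is hypothetical and not part of Spark, and the workload is a stand-in so the sketch runs without a cluster.

```scala
// Hypothetical helper, not part of Spark: runs a block and reports wall-clock time.
def timed[A](label: String)(body: => A): (A, Double) = {
  val start = System.nanoTime()
  val result = body                          // the by-name block is evaluated here
  val seconds = (System.nanoTime() - start) / 1e9
  println(f"$label took $seconds%.3f s")
  (result, seconds)
}

// Stand-in workload so the sketch runs without a cluster.
val (total, secs) = timed("sum of 1 to 1000000") {
  (1L to 1000000L).sum
}
```

In the shell the same helper could wrap an action, for example timed("first query") { rs.orderBy("calendar_month_desc","channel_desc").take(5) }.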

These are my assumptions. 

 	* Running SQL, the full query is executed in Hive, which means that Hive
can take advantage of ORC optimizations, storage indexes etc.?
 	* Running FP requires that the data is fetched from the underlying tables
in Hive and brought back to the Spark cluster (standalone here), and the
joins etc. are done there.

The next step for me would be to: 

 	* Look at the query plans in Spark
 	* Run the same code on Hive alone and compare results
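
For the first of these steps, a rough sketch of how the plans could be compared, assuming a spark-shell session on Spark 1.5.x where `sc` already exists and where both forms read the same `sales` table. The names `hc`, `viaSql` and `viaCalls` are made up for this example.

```scala
// Assumes a spark-shell session (Spark 1.5.x) where `sc` already exists and
// the oraclehadoop database holds the sales, times and channels tables.
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
hc.sql("use oraclehadoop")

// The same aggregation expressed once as SQL and once as DataFrame calls.
val viaSql = hc.sql("""
  SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM sales s
  JOIN times t ON s.time_id = t.time_id
  JOIN channels c ON s.channel_id = c.channel_id
  GROUP BY t.calendar_month_desc, c.channel_desc
""")

val viaCalls = hc.table("sales").
  join(hc.table("times"), "time_id").
  join(hc.table("channels"), "channel_id").
  groupBy("calendar_month_desc", "channel_desc").
  agg(sum("amount_sold").as("TotalSales"))

// explain(true) prints the logical plans as well as the physical plan.
viaSql.explain(true)
viaCalls.explain(true)
```

If the two physical plans turn out to be close to each other, that would suggest both forms are executed by Spark in much the same way.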

Any other suggestions are welcome. 

STANDARD SQL CODE 

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// A timestamp is printed before each step so elapsed times can be read off the output
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
println("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
// Join the three tables and aggregate sales by month and channel
val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc
     , c.channel_desc
     , SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
// Query 1: first five rows ordered by month and channel
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
FROM tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
// Query 2: highest monthly total per channel
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
ORDER BY SALES DESC LIMIT 5
""").collect.foreach(println)
println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit

RESULTS 

Started at [24/02/2016 09:00:50.50]
res1: org.apache.spark.sql.DataFrame = [result: string] 

creating data set at [24/02/2016 09:00:53.53]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)]

First query at [24/02/2016 09:00:54.54]
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193] 

second query at [24/02/2016 09:01:13.13]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760] 

Finished at [24/02/2016 09:01:31.31]

CODE USING FUNCTIONAL PROGRAMMING 

// Needed for sum, max and desc if the shell has not already imported them
import org.apache.spark.sql.functions._
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
// Read only the columns needed from each table
var s = HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
println("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
// Join the three tables and aggregate sales by month and channel
val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
println("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
// Query 1: first five rows ordered by month and channel
val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
println("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
// Query 2: highest monthly total per channel
val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit

RESULTS 

Started at [24/02/2016 08:52:27.27]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
CALENDAR_MONTH_DESC: string] 

creating data set at [24/02/2016 08:52:30.30]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query at [24/02/2016 08:52:31.31]
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = () 

second query at [24/02/2016 08:56:17.17]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = () 

Finished at
[24/02/2016 09:00:14.14] 
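
The elapsed times can be worked out directly from the timestamps in the two RESULTS listings. This is a small sketch, assuming Java 8 or later for java.time; `elapsedSeconds` is a made-up helper name. On these timestamps the second SQL interval comes to 18 seconds.

```scala
import java.time.{Duration, LocalTime}

// Seconds between two HH:mm:ss wall-clock readings taken on the same day.
def elapsedSeconds(from: String, to: String): Long =
  Duration.between(LocalTime.parse(from), LocalTime.parse(to)).getSeconds

// Timestamps copied from the two RESULTS listings (the repeated seconds
// after the dot are dropped).
val sqlFirst  = elapsedSeconds("09:00:54", "09:01:13") // 19
val sqlSecond = elapsedSeconds("09:01:13", "09:01:31") // 18
val fpFirst   = elapsedSeconds("08:52:31", "08:56:17") // 226 (3 min 46 s)
val fpSecond  = elapsedSeconds("08:56:17", "09:00:14") // 237 (3 min 57 s)

println(s"SQL: $sqlFirst s and $sqlSecond s; FP: $fpFirst s and $fpSecond s")
```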

On 24/02/2016 06:27, Sabarish Sasidharan wrote: 

> When using SQL, your full query, including the joins, was executed in Hive (or RDBMS) and only the results were brought into the Spark cluster. In the FP case, the data for the 3 tables is first pulled into the Spark cluster and then the join is executed. 
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab
 

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

Posted by Sabarish Sasidharan <sa...@manthan.com>.
Spark has its own efficient in-memory columnar format, so it's not ORC.
It's just that the data has to be serialized and deserialized over the
network, and that consumes time.
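
One way to try out the in-memory columnar format is to cache the aggregated DataFrame, so that the second query does not go back to the tables. This is only a sketch, assuming the spark-shell session from the original post with `rs` already defined there; whether it helps will depend on data size and available memory.

```scala
// Assumes the spark-shell session from the original post, with `rs` already
// defined as the joined and aggregated DataFrame.
import org.apache.spark.sql.functions.{desc, max}

rs.cache()     // marks rs for in-memory columnar storage; nothing runs yet

// The first action computes rs and populates the cache.
rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)

// Later actions on rs can read the cached rows instead of rescanning the tables.
rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).
  sort(desc("SALES")).take(5).foreach(println)

rs.unpersist() // release the cached blocks when done
```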

Regards
Sab

Re: Using Spark functional programming rather than SQL, Spark on Hive tables

Posted by Mich Talebzadeh <mi...@cloudtechnologypartners.co.uk>.
 

Well spotted, Sab. You are correct; it was an oversight on my part. Both
versions should use "sales".

The results are now comparable.

The following statement

"On the other hand using SQL the query 1 takes 19 seconds compared to
just under 4 minutes for functional programming

The second query using SQL takes 28 seconds. Using FP it takes around 4
minutes."

should be amended to

"Using SQL, query 1 takes 3 min 39 sec compared to 3 min 44 sec using
FP.

Using SQL, query 2 takes 3 min 36 sec compared to 3 min 53 sec using
FP."

FP lags slightly behind SQL, but not by any significant margin.

Thanks

On 24/02/2016 18:20, Sabarish Sasidharan wrote: 

> One more, you are referring to 2 different sales tables. That might account for the difference in numbers. 
> 
> Regards
> Sab 

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


 


Re: Using Spark functional programming rather than SQL, Spark on Hive tables

Posted by Sabarish Sasidharan <sa...@manthan.com>.
One more thing: you are referring to two different sales tables
(smallsales in the SQL version, sales in the FP version). That might
account for the difference in numbers.
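
A quick way to check this would be to compare row counts and totals for the two tables. This sketch assumes the spark-shell session from the original post, where `HiveContext` is the HiveContext instance and both tables are in the oraclehadoop database.

```scala
// Assumes the spark-shell session from the original post: `HiveContext` is the
// HiveContext instance and both tables live in the oraclehadoop database.
import org.apache.spark.sql.functions.sum

HiveContext.sql("use oraclehadoop")

for (name <- Seq("sales", "smallsales")) {
  val df = HiveContext.table(name)
  val rows = df.count()                                  // number of rows in the table
  val total = df.agg(sum("amount_sold")).first().get(0)  // overall amount_sold
  println(s"$name: $rows rows, total amount_sold = $total")
}
```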

Regards
Sab
On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" <
mich.talebzadeh@cloudtechnologypartners.co.uk> wrote:

>
>
> *Hi,*
>
> *Tools*
>
> *Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive Database*
>
> *Objectives: Timing differences between running Spark using SQL and
> running Spark using functional programing (FP) (functional calls) on Hive
> tables*
>
> *Underlying tables: Three tables in Hive database using ORC format*
>
> The main differences in timings come from running the queries and fetching
> data. If you look the transformation part that is
>
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>
> Takes I second. On the other hand using SQL the query 1 takes 19 seconds
> compared to just under 4 minutes for functional programming
>
> The seconds query using SQL takes 28 seconds. Using FP it takes around 4
> minutes.
>
> These are my assumptions.
>
>    1. Running SQL the full query is executed in Hive which means that
>    Hive can take advantage of ORC optimization/storage index etc?
>    2. Running FP requires that data is fetched from the underlying tables
>    in Hive and brought back to Spark cluster (standalone here) and the joins
>    etc are done there
>
> The next step for me would be to:
>
>    1. Look at the query plans in Spark
>    2. Run the same code on Hive alone and compare results
>
>
>
> Any other suggestions are welcome.
>
> *Standard SQL code*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> println ("\ncreating data set at "); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> val rs = HiveContext.sql(
> """
> SELECT    t.calendar_month_desc
>         , c.channel_desc
>         , SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 09:00:50.50]
> res1: org.apache.spark.sql.DataFrame = [result: string]
>
> creating data set at [24/02/2016 09:00:53.53]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query at [24/02/2016 09:00:54.54]
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
> second query at [24/02/2016 09:01:13.13]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at [24/02/2016 09:01:31.31]
>
> *Code using functional programming*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> val s = HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> val rs2 =
> rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 08:52:27.27]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
>
> creating data set at [24/02/2016 08:52:30.30]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query at [24/02/2016 08:52:31.31]
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query at [24/02/2016 08:56:17.17]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at [24/02/2016 09:00:14.14]
>
>
>
> On 24/02/2016 06:27, Sabarish Sasidharan wrote:
>
> When using SQL, your full query, including the joins, was executed in
> Hive (or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
>
>
>