You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shailesh Birari <sb...@wynyardgroup.com> on 2014/11/03 05:47:24 UTC

Spark SQL takes unexpected time

Hello,

I have written an Spark SQL application which reads data from HDFS  and
query on it.
The data size is around 2GB (30 million records). The schema and query I am
running is as below.
The query takes around 05+ seconds to execute. 
I tried by adding 
       rdd.persist(StorageLevel.MEMORY_AND_DISK)
and
       rdd.cache()
but in both the cases it takes extra time, even if I give the below query as
second the data. (assuming Spark will cache it for first query).

case class EventDataTbl(ID: String, 
		ONum: String,
		RNum: String,
		Timestamp: String,
		Duration: String,
		Type: String,
		Source: String,
		OName: String,
		RName: String)

sql("SELECT COUNT(*) AS Frequency,ONum,OName,RNum,RName FROM EventDataTbl
GROUP BY ONum,OName,RNum,RName ORDER BY Frequency DESC LIMIT
10").collect().foreach(println)

Can you let me know if I am missing anything ?

Thanks,
  Shailesh




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-takes-unexpected-time-tp17925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark SQL takes unexpected time

Posted by Michael Armbrust <mi...@databricks.com>.
People also store data off-heap by putting parquet data into Tachyon.

The optimization in 1.2 is to use the in-memory columnar cached format
instead of keeping row objects (and their boxed contents) around when you
call .cache().  This significantly reduces the number of live objects.
(since you have a single byte buffer per column batch).

On Tue, Nov 4, 2014 at 5:19 AM, Corey Nolet <cj...@gmail.com> wrote:

> Michael,
>
> I should probably look closer myself @ the design of 1.2 vs 1.1 but I've
> been curious why Spark's in-memory data uses the heap instead of putting it
> off heap? Was this the optimization that was done in 1.2 to alleviate GC?
>
> On Mon, Nov 3, 2014 at 8:52 PM, Shailesh Birari <sb...@wynyardgroup.com>
> wrote:
>
>> Yes, I am using Spark1.1.0 and have used rdd.registerTempTable().
>> I tried by adding sqlContext.cacheTable(), but it took 59 seconds (more
>> than
>> earlier).
>>
>> I also tried by changing schema to use Long data type in some fields but
>> seems conversion takes more time.
>> Is there any way to specify index ?  Though I checked and didn't found
>> any,
>> just want to confirm.
>>
>> For your reference here is the snippet of code.
>>
>>
>> -----------------------------------------------------------------------------------------------------------------
>> case class EventDataTbl(EventUID: Long,
>>                 ONum: Long,
>>                 RNum: Long,
>>                 Timestamp: java.sql.Timestamp,
>>                 Duration: String,
>>                 Type: String,
>>                 Source: String,
>>                 OName: String,
>>                 RName: String)
>>
>>                 val format = new java.text.SimpleDateFormat("yyyy-MM-dd
>> hh:mm:ss")
>>                 val cedFileName =
>> "hdfs://hadoophost:8020/demo/poc/JoinCsv/output_2"
>>                 val cedRdd = sc.textFile(cedFileName).map(_.split(",",
>> -1)).map(p =>
>> EventDataTbl(p(0).toLong, p(1).toLong, p(2).toLong, new
>> java.sql.Timestamp(format.parse(p(3)).getTime()), p(4), p(5), p(6), p(7),
>> p(8)))
>>
>>                 cedRdd.registerTempTable("EventDataTbl")
>>                 sqlCntxt.cacheTable("EventDataTbl")
>>
>>                 val t1 = System.nanoTime()
>>                 println("\n\n10 Most frequent conversations between the
>> Originators and
>> Recipients\n")
>>                 sql("SELECT COUNT(*) AS Frequency,ONum,OName,RNum,RName
>> FROM EventDataTbl
>> GROUP BY ONum,OName,RNum,RName ORDER BY Frequency DESC LIMIT
>> 10").collect().foreach(println)
>>                 val t2 = System.nanoTime()
>>                 println("Time taken " + (t2-t1)/1000000000.0 + " Seconds")
>>
>>
>> -----------------------------------------------------------------------------------------------------------------
>>
>> Thanks,
>>   Shailesh
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-takes-unexpected-time-tp17925p18017.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Re: Spark SQL takes unexpected time

Posted by Corey Nolet <cj...@gmail.com>.
Michael,

I should probably look closer myself @ the design of 1.2 vs 1.1 but I've
been curious why Spark's in-memory data uses the heap instead of putting it
off heap? Was this the optimization that was done in 1.2 to alleviate GC?

On Mon, Nov 3, 2014 at 8:52 PM, Shailesh Birari <sb...@wynyardgroup.com>
wrote:

> Yes, I am using Spark1.1.0 and have used rdd.registerTempTable().
> I tried by adding sqlContext.cacheTable(), but it took 59 seconds (more
> than
> earlier).
>
> I also tried by changing schema to use Long data type in some fields but
> seems conversion takes more time.
> Is there any way to specify index ?  Though I checked and didn't found any,
> just want to confirm.
>
> For your reference here is the snippet of code.
>
>
> -----------------------------------------------------------------------------------------------------------------
> case class EventDataTbl(EventUID: Long,
>                 ONum: Long,
>                 RNum: Long,
>                 Timestamp: java.sql.Timestamp,
>                 Duration: String,
>                 Type: String,
>                 Source: String,
>                 OName: String,
>                 RName: String)
>
>                 val format = new java.text.SimpleDateFormat("yyyy-MM-dd
> hh:mm:ss")
>                 val cedFileName =
> "hdfs://hadoophost:8020/demo/poc/JoinCsv/output_2"
>                 val cedRdd = sc.textFile(cedFileName).map(_.split(",",
> -1)).map(p =>
> EventDataTbl(p(0).toLong, p(1).toLong, p(2).toLong, new
> java.sql.Timestamp(format.parse(p(3)).getTime()), p(4), p(5), p(6), p(7),
> p(8)))
>
>                 cedRdd.registerTempTable("EventDataTbl")
>                 sqlCntxt.cacheTable("EventDataTbl")
>
>                 val t1 = System.nanoTime()
>                 println("\n\n10 Most frequent conversations between the
> Originators and
> Recipients\n")
>                 sql("SELECT COUNT(*) AS Frequency,ONum,OName,RNum,RName
> FROM EventDataTbl
> GROUP BY ONum,OName,RNum,RName ORDER BY Frequency DESC LIMIT
> 10").collect().foreach(println)
>                 val t2 = System.nanoTime()
>                 println("Time taken " + (t2-t1)/1000000000.0 + " Seconds")
>
>
> -----------------------------------------------------------------------------------------------------------------
>
> Thanks,
>   Shailesh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-takes-unexpected-time-tp17925p18017.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Spark SQL takes unexpected time

Posted by Shailesh Birari <sb...@wynyardgroup.com>.
Yes, I am using Spark1.1.0 and have used rdd.registerTempTable().
I tried by adding sqlContext.cacheTable(), but it took 59 seconds (more than
earlier).

I also tried by changing schema to use Long data type in some fields but
seems conversion takes more time. 
Is there any way to specify index ?  Though I checked and didn't found any,
just want to confirm.

For your reference here is the snippet of code.

-----------------------------------------------------------------------------------------------------------------
case class EventDataTbl(EventUID: Long, 
		ONum: Long,
		RNum: Long,
		Timestamp: java.sql.Timestamp,
		Duration: String,
		Type: String,
		Source: String,
		OName: String,
		RName: String)

		val format = new java.text.SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
		val cedFileName = "hdfs://hadoophost:8020/demo/poc/JoinCsv/output_2"
		val cedRdd = sc.textFile(cedFileName).map(_.split(",", -1)).map(p =>
EventDataTbl(p(0).toLong, p(1).toLong, p(2).toLong, new
java.sql.Timestamp(format.parse(p(3)).getTime()), p(4), p(5), p(6), p(7),
p(8)))

		cedRdd.registerTempTable("EventDataTbl")
		sqlCntxt.cacheTable("EventDataTbl")
		
		val t1 = System.nanoTime()
		println("\n\n10 Most frequent conversations between the Originators and
Recipients\n")
		sql("SELECT COUNT(*) AS Frequency,ONum,OName,RNum,RName FROM EventDataTbl
GROUP BY ONum,OName,RNum,RName ORDER BY Frequency DESC LIMIT
10").collect().foreach(println)
		val t2 = System.nanoTime()
		println("Time taken " + (t2-t1)/1000000000.0 + " Seconds")

-----------------------------------------------------------------------------------------------------------------

Thanks,
  Shailesh



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-takes-unexpected-time-tp17925p18017.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark SQL takes unexpected time

Posted by Michael Armbrust <mi...@databricks.com>.
If you are running on Spark 1.1 or earlier you'll want to use
rdd.registerTempTable(<tableName>) followed by
sqlContext.cacheTable(<tableName>) and then query that table.  rdd.cache()
is not using the optimized in-memory format and thus puts a lot of pressure
on the GC.  This is fixed in Spark 1.2 and .cache() should do what you want.

I'll also note that the caching in SQL will actually make things slower if
the data does not fit in memory.  So you should look in the storage tab of
the Spark Web UI and make sure all the partitions are fitting.

On Sun, Nov 2, 2014 at 8:47 PM, Shailesh Birari <sb...@wynyardgroup.com>
wrote:

> Hello,
>
> I have written an Spark SQL application which reads data from HDFS  and
> query on it.
> The data size is around 2GB (30 million records). The schema and query I am
> running is as below.
> The query takes around 05+ seconds to execute.
> I tried by adding
>        rdd.persist(StorageLevel.MEMORY_AND_DISK)
> and
>        rdd.cache()
> but in both the cases it takes extra time, even if I give the below query
> as
> second the data. (assuming Spark will cache it for first query).
>
> case class EventDataTbl(ID: String,
>                 ONum: String,
>                 RNum: String,
>                 Timestamp: String,
>                 Duration: String,
>                 Type: String,
>                 Source: String,
>                 OName: String,
>                 RName: String)
>
> sql("SELECT COUNT(*) AS Frequency,ONum,OName,RNum,RName FROM EventDataTbl
> GROUP BY ONum,OName,RNum,RName ORDER BY Frequency DESC LIMIT
> 10").collect().foreach(println)
>
> Can you let me know if I am missing anything ?
>
> Thanks,
>   Shailesh
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-takes-unexpected-time-tp17925.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>