Posted to user@spark.apache.org by Gaini Rajeshwar <ra...@gmail.com> on 2016/01/11 08:43:03 UTC

GroupBy on DataFrame taking too much time

Hi All,

I have a table named *customer* (customer_id, event, country, ...) in a
PostgreSQL database. The table has more than 100 million rows.

I want to know the number of events from each country. To do that, I am
running a groupBy in Spark as follows:

val dataframe1 = sqlContext.load("jdbc", Map("url" ->
  "jdbc:postgresql://localhost/customerlogs?user=postgres&password=postgres",
  "dbtable" -> "customer"))

dataframe1.groupBy("country").count().show()

The code above seems to fetch the complete customer table before doing the
groupBy, and because of that it fails with the following errors:

16/01/11 12:49:04 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 170758 ms exceeds timeout 120000 ms
16/01/11 12:49:04 ERROR TaskSchedulerImpl: Lost executor 0 on 10.2.12.59: Executor heartbeat timed out after 170758 ms
16/01/11 12:49:04 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.2.12.59): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 170758 ms

I am using Spark 1.6.0.

Is there any way I can solve this?

Thanks,
Rajeshwar Gaini.

Re: Tungsten in a mixed endian environment

Posted by Randy Swanberg <rs...@us.ibm.com>.
FWIW,  POWER is bi-endian.   AIX still runs big-endian on POWER,  but the 
latest Linux distros for POWER run little-endian (in fact Ubuntu for POWER 
only runs LE). 

> (x86 is little-endian and SPARC / POWER / ARM are big-endian; I'm sure
> that was just a typo)

Randy Swanberg





Re: Tungsten in a mixed endian environment

Posted by Sean Owen <so...@cloudera.com>.
(x86 is little-endian and SPARC / POWER / ARM are big-endian; I'm sure
that was just a typo)

On Tue, Jan 12, 2016 at 9:13 PM, Steve Loughran <st...@hortonworks.com> wrote:
> It's notable that Hadoop doesn't like mixed-endianness; there is work
> (primarily from Oracle) to have consistent byteswapping, that is, to work
> reliably on big-endian systems
> (https://issues.apache.org/jira/browse/HADOOP-11505). There's no motivation
> to support mixed-endian clusters.
>
> The majority of clusters are x86; there are only 3 CPU families that are little
> endian: SPARC, Power, Arm. Adam has clearly been playing with Power + x86,
> but I'd suspect that's experimentation, not production.
>
> What is probably worth checking is mixed endian-ness between client apps
> submitting work and the servers: Java and Kryo serialization should handle
> that automatically.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Tungsten in a mixed endian environment

Posted by Nong Li <no...@gmail.com>.
On Fri, Jan 15, 2016 at 1:30 AM, Tim Preece <te...@mail.com> wrote:

> So if Spark does not support heterogeneous endianness clusters, should Spark
> at least always support homogeneous endianness clusters?
>
> I ask because I just noticed
> https://issues.apache.org/jira/browse/SPARK-12785 which appears to be
> introducing a new feature designed for Little Endian only.
>
I wouldn't say this is designed for little endian only. If anything, it is
designed to support both but only little endian is implemented (and more
importantly tested). I don't think it would be very hard to add big endian
support and instead of producing wrong results, it is going to fail to run.
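
The fail-rather-than-corrupt behaviour described here can be sketched as a guard on the platform's native byte order. This is not the SPARK-12785 code: `LittleEndianOnly` and `requireLittleEndian` are hypothetical names used only for illustration, and the only real JDK call is `java.nio.ByteOrder.nativeOrder()`.

```scala
import java.nio.ByteOrder

object LittleEndianOnly {
  // Hypothetical guard: throws unless the given byte order is little-endian,
  // so an untested big-endian platform fails loudly instead of silently
  // returning wrong results. Defaults to the byte order of the running JVM.
  def requireLittleEndian(order: ByteOrder = ByteOrder.nativeOrder()): Unit =
    if (order != ByteOrder.LITTLE_ENDIAN)
      throw new UnsupportedOperationException(
        s"Only little-endian platforms are supported, found $order")
}
```

Calling `LittleEndianOnly.requireLittleEndian()` at the start of a little-endian-only code path would turn a potential data corruption into an immediate, visible error.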



Re: Tungsten in a mixed endian environment

Posted by Tim Preece <te...@mail.com>.
So if Spark does not support heterogeneous endianness clusters, should Spark
at least always support homogeneous endianness clusters?

I ask because I just noticed
https://issues.apache.org/jira/browse/SPARK-12785 which appears to be
introducing a new feature designed for Little Endian only.





--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Tungsten-in-a-mixed-endian-environment-tp15975p16027.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.



Re: Tungsten in a mixed endian environment

Posted by Steve Loughran <st...@hortonworks.com>.
On 12 Jan 2016, at 10:49, Reynold Xin <rx...@databricks.com> wrote:

> How big of a deal is this use case in a heterogeneous endianness environment? If we do want to fix it, we should do it right before Spark shuffles data to minimize the performance penalty, i.e. turn big-endian encoded data into little-endian encoded data before it goes on the wire. This is a pretty involved change and, given other things that might break across heterogeneous endianness environments, I am not sure it is high enough priority to even warrant review bandwidth right now.



This is a classic problem in distributed computing, which has two common strategies:

The SunOS RPC strategy: fixed order. For Sun, and hence NFS, the order was that of the Motorola 68K, so it was cost-free on Sun workstations. SPARC used the same byte ordering, so again it was free. For x86 parts wanting to play, it was inefficient at both sending and receiving. Protobuf also has a fixed order, but there it is little-endian: https://developers.google.com/protocol-buffers/docs/encoding.

The Apollo RPC / DCE strategy: packets declare their byte order and the recipient gets to deal with it. This is efficient in a homogeneous cluster of either endianness, as x86-to-x86 needs zero byteswapping. The Apollo design ended up in DCE, which is what Windows Distributed COM uses (http://pubs.opengroup.org/onlinepubs/9629399/chap14.htm). If you look at that spec, you can see it's floating point marshalling that's the most trouble.

Recipient-makes-good is ideal for clusters where the systems all share the same endianness: the amount of marshalling is guaranteed to be zero if all CPU parts are the same. That's clearly the de facto strategy in Spark. In contrast, the one-network-format strategy is guaranteed to have zero byteswaps on CPUs whose endianness matches the wire format, and guaranteed to have two for the other kind (one at each end). For mixed-endian RPC there'll be one bswap, so the cost is the same as for the Apollo DCE approach.
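
The two strategies can be sketched with `java.nio.ByteBuffer`, which lets the writer pick a byte order explicitly. This is only an illustration of the idea, not how Spark, Hadoop, SunOS RPC or DCE are actually implemented; the object `WireOrder`, its method names and the one-byte order tag are all invented for the example.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object WireOrder {
  // Strategy 1: fixed wire order. Every sender encodes big-endian, whatever
  // its native order, and every receiver decodes big-endian.
  def encodeFixed(value: Long): Array[Byte] =
    ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putLong(value).array()

  def decodeFixed(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong()

  // Strategy 2: the sender writes in the order it is given (normally its
  // native order) and tags the packet: 0 = big-endian, 1 = little-endian.
  def encodeTagged(value: Long, senderOrder: ByteOrder): Array[Byte] = {
    val tag: Byte = if (senderOrder == ByteOrder.LITTLE_ENDIAN) 1 else 0
    ByteBuffer.allocate(9).put(tag).order(senderOrder).putLong(value).array()
  }

  // The receiver reads the tag and decodes in the declared order, so a swap
  // only happens when sender and receiver orders differ.
  def decodeTagged(bytes: Array[Byte]): Long = {
    val buf = ByteBuffer.wrap(bytes)
    val order = if (buf.get() == 1) ByteOrder.LITTLE_ENDIAN else ByteOrder.BIG_ENDIAN
    buf.order(order).getLong()
  }
}
```

Both pairs round-trip a value regardless of the sender's byte order; the difference is only in who pays for the swap and how often.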

Bits of Hadoop core do byteswap stuff; for performance this is in native code, which has to use assembly and builtin functions for maximum efficiency.

It's a big patch, one that's designed for effective big-endian support, *ignoring heterogeneous clusters*:

https://issues.apache.org/jira/secure/attachment/12776247/HADOOP-11505.007.patch

All that stuff cropped up while Alan Burlinson was sitting down to get Hadoop working properly on SPARC. That's a big enough project on its own that worrying about heterogeneous systems isn't on his roadmap, and nobody else appears to care.

I'd suggest the same to IBM: focus effort & testing on Power + AIX rather than worrying about heterogeneous systems.

-Steve

Re: Tungsten in a mixed endian environment

Posted by Steve Loughran <st...@hortonworks.com>.
On 12 Jan 2016, at 10:49, Reynold Xin <rx...@databricks.com> wrote:

> How big of a deal is this use case in a heterogeneous endianness environment? If we do want to fix it, we should do it right before Spark shuffles data to minimize the performance penalty, i.e. turn big-endian encoded data into little-endian encoded data before it goes on the wire. This is a pretty involved change and, given other things that might break across heterogeneous endianness environments, I am not sure it is high enough priority to even warrant review bandwidth right now.




It's notable that Hadoop doesn't like mixed-endianness; there is work (primarily from Oracle) to have consistent byteswapping, that is, to work reliably on big-endian systems (https://issues.apache.org/jira/browse/HADOOP-11505). There's no motivation to support mixed-endian clusters.


The majority of clusters are x86; there are only 3 CPU families that are little endian: SPARC, Power, Arm. Adam has clearly been playing with Power + x86, but I'd suspect that's experimentation, not production.

What is probably worth checking is mixed endian-ness between client apps submitting work and the servers: Java and Kryo serialization should handle that automatically.

Re: Tungsten in a mixed endian environment

Posted by Reynold Xin <rx...@databricks.com>.
How big of a deal is this use case in a heterogeneous endianness
environment? If we do want to fix it, we should do it right before
Spark shuffles data to minimize the performance penalty, i.e. turn big-endian
encoded data into little-endian encoded data before it goes on the wire.
This is a pretty involved change and, given other things that might break
across heterogeneous endianness environments, I am not sure it is high
enough priority to even warrant review bandwidth right now.




On Tue, Jan 12, 2016 at 7:30 AM, Ted Yu <yu...@gmail.com> wrote:

> I logged SPARK-12778 where endian awareness in Platform.java should help
> in mixed endian set up.
>
> There could be other parts of the code base which are related.
>
> Cheers

Re: Tungsten in a mixed endian environment

Posted by Ted Yu <yu...@gmail.com>.
I logged SPARK-12778 where endian awareness in Platform.java should
help in mixed endian set up.

There could be other parts of the code base which are related.

Cheers


Tungsten in a mixed endian environment

Posted by Adam Roberts <AR...@uk.ibm.com>.
Hi all, I've been experimenting with DataFrame operations in a mixed 
endian environment - a big endian master with little endian workers. With 
tungsten enabled I'm encountering data corruption issues.

For example, with this simple test code:

import org.apache.spark.SparkContext
import org.apache.spark._
import org.apache.spark.sql.SQLContext

object SimpleSQL {
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      println("Not enough args, you need to specify the master url")
      sys.exit(1) // stop here; args(0) below would otherwise throw when no argument is given
    }
    val masterURL = args(0)
    println("Setting up Spark context at: " + masterURL)
    val sparkConf = new SparkConf
    val sc = new SparkContext(masterURL, "Unsafe endian test", sparkConf)

    println("Performing SQL tests")

    val sqlContext = new SQLContext(sc)
    println("SQL context set up")
    val df = sqlContext.read.json("/tmp/people.json")
    df.show()
    println("Selecting everyone's age and adding one to it")
    df.select(df("name"), df("age") + 1).show()
    println("Showing all people over the age of 21")
    df.filter(df("age") > 21).show()
    println("Counting people by age")
    df.groupBy("age").count().show()
  }
} 

Instead of getting

+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+ 

I get the following with my mixed endian set up:

+-------------------+-----------------+
|                age|            count|
+-------------------+-----------------+
|               null|                1|
|1369094286720630784|72057594037927936|
|                 30|                1|
+-------------------+-----------------+ 

and on another run:

+-------------------+-----------------+
|                age|            count|
+-------------------+-----------------+
|                  0|72057594037927936|
|                 19|                1| 

Is Spark expected to work in such an environment? If I turn off tungsten
(sparkConf.set("spark.sql.tungsten.enabled", "false")), in 20 runs I don't
see any problems.
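
A quick check, not part of the original report, suggests that the corrupted values in the tables above are the expected 64-bit values with their bytes reversed: 19 becomes 1369094286720630784 and 1 becomes 72057594037927936. A minimal sketch using only `java.lang.Long.reverseBytes` from the JDK; the object name `ByteSwapCheck` and its method `swapped` are made up for illustration.

```scala
// Illustration only: reversing the byte order of the expected 64-bit values
// reproduces the corrupted numbers shown in the mixed-endian output.
object ByteSwapCheck {
  // Reverse the eight bytes of a long, which is the effect of writing a value
  // in one byte order and reading it back in the other.
  def swapped(value: Long): Long = java.lang.Long.reverseBytes(value)

  def main(args: Array[String]): Unit = {
    println(swapped(19L)) // age 19 read with the wrong byte order: 1369094286720630784
    println(swapped(1L))  // count 1 read with the wrong byte order: 72057594037927936
  }
}
```

This is consistent with values being written in one machine's native byte order and read back in the other's, although it does not by itself show where in the code that happens.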

Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

Re: GroupBy on DataFrame taking too much time

Posted by Todd Nist <ts...@gmail.com>.
Hi Rajeshwar Gaini,

dbtable can be any valid SQL query; simply define it as a subquery,
something like:


  val query = "(SELECT country, count(*) FROM customer GROUP BY country) AS X"

  val df1 = sqlContext.read
    .format("jdbc")
    .option("url", url)
    .option("user", username)
    .option("password", pwd)
    .option("driver", "driverClassNameHere")
    .option("dbtable", query)
    .load()

Not sure if that's what you're looking for or not.

HTH.

-Todd
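
As a variation on the above, the subquery can be built by a small helper and passed to the reader through `options(...)`. This is a sketch only: `JdbcPushdown` and `countByOptions` are hypothetical names, the `cnt` and `agg` aliases are added so the result has predictable names, and the connection URL is the one from the original question. The Spark call is left as a comment so the helper runs without a cluster.

```scala
object JdbcPushdown {
  // Builds JDBC reader options whose "dbtable" is an aggregating subquery, so
  // PostgreSQL does the GROUP BY and Spark receives one row per group.
  // Table and column names are interpolated as-is; do not pass untrusted input.
  def countByOptions(url: String, table: String, column: String): Map[String, String] = {
    val query = s"(SELECT $column, count(*) AS cnt FROM $table GROUP BY $column) AS agg"
    Map("url" -> url, "dbtable" -> query)
  }

  def main(args: Array[String]): Unit = {
    val opts = countByOptions(
      "jdbc:postgresql://localhost/customerlogs?user=postgres&password=postgres",
      "customer",
      "country")
    println(opts("dbtable"))
    // With a SQLContext in scope, the options would be used as:
    //   sqlContext.read.format("jdbc").options(opts).load().show()
  }
}
```

With this approach the 100-million-row table never leaves the database; only the per-country counts are transferred.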


Re: GroupBy on DataFrame taking too much time

Posted by Gaini Rajeshwar <ra...@gmail.com>.
There is no problem with the SQL read. When I do the following, it
works fine:

val dataframe1 = sqlContext.load("jdbc", Map("url" ->
  "jdbc:postgresql://localhost/customerlogs?user=postgres&password=postgres",
  "dbtable" -> "customer"))

dataframe1.filter("country = 'BA'").show()


Re: GroupBy on DataFrame taking too much time

Posted by Xingchi Wang <re...@gmail.com>.
The error happened at "Lost task 0.0 in stage 0.0", so I think it is not a
"groupBy" problem but an issue with the SQL read of the "customer" table.
Please check the JDBC link and that the data is loaded successfully.

Thanks
Xingchi
