Posted to dev@spark.apache.org by René Treffer <rt...@gmail.com> on 2015/06/01 10:10:22 UTC

spark 1.4 - test-loading 1786 mysql tables / a few TB

Hi *,

I used to run into a few problems with the jdbc/mysql integration and
thought it would be nice to load our whole db, doing nothing but .map(_ =>
1).aggregate(0)(_+_,_+_) on the DataFrames.
SparkSQL has to load all columns and process them so this should reveal
type errors like
SPARK-7897 Column with an unsigned bigint should be treated as DecimalType
in JDBCRDD <https://issues.apache.org/jira/browse/SPARK-7897>
SPARK-7697 Column with an unsigned int should be treated as long in JDBCRDD
<https://issues.apache.org/jira/browse/SPARK-7697>

The test was done on the 1.4 branch (checkout 2-3 days ago, local build,
running standalone with a 350G heap).

1. Date/Timestamp 0000-00-00

org.apache.spark.SparkException: Job aborted due to stage failure: Task 15
in stage 18.0 failed 1 times, most recent failure: Lost task 15.0 in stage
18.0 (TID 186, localhost): java.sql.SQLException: Value '0000-00-00
00:00:00' can not be represented as java.sql.Timestamp
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage
26.0 (TID 636, localhost): java.sql.SQLException: Value '0000-00-00' can
not be represented as java.sql.Date
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)

This was the most common error when I tried to load tables with
Date/Timestamp types.
It can be worked around with subqueries or by declaring those columns as
strings and handling them afterwards.
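
The "handle them afterwards" step could look roughly like the sketch below. It assumes the column was loaded as a string in MySQL's `yyyy-MM-dd HH:mm:ss` layout; `ZeroDates` and `parseTimestamp` are hypothetical names, not part of Spark or the MySQL driver. Only NULL and zero dates map to `None`; any other malformed value still raises an exception, so nothing is dropped silently.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper for post-processing DATETIME columns loaded as strings.
object ZeroDates {
  // Assumed layout of the string column: "yyyy-MM-dd HH:mm:ss".
  private val format = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss")

  /** NULL and MySQL zero dates become None; valid values are parsed.
    * Anything else throws a DateTimeParseException instead of being dropped. */
  def parseTimestamp(raw: String): Option[LocalDateTime] =
    Option(raw)
      .map(_.trim)
      .filterNot(_.startsWith("0000-00-00"))
      .map(s => LocalDateTime.parse(s, format))
}
```

For example, `ZeroDates.parseTimestamp("0000-00-00 00:00:00")` yields `None`, while a valid timestamp string yields `Some(...)`.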

2. Keywords as column names fail

SparkSQL does not enclose column names, e.g.
SELECT key,value FROM tablename
fails and should be
SELECT `key`,`value` FROM tablename

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 157.0 failed 1 times, most recent failure: Lost task 0.0 in stage
157.0 (TID 4322, localhost):
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an
error in your SQL syntax; check the manual that corresponds to your MySQL
server version for the right syntax to use near 'key,value FROM [XXXXXX]'

I'm not sure how to work around that issue except for manually writing
sub-queries (with all the performance problems that may cause).
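
A minimal sketch of that sub-query workaround, assuming the table argument accepts a parenthesized subquery in place of a table name: the keyword columns are backtick-quoted inside the subquery and renamed, so the outer unquoted SELECT only sees harmless names. `QuotedSubquery`, `aliasedSubquery` and the `c_` prefix are illustrative choices, not an existing API.

```scala
// Hypothetical helper that builds a MySQL subquery usable as the "table".
object QuotedSubquery {
  /** Quote a MySQL identifier with backticks, doubling embedded backticks. */
  def quote(name: String): String = "`" + name.replace("`", "``") + "`"

  /** Rename every column to prefix + name so the outer query needs no quoting. */
  def aliasedSubquery(table: String, columns: Seq[String], prefix: String = "c_"): String = {
    val selectList = columns
      .map(c => s"${quote(c)} AS ${quote(prefix + c)}")
      .mkString(", ")
    s"(SELECT $selectList FROM ${quote(table)}) AS t"
  }
}
```

With `aliasedSubquery("tablename", Seq("key", "value"))` the generated outer query would select `c_key,c_value`, which are no longer reserved words.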

3. Overloaded DB due to concurrency

While providing where clauses works well to parallelize the fetch, it can
overload the DB, thus causing trouble (e.g. query/connection timeouts).
It would be nice to specify fetch parallelism independently of result
partitions (e.g. 100 partitions, but don't fetch more than 5 in parallel).
This can be emulated by loading just n partitions at a time and doing a
union afterwards. (grouped(5).map(...)....)
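
The batching idea can be sketched independently of Spark as follows. `BatchedLoad`, `load` and `union` are hypothetical names; in a real job `load` would build a DataFrame from one batch of where clauses and `union` would combine two of them. Since DataFrames are evaluated lazily, the sketch assumes `load` also materializes its batch (for example by caching and counting it); otherwise the union would still fetch all partitions at once.

```scala
// Hypothetical helper: process where clauses a few at a time, then combine.
object BatchedLoad {
  /** Splits the predicates into groups of at most batchSize, loads the groups
    * one after another, and folds the results together with union.
    * Returns None when there are no predicates. */
  def loadInBatches[A](predicates: Seq[String], batchSize: Int)
                      (load: Seq[String] => A)
                      (union: (A, A) => A): Option[A] = {
    require(batchSize > 0, "batchSize must be positive")
    // grouped returns an iterator, so each batch is loaded only after the
    // previous one has finished.
    predicates.grouped(batchSize).map(load).reduceOption(union)
  }
}
```

With 12 predicates and a batch size of 5, `load` is called three times, with 5, 5 and 2 predicates.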

Success

I've successfully loaded 8'573'651'154 rows from 1667 tables ( >93% success
rate). This is pretty awesome given that e.g. sqoop has failed horribly on
the same data.
Note that I did not verify that the retrieved data is valid. I've only
checked for fetch errors so far. But given that Spark does not silence any
errors I'm quite confident that the fetched data will be valid :-)

Regards,
  Rene Treffer

Re: spark 1.4 - test-loading 1786 mysql tables / a few TB

Posted by Reynold Xin <rx...@databricks.com>.
Thanks, René. I actually added a warning to the new JDBC reader/writer
interface for 1.4.0.

Even with that, I think we should support throttling JDBC; otherwise it's
too easy for our users to DoS their production database servers!


  /**
   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
   * url named table. Partitions of the table will be retrieved in parallel based on the parameters
   * passed to this function.
   *
   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
   * your external database systems.
   *
   * @param url JDBC database url of the form `jdbc:subprotocol:subname`
   * @param table Name of the table in the external database.
   * @param columnName the name of a column of integral type that will be used for partitioning.
   * @param lowerBound the minimum value of `columnName` used to decide partition stride
   * @param upperBound the maximum value of `columnName` used to decide partition stride
   * @param numPartitions the number of partitions.  the range `minValue`-`maxValue` will be split
   *                      evenly into this many partitions
   * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
   *                             tag/value. Normally at least a "user" and "password" property
   *                             should be included.
   *
   * @since 1.4.0
   */
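
To illustrate what "split evenly into this many partitions" can mean, here is a sketch that turns `lowerBound`, `upperBound` and `numPartitions` into one where clause per partition. It is not claimed to be Spark's actual implementation. `StridePartitions` and `whereClauses` are hypothetical names, and leaving the first and last ranges open-ended is an assumption made so that values outside the bounds are still fetched. Rows where the column is NULL match none of these clauses.

```scala
// Hypothetical illustration of stride-based range partitioning.
object StridePartitions {
  /** One where clause per partition; first and last ranges are open-ended. */
  def whereClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 1, "need at least one partition")
    require(lower < upper, "lower bound must be below upper bound")
    // Integer stride; never smaller than 1 so ranges do not collapse.
    val stride = math.max(1L, (upper - lower) / numPartitions)
    (0 until numPartitions).map { i =>
      val lo = lower + i * stride
      val hi = lo + stride
      if (numPartitions == 1) "1 = 1"                      // single partition: everything
      else if (i == 0) s"$column < $hi"                    // open lower end
      else if (i == numPartitions - 1) s"$column >= $lo"   // open upper end
      else s"$column >= $lo AND $column < $hi"
    }
  }
}
```

For `whereClauses("id", 0, 100, 4)` this gives the ranges below 25, 25 to 50, 50 to 75, and 75 and above.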



Re: spark 1.4 - test-loading 1786 mysql tables / a few TB

Posted by René Treffer <rt...@gmail.com>.
Hi,

I'm using sqlContext.jdbc(uri, table, where).map(_ =>
1).aggregate(0)(_+_,_+_) on an interactive shell (where "where" is an
Array[String] of 32 to 48 elements).  (The code is tailored to our db,
specifically through the where conditions; otherwise I would have posted it.)
That should be the DataFrame API, but I'm just trying to load everything
and discard it as soon as possible :-)
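
Since the actual where conditions were not posted, the following is only one hypothetical way to produce such an `Array[String]`: one predicate per remainder class of an integer column. `ModuloPredicates` and the column name are made up for illustration, and the sketch assumes a non-null, non-negative integer column, so that every row matches exactly one predicate.

```scala
// Hypothetical generator for non-overlapping where conditions.
object ModuloPredicates {
  /** n predicates of the form "column % n = i", one per partition. */
  def predicates(column: String, n: Int): Array[String] = {
    require(n > 0, "need at least one partition")
    (0 until n).map(i => s"$column % $n = $i").toArray
  }
}
```

An array built with `ModuloPredicates.predicates("id", 32)` would have 32 elements, within the range mentioned above.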

(1) Never do a silent drop of the values by default: it kills confidence.
An option sounds reasonable.  Some sort of insight / log would be great.
(How many columns of what type were truncated? Why?)
Note that I could declare the field as string via JdbcDialects (thank you
guys for merging that :-) ).
I have had quite bad experiences with silent drops / truncations of columns
and thus _like_ the strict way of Spark. It causes trouble, but noticing
later that your data was corrupted during conversion is even worse.

(2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004

(3) One option would be to make it safe to use, the other option would be
to document the behavior (something like "WARNING: this method tries to
load as many partitions as possible, make sure your database can handle the
load or load them in chunks and use union"). SPARK-8008
https://issues.apache.org/jira/browse/SPARK-8008

Regards,
  Rene Treffer

Re: spark 1.4 - test-loading 1786 mysql tables / a few TB

Posted by Reynold Xin <rx...@databricks.com>.
Never mind my comment about 3. You were talking about the read side, while
I was thinking about the write side. Your workaround actually is a pretty
good idea. Can you create a JIRA for that as well?


Re: spark 1.4 - test-loading 1786 mysql tables / a few TB

Posted by Reynold Xin <rx...@databricks.com>.
René,

Thanks for sharing your experience. Are you using the DataFrame API or SQL?

(1) Any recommendations on what we do w.r.t. out of range values? Should we
silently turn them into a null? Maybe based on an option?

(2) Looks like a good idea to always quote column names. The small tricky
thing is each database seems to have its own unique quotes. Do you mind
filing a JIRA for this?
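
The point about database-specific quotes can be made concrete with a small sketch that picks a quoting rule from the JDBC URL prefix. MySQL uses backticks, SQL Server uses square brackets, and double quotes are the ANSI SQL default. `IdentifierQuoting` and `quoteFor` are hypothetical names, not an existing Spark API, and the list of databases is deliberately incomplete.

```scala
// Hypothetical per-database identifier quoting, selected by JDBC URL prefix.
object IdentifierQuoting {
  /** Returns a function that quotes a column name for the given database. */
  def quoteFor(url: String): String => String =
    if (url.startsWith("jdbc:mysql:"))
      name => "`" + name.replace("`", "``") + "`"       // MySQL backticks
    else if (url.startsWith("jdbc:sqlserver:"))
      name => "[" + name.replace("]", "]]") + "]"       // SQL Server brackets
    else
      name => "\"" + name.replace("\"", "\"\"") + "\""  // ANSI double quotes
}
```

For a MySQL URL, `quoteFor(url)("key")` produces the backtick-quoted form from the original report.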

(3) It is somewhat hard to do because that'd require changing Spark's task
scheduler. The easiest way may be to coalesce it into a smaller number of
partitions -- or we can coalesce for the user based on an option. Can you
file a JIRA for this also?

Thanks!