You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Richard <fi...@gmail.com> on 2019/07/20 04:34:43 UTC

Spark SaveMode

Any reason why Spark's SaveMode doesn't have mode that ignore any Primary
Key/Unique constraint violations?

Let's say I'm using spark to migrate some data from Cassandra to Oracle, I
want the insert operation to be "ignore if exist primary keys" instead of
failing the whole batch.

Thanks,
Richard

Re: Spark SaveMode

Posted by Mich Talebzadeh <mi...@gmail.com>.
I dug some of my old stuff using Spark as ETL.

Regarding the question

"Any reason why Spark's SaveMode doesn't have mode that ignore any Primary
Key/Unique constraint violations?"

There is no way Spark can determine if PK constraint is violated until it
receives such message from Oracle through JDBC connection. In general
SaveMode is implanted as follows:

import org.apache.spark.sql.SaveMode

val saveMode = SaveMode.Append

Only SaveMode Overwrite or Append seem to work. The other mode like Ignore
etc do not work.

However, one can exclude records that already exist in Oracle through
reading the Oracle table in like my previous mail and excluding records
that already have PK in Oracle. This can be done through SQL itself by
creating tempView on top of your Oracle DF and Cassandra DF). Again ID is
PK constraint on the Oracle table

// find out IDs that do not exist (i.e. new records). FYI, dfdummy2 is your
Cassandra DF and s is your Oracle DF

dfdummy2.createOrReplaceTempView("dfdummy2")
s.createOrReplaceTempView("s")
//Create an Outer join between two DFs in SQL
var sqltext = """select dfdummy2.ID, CLUSTERED, SCATTERED, RANDOMISED,
RANDOM_STRING, SMALL_VC, PADDING FROM dfdummy2 LEFT OUTER JOIN s ON
dfdummy2.ID = s.ID WHERE s.ID IS NULL ORDER BY dfdummy2.ID"""
sql(sqltext).count()

// write the RS to Oracle table

// Put new data into Oracle table
val connectionProperties = new Properties
connectionProperties.put("user", _username)
connectionProperties.put("password", _password)
connectionProperties.put("jdbUrl", _ORACLEserver)
connectionProperties.put("jdbcDriver", driverName)

//broadcast jdbc connection parameters to cluster nodes
val brConnect = sc.broadcast(connectionProperties)

val saveMode = SaveMode.Append
sql(sqltext).write.mode(saveMode).jdbc(_ORACLEserver,
_dbschema+"."+_dbtable, connectionProperties)

HTH

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 20 Jul 2019 at 08:13, Mich Talebzadeh <mi...@gmail.com>
wrote:

> JDBC read from Oracle table requires Oracle jdbc driver ojdbc6.jar or
> higher. ojdbc6.jar works for 11 and 12c added as --jars <PATH>/ojdbc6.jar
>
> Example with parallel read  (4 connections) to Oracle with ID being your
> PK in Oracle table
>
> var _ORACLEserver= "jdbc:oracle:thin:@rhes564:1521:mydb12"
> var _username = "scratchpad"
> var _password = "xxxx"
> //
> //
> // Get minID and maxID first
> //
> val minID = HiveContext.read.format("jdbc").options(Map("url" ->
> _ORACLEserver,"dbtable" -> "(SELECT cast(MIN(ID) AS INT) AS maxID FROM
> scratchpad.dummy)",
>        "user" -> _username, "password" ->
> _password)).load().collect.apply(0).getDecimal(0).toString
> val maxID = HiveContext.read.format("jdbc").options(Map("url" ->
> _ORACLEserver,"dbtable" -> "(SELECT cast(MAX(ID) AS INT) AS maxID FROM
> scratchpad.dummy)",
>        "user" -> _username, "password" ->
> _password)).load().collect.apply(0).getDecimal(0).toString
> val s = HiveContext.read.format("jdbc").options(
>        Map("url" -> _ORACLEserver,
>        "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED,
> RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>        "partitionColumn" -> "ID",
>        "lowerBound" -> minID,
>        "upperBound" -> maxID,
>        "numPartitions" -> "4",
>        "user" -> _username,
>        "password" -> _password)).load
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 20 Jul 2019 at 07:42, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> This behaviour is governed by the underlying RDBMS for bulk insert, where
>> it either commits or roll backs.
>>
>> You can insert new rows into an staging table in Oracle (which is common
>> in ETL) and then insert/select into Oracle table in shell routine.
>>
>> The other way is to use JDBC in Spark to read Oracle table into a DF and
>> do a result set with Oracle DF and your DF and insert only those records
>> into Oracle.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 20 Jul 2019 at 05:35, Richard <fi...@gmail.com> wrote:
>>
>>> Any reason why Spark's SaveMode doesn't have mode that ignore any
>>> Primary Key/Unique constraint violations?
>>>
>>> Let's say I'm using spark to migrate some data from Cassandra to Oracle,
>>> I want the insert operation to be "ignore if exist primary keys" instead of
>>> failing the whole batch.
>>>
>>> Thanks,
>>> Richard
>>>
>>>

Re: Spark SaveMode

Posted by Mich Talebzadeh <mi...@gmail.com>.
JDBC read from Oracle table requires Oracle jdbc driver ojdbc6.jar or
higher. ojdbc6.jar works for 11 and 12c added as --jars <PATH>/ojdbc6.jar

Example with parallel read  (4 connections) to Oracle with ID being your PK
in Oracle table

var _ORACLEserver= "jdbc:oracle:thin:@rhes564:1521:mydb12"
var _username = "scratchpad"
var _password = "xxxx"
//
//
// Get minID and maxID first
//
val minID = HiveContext.read.format("jdbc").options(Map("url" ->
_ORACLEserver,"dbtable" -> "(SELECT cast(MIN(ID) AS INT) AS maxID FROM
scratchpad.dummy)",
       "user" -> _username, "password" ->
_password)).load().collect.apply(0).getDecimal(0).toString
val maxID = HiveContext.read.format("jdbc").options(Map("url" ->
_ORACLEserver,"dbtable" -> "(SELECT cast(MAX(ID) AS INT) AS maxID FROM
scratchpad.dummy)",
       "user" -> _username, "password" ->
_password)).load().collect.apply(0).getDecimal(0).toString
val s = HiveContext.read.format("jdbc").options(
       Map("url" -> _ORACLEserver,
       "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED,
RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
       "partitionColumn" -> "ID",
       "lowerBound" -> minID,
       "upperBound" -> maxID,
       "numPartitions" -> "4",
       "user" -> _username,
       "password" -> _password)).load

HTH

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 20 Jul 2019 at 07:42, Mich Talebzadeh <mi...@gmail.com>
wrote:

> This behaviour is governed by the underlying RDBMS for bulk insert, where
> it either commits or roll backs.
>
> You can insert new rows into an staging table in Oracle (which is common
> in ETL) and then insert/select into Oracle table in shell routine.
>
> The other way is to use JDBC in Spark to read Oracle table into a DF and
> do a result set with Oracle DF and your DF and insert only those records
> into Oracle.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 20 Jul 2019 at 05:35, Richard <fi...@gmail.com> wrote:
>
>> Any reason why Spark's SaveMode doesn't have mode that ignore any Primary
>> Key/Unique constraint violations?
>>
>> Let's say I'm using spark to migrate some data from Cassandra to Oracle,
>> I want the insert operation to be "ignore if exist primary keys" instead of
>> failing the whole batch.
>>
>> Thanks,
>> Richard
>>
>>

Re: Spark SaveMode

Posted by Mich Talebzadeh <mi...@gmail.com>.
This behaviour is governed by the underlying RDBMS for bulk insert, where
it either commits or roll backs.

You can insert new rows into an staging table in Oracle (which is common in
ETL) and then insert/select into Oracle table in shell routine.

The other way is to use JDBC in Spark to read Oracle table into a DF and do
a result set with Oracle DF and your DF and insert only those records into
Oracle.

HTH

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 20 Jul 2019 at 05:35, Richard <fi...@gmail.com> wrote:

> Any reason why Spark's SaveMode doesn't have mode that ignore any Primary
> Key/Unique constraint violations?
>
> Let's say I'm using spark to migrate some data from Cassandra to Oracle, I
> want the insert operation to be "ignore if exist primary keys" instead of
> failing the whole batch.
>
> Thanks,
> Richard
>
>

Re: Spark SaveMode

Posted by Jörn Franke <jo...@gmail.com>.
This is not an issue of Spark, but the underlying database. The primary key constraint has a purpose and ignoring it would defeat that purpose. 
Then to handle your use case, you would need to make multiple decisions that may imply you don’t want to simply insert if not exist. Maybe you want to do an upsert or how do you want to take into account deleted data?
You could use a Merge in Oracle to achieve what you have in mind. In Spark you would need to fetch the data from the Oracle database and then merge it in Spark with the new data depending on your requirements.

> Am 20.07.2019 um 06:34 schrieb Richard <fi...@gmail.com>:
> 
> Any reason why Spark's SaveMode doesn't have mode that ignore any Primary Key/Unique constraint violations?
> 
> Let's say I'm using spark to migrate some data from Cassandra to Oracle, I want the insert operation to be "ignore if exist primary keys" instead of failing the whole batch.
> 
> Thanks, 
> Richard 
> 

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org