Posted to dev@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2021/06/25 07:25:16 UTC

Lift the limitation of Spark JDBC handling of individual rows with DML

*Challenge*

Insert data from a Spark dataframe when one or more columns in the Oracle
table are derived columns, that is, their values depend on data in one or
more dataframe columns.

Standard JDBC from Spark to Oracle does a batch insert of the dataframe into
Oracle, *so it cannot handle these derived columns*. Refer to the example
below.

dataFrame. \
            write. \
            format("jdbc"). \
            option("url", url). \
            option("dbtable", schema.tableName). \
            option("user", user). \
            option("password", password). \
            option("driver", driver). \
            mode(mode). \
            save()

This writes the whole content of the dataframe to the Oracle table. The
dbtable option accepts only a table name; schema.tableName cannot be
replaced with an INSERT statement.

*Possible solution*


   1. A cursor-based solution is needed. Create a cursor from the Spark
   dataframe so that we can walk through every row and get the value of each
   column from the dataframe.
   2. Oracle provides the cx_Oracle package.  cx_Oracle
   <https://oracle.github.io/python-cx_Oracle/> is a Python extension
   module that enables access to Oracle Database. It conforms to the Python
   database API 2.0 specification
   <http://www.python.org/topics/database/DatabaseAPI-2.0.html> with a
   considerable number of additions and a couple of exclusions. It is
   maintained by Oracle.
   3. Using cx_Oracle we should be able to create a Connection type to
   Oracle and use Connection.cursor() to deal with rows. See below
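
The cursor-based approach in the list above can be tried without an Oracle
instance. The following is a minimal sketch that uses Python's built-in
sqlite3 module as a stand-in, since it follows the same DB API 2.0 pattern
(connect, cursor, execute, commit) as cx_Oracle. The cut-down table layout,
the sample rows and the registered cos function are assumptions made for
illustration only; SQLite uses ? placeholders where Oracle would use named or
numbered bind variables.

```python
import math
import sqlite3

# Stand-in for the Oracle connection: an in-memory SQLite database.
conn = sqlite3.connect(":memory:")
# Not every SQLite build ships cos(), so register one (assumption for this demo).
conn.create_function("cos", 1, math.cos)
cursor = conn.cursor()
cursor.execute(
    "CREATE TABLE randomdata (id INTEGER PRIMARY KEY, small_vc TEXT, derived_col REAL)"
)

# Stand-in for the rows collected from the dataframe (hypothetical sample data).
rows = [(1, "a"), (2, "b"), (3, "c")]

for row in rows:
    # derived_col is not part of the row; the database computes it from id.
    cursor.execute(
        "INSERT INTO randomdata (id, small_vc, derived_col) VALUES (?, ?, cos(?))",
        (row[0], row[1], row[0]),
    )
conn.commit()

# Read the rows back to confirm that the derived column was populated.
stored = cursor.execute(
    "SELECT id, small_vc, derived_col FROM randomdata ORDER BY id"
).fetchall()
```

The point of the sketch is only that the INSERT can carry an expression for a
column the source rows do not have, which the bulk dbtable write cannot do.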


This is an example.

Create a connection to Oracle. The cx_Oracle package needs to be installed
in the PySpark environment.


import cx_Oracle

def loadIntoOracleTableWithCursor(self, df):
    # set Oracle details
    tableName = "randomdata"
    fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
    user = self.config['OracleVariables']['oracle_user']
    password = self.config['OracleVariables']['oracle_password']
    serverName = self.config['OracleVariables']['oracleHost']
    port = self.config['OracleVariables']['oraclePort']
    serviceName = self.config['OracleVariables']['serviceName']
    dsn_tns = cx_Oracle.makedsn(serverName, port, service_name=serviceName)
    # create connection conn
    conn = cx_Oracle.connect(user, password, dsn_tns)
    cursor = conn.cursor()
    # df is the dataframe containing the data. Collect its rows to the
    # driver and walk through them one at a time, like a cursor.
    for row in df.rdd.collect():
        # get individual column values from the dataframe
        id = row[0]
        clustered = row[1]
        scattered = row[2]
        randomised = row[3]
        random_string = row[4]
        small_vc = row[5]
        padding = row[6]
        # Build the INSERT statement to be executed in Oracle. One statement
        # is sent for every row. The Oracle table has a column called
        # derived_col that the dataframe does not have. It is derived from
        # the value of one or more dataframe columns; here derived_col = cos(id).
        # Use {} to pass each value, and enclose it in single quotes if the
        # column is a character type.
        sqlText = f"""insert into {fullyQualifiedTableName}
(id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
                      values
({id},{clustered},{scattered},{randomised},'{random_string}','{small_vc}','{padding}',cos({id}))"""
        print(sqlText)
        cursor.execute(sqlText)
        # commit after each row
        conn.commit()
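
Because the statement is assembled with an f-string, a value containing a
single quote would break the SQL, and Oracle has to parse a new statement for
every row. A possible alternative, not part of the original example, is to use
bind variables through the standard cursor.executemany(sql, rows) call, which
cx_Oracle supports with a list of dictionaries for named binds. The helper
names build_insert and insert_rows are hypothetical, and the column list is
shortened for brevity; the sketch assumes only a DB API 2.0 connection.

```python
def build_insert(table, columns, derived):
    """Return an INSERT that binds `columns` by name and computes `derived` in SQL.

    `derived` maps a target column to a SQL expression over the bind names,
    for example {"derived_col": "cos(:id)"}.
    """
    target_cols = list(columns) + list(derived)
    values = [f":{c}" for c in columns] + list(derived.values())
    return (
        f"insert into {table} ({', '.join(target_cols)}) "
        f"values ({', '.join(values)})"
    )


def insert_rows(conn, sql, rows):
    """Send all rows in one batch and commit once.

    `rows` is a list of dicts keyed by bind name.
    """
    cursor = conn.cursor()
    cursor.executemany(sql, rows)
    conn.commit()


sql = build_insert(
    "scratchpad.randomdata",
    ["id", "clustered", "small_vc"],
    {"derived_col": "cos(:id)"},
)
```

With this shape the values travel separately from the SQL text, so no quoting
is needed for character columns, and the same statement is reused for every
row.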

Our dataframe has 10 rows, and id in the Oracle table has been made the
primary key.


scratchpad@ORASOURCE.MICH.LOCAL> CREATE TABLE scratchpad.randomdata
  2  (
  3      "ID" NUMBER(*,0),
  4      "CLUSTERED" NUMBER(*,0),
  5      "SCATTERED" NUMBER(*,0),
  6      "RANDOMISED" NUMBER(*,0),
  7      "RANDOM_STRING" VARCHAR2(50 BYTE),
  8      "SMALL_VC" VARCHAR2(50 BYTE),
  9      "PADDING" VARCHAR2(4000 BYTE),
 10      "DERIVED_COL" FLOAT(126)
 11  );

Table created.
scratchpad@ORASOURCE.MICH.LOCAL> ALTER TABLE scratchpad.randomdata ADD CONSTRAINT randomdata_PK PRIMARY KEY (ID);
Table altered.

Run it and see the output of  print(sqlText)

insert into SCRATCHPAD.randomdata
(id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
                      values
(1,0.0,0.0,2.0,'KZWeqhFWCEPyYngFbyBMWXaSCrUZoLgubbbPIayRnBUbHoWCFJ','

 1','xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',cos(1))

This works fine. It creates the rows and does a commit
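
Collecting every row to the driver and committing once per row is fine for 10
rows but is unlikely to scale. One possible refinement, assuming the same DB
API 2.0 cursor, is to send rows in fixed-size batches. A function of this
shape could be applied to each partition (for example through
DataFrame.foreachPartition) so that rows are written from the executors
rather than from the driver. The names batched and write_partition, the
connect callable and the batch size are all illustrative and not part of the
original example.

```python
from itertools import islice


def batched(rows, size):
    """Yield lists of at most `size` items from any iterable of rows."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def write_partition(rows, connect, sql, batch_size=1000):
    """Insert one partition's rows in batches, committing after each batch.

    `connect` is a zero-argument callable returning a DB API connection
    (hypothetical; for example a small wrapper around cx_Oracle.connect).
    Returns the number of rows sent.
    """
    conn = connect()
    sent = 0
    try:
        cursor = conn.cursor()
        for chunk in batched(rows, batch_size):
            cursor.executemany(sql, chunk)
            conn.commit()
            sent += len(chunk)
    finally:
        # Always release the connection, even if a batch fails.
        conn.close()
    return sent
```

Opening the connection inside the function matters here, because a database
connection created on the driver cannot be shipped to the executors.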


*What is needed*


We need to implement a JDBC connection in Spark such that it handles DML in
addition to queries (DQL).


The JDBC option("dbtable", schema.tableName) should be enhanced so that
schema.tableName can be replaced with an equivalent statement, allowing DML
to go through.


*Benefits*


This will enable standard JDBC calls from Spark to handle row-level DML such
as the insert above, rather than only one bulk insert.





