Posted to user@spark.apache.org by ry...@gmail.com on 2018/08/29 14:49:15 UTC

Spark code to write to MySQL and Hive

Hi,

 

Can anyone help me understand what is happening with my code?

 

I wrote a Spark application that reads from a MySQL table [which already has 4
records] and creates a new DF by adding 10 to the ID field.  Then I write the
new DF to MySQL as well as to Hive.

 

I am surprised to see an additional set of records in Hive!  I am not able to
understand how newDF has records with IDs 21 to 24.  I know that a DF is
immutable. If so, how come it has 4 records at one point and 8 records at a
later point?

 


import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Read table from MySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new DataFrame with column 'id' increased by 10 to avoid duplicate primary keys.
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert records into the MySQL table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - this creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

 

 

Records already existing in mySql

 

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

newDF.show()

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

newDF.show() after the writes to MySQL and Hive:

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+

 

 

Thanks for your time.

Ravi


Re: Spark code to write to MySQL and Hive

Posted by Sonal Goyal <so...@gmail.com>.
If you have the flexibility to add a new column to the table, you could
add an isUpdated column which is 0 by default. mysqlDF would then read only
the rows with isUpdated = 0, and newDF would insert/append the rows with
isUpdated = 1.
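
Something like this, assuming the table already carries the extra isUpdated
column (I'm reusing jdbcUrl/table/properties from your mail; this is an
untested sketch, not a drop-in fix):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit}

// Read only the original rows; appended rows carry isUpdated = 1 and stay
// filtered out even if the plan is re-evaluated later.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
  .filter(col("isUpdated") === 0)

// Build the shifted rows and mark them as updated before appending.
val newDF = mysqlDF.select(
  (col("id") + 10).as("id"),
  col("country"),
  col("city"),
  lit(1).as("isUpdated"))

newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)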

Actually, what's your use case? You seem to be updating a table while also
trying to keep the old state. What keeps you from writing to a new table
instead?

Thanks,
Sonal
Nube Technologies <http://www.nubetech.co>

<http://in.linkedin.com/in/sonalgoyal>




RE: Spark code to write to MySQL and Hive

Posted by ry...@gmail.com.
Thank you Jacek.  You're right.  Unless a DF is persisted/cached [cache() is just persist() with the default storage level, I believe], the DF is re-evaluated every time it is referenced in an action.  [??]
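
[A quick aside on cache vs persist, as far as I understand it - the storage
level below is the Dataset default, and df is just a placeholder:]

import org.apache.spark.storage.StorageLevel

df.cache()                                 // shorthand for the call below
df.persist(StorageLevel.MEMORY_AND_DISK)   // Dataset's default storage level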

So far, I have been reading/writing to files. This point never came to my attention. 

 

So, instead of thinking of DataFrames as “fixed data once created by an action”, it makes sense to think of them as a kind of “expression”.  Unless persisted, they are re-evaluated and can gain/lose their old data based on changes in their sources.
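
A minimal illustration of that re-evaluation behaviour, using the names from
my earlier mail (the row counts in the comments are the ones I observed; the
snippet itself is a sketch):

val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))

newDF.count()  // 4 - the JDBC read runs against the current table state
// ... the SaveMode.Append write to the MySQL table happens here ...
newDF.count()  // 8 - the plan is re-evaluated, so the appended rows show up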

 

Is there any way to restrict Spark from reading the newly inserted data? [The reads look like Uncommitted Reads in DB2.]

 

 



 

Thanks,

Ravi

 



Re: Spark code to write to MySQL and Hive

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

I haven't checked my answer (too lazy today), but I think I know what might
be going on.

tl;dr Use cache to preserve the initial set of rows from mysql

After you append new rows, you will have twice as many rows as you had
previously. Correct?

Since newDF references the table every time you use it in a structured
query, say to write it to a table, the source table will get re-loaded and
hence the number of rows changes.

What you should do is execute newDF.cache.count right after val newDF =
mysqlDF.select... so the data (rows) remains on the executors and won't get
reloaded. A sketch of the fix is below.
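
A minimal sketch of the fix applied to the code from the original mail (same
caveat - I haven't run this today):

val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))

// count is an action, so it forces the cache to be populated right away.
// From here on, both writes reuse the cached rows instead of re-reading MySQL.
newDF.cache()
newDF.count()

newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)
newDF.write.saveAsTable("cities")  // now sees the original 4 rows only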

Hope that helps.

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski



RE: Spark code to write to MySQL and Hive

Posted by ry...@gmail.com.
Sorry, the format of my last mail was not good.

 


println("Going to talk to mySql")

// Read table from mySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new DataFrame with column 'id' increased to avoid duplicate primary keys
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert records into the table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - This Creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

 

 

 

Going to talk to mySql
I am back from mySql
+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+

 

Thanks,

Ravi

 
