Posted to user@spark.apache.org by Hemanth Gudela <he...@qvantel.com> on 2017/04/24 12:29:34 UTC

Spark registered view in "Future" - View changes updated in "Future" are lost in main thread

Hi,

I'm trying to write a background thread, using a "Future", that periodically re-registers a view with the latest data from the underlying database table.
However, the changes made in the "Future" thread are lost in the main thread.

In the code below:

1.       In the beginning, the registered view "myView" has only 1 row (1, 'a'), as the first output shows:

+-------+-----+
|id     |value|
+-------+-----+
|1      |a    |
+-------+-----+

2.       After a minute, a background "Future" thread inserts a new row (2, 'b') into the database table, and then re-registers "myView" with the latest data from the underlying table.

a.       The second output clearly shows that "myView" in the "Future" has 2 rows:

  +-------+-----+
  |id     |value|
  +-------+-----+
  |1      |a    |
  |2      |b    |
  +-------+-----+
3.       After 2 minutes, when I query "myView" in the main thread, it doesn't show the newly added row (2, 'b'), even though "myView" picked up the change in the "Future" thread. As you can see, the third output shows only one row (1, 'a') again!

+-------+-----+
|id     |value|
+-------+-----+
|1      |a    |
+-------+-----+

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val url = "jdbc:mysql://localhost:3306/myDb?sessionVariables=sql_mode='ANSI'" //mysql jdbc url

//set database properties
val dbProperties = new java.util.Properties
dbProperties.setProperty("user","myUser")
dbProperties.setProperty("password","myPass")

//read a database table, register a temp view and cache it
spark.read.jdbc(url,"testTable",dbProperties).createOrReplaceTempView("myView")  //register a temp view named "myView"
spark.sql("cache table myView")
spark.sql("select * from myView").show //in the beginning, myView has just 1 record
/*
+-------+-----+
|id     |value|
+-------+-----+
|1      |a    |
+-------+-----+
*/

Future { //in a background thread, insert a new row in database table, re-register temp view and refresh the cache
  Thread.sleep(1000*60) //(not necessary but) wait for a minute

  spark.sql("select 2 as id, 'b' as value").write.mode("append").jdbc(url,"testTable",dbProperties) //append row (2, 'b') to the underlying table
  spark.read.jdbc(url,"testTable",dbProperties).createOrReplaceTempView("myView") //re-register "myView"
  spark.sql("cache table myView")    //refresh cache of myView again
  spark.sql("select * from myView").show  //myView now has 2 records
  /*
  +-------+-----+
  |id     |value|
  +-------+-----+
  |1      |a    |
  |2      |b    |
  +-------+-----+
  */
}
Thread.sleep(1000*60*2) //wait for 2 minutes
spark.sql("select * from myView").show //Why does myView have only 1 record!?!
/*
+-------+-----+
|id     |value|
+-------+-----+
|1      |a    |
+-------+-----+
*/
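
The code above relies on Thread.sleep to order the two threads, which neither guarantees that the "Future" has finished nor surfaces an exception thrown inside it. A minimal sketch of a more explicit variant follows, offered as a way to narrow the problem down rather than as a confirmed fix. Assumptions: reRegister is a hypothetical helper (not a Spark API), the view already exists when it is called, and in-memory rows stand in for the JDBC table so the sketch runs without a database.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ViewRefresh {
  // Hypothetical helper (not part of Spark): drop any cached data for an
  // existing view, re-register it from a freshly loaded DataFrame, cache it
  // again, and return the row count.
  def reRegister(spark: SparkSession, viewName: String)(load: => DataFrame): Long = {
    if (spark.catalog.isCached(viewName)) spark.catalog.uncacheTable(viewName)
    load.createOrReplaceTempView(viewName)
    spark.catalog.cacheTable(viewName) // lazy: nothing is cached until an action runs
    spark.table(viewName).count()      // action that materializes the cache
  }

  def run(): (Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("view-refresh-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for spark.read.jdbc(url, "testTable", dbProperties)
    Seq((1, "a")).toDF("id", "value").createOrReplaceTempView("myView")
    spark.catalog.cacheTable("myView")

    val refresh: Future[Long] = Future {
      reRegister(spark, "myView") {
        // Stand-in for re-reading the table after the insert
        Seq((1, "a"), (2, "b")).toDF("id", "value")
      }
    }

    // Block until the Future completes; a failure inside it is rethrown here.
    val rowsInFuture = Await.result(refresh, 5.minutes)
    val rowsInMain = spark.table("myView").count()
    spark.stop()
    (rowsInFuture, rowsInMain)
  }

  def main(args: Array[String]): Unit = {
    val (rowsInFuture, rowsInMain) = run()
    println(s"rows seen in Future: $rowsInFuture, rows seen in main thread: $rowsInMain")
  }
}
```

To apply this to the database case, the block passed to reRegister would be spark.read.jdbc(url, "testTable", dbProperties). Whether the explicit uncache changes the JDBC behavior described here is untested.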

I had assumed that a temp view registered in the "Future" thread is thread-local, but that doesn't always seem to be the case.
When the data source is a database table, changes made to the registered view in the "Future" are lost in the main thread. However, when the data source is Parquet, changes made to the registered view persist in the main thread.
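
To separate "which view definition does this thread see" from "which cached data is being served", it may help to print the same facts from both threads. The sketch below does that under assumptions: Snapshot and snapshot are hypothetical names introduced for illustration, and in-memory rows replace the real JDBC or Parquet source.

```scala
import org.apache.spark.sql.SparkSession
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ViewDiagnostics {
  // Hypothetical record of what one thread observes for a view.
  final case class Snapshot(thread: String, cached: Boolean, rows: Long)

  // Print the query plans for the view and collect a few facts about it.
  def snapshot(spark: SparkSession, viewName: String): Snapshot = {
    val view = spark.table(viewName)
    view.explain(true) // shows whether the plan reads cached data or the source
    Snapshot(Thread.currentThread().getName, spark.catalog.isCached(viewName), view.count())
  }

  def run(): (Snapshot, Snapshot) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("view-diagnostics-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the real source (JDBC table or Parquet files)
    Seq((1, "a")).toDF("id", "value").createOrReplaceTempView("myView")
    spark.catalog.cacheTable("myView")

    val fromFuture = Await.result(Future(snapshot(spark, "myView")), 5.minutes)
    val fromMain = snapshot(spark, "myView")
    spark.stop()
    (fromFuture, fromMain)
  }

  def main(args: Array[String]): Unit = {
    val (fromFuture, fromMain) = run()
    println(fromFuture)
    println(fromMain)
  }
}
```

Running the same snapshot against the JDBC-backed and the Parquet-backed view, right after the re-registration and again from the main thread, would show whether the two cases differ in the view definition or only in the cached data.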

Could you please shed some light on how a registered view behaves when the data source is a database, and why the behavior differs when the data source is Parquet?

Thank you (in advance ☺)
Hemanth