Posted to user@hive.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2016/04/07 00:03:31 UTC
How to use Spark JDBC to read from RDBMS table, create Hive ORC table
and save RDBMS data in it
Hi,
There was a question on the merits of using Sqoop to ingest data from an
Oracle table into Hive.
The issue is that Sqoop falls back to MapReduce when loading data into
Hive, which is not that great. IMO one can do better with a plain JDBC
connection (essentially what Sqoop uses anyway), but crucially combined
with Spark's faster processing. So I ran this test.
Basically
1. Create sqlContext based on HiveContext
2. Use JDBC to get Oracle table data
3. Register data as temporary table
4. Ensure that you cater for compatibility issues. For example, an Oracle
column of type NUMBER is read as decimal(38,10) over JDBC, which causes an
"Overflowed precision" error in Spark. Convert it with TO_CHAR in the JDBC
query (see below)
5. Create an ORC table in a given Hive database
6. Insert/select from temp table to ORC table
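As an aside, the decimal-overflow issue in step 4 can also be handled on the
Spark side by casting after the JDBC read, instead of TO_CHAR in the Oracle
query. A minimal sketch (the DataFrame name df is an assumption; the column
names are from the SH.SALES sample schema):

import org.apache.spark.sql.functions.col

// Cast the decimal(38,10) columns that came back from JDBC to the
// target Hive types before inserting into the ORC table
val casted = df.
  withColumn("PROD_ID", col("PROD_ID").cast("bigint")).
  withColumn("CUST_ID", col("CUST_ID").cast("bigint")).
  withColumn("CHANNEL_ID", col("CHANNEL_ID").cast("bigint")).
  withColumn("PROMO_ID", col("PROMO_ID").cast("bigint"))
casted.registerTempTable("tmp")

Either approach works; the TO_CHAR route used below keeps the conversion
entirely inside Oracle.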
Contrary to common belief, from Spark you can create an ORC table in Hive
and it will work fine. You can also choose which Hive database to create
your table in. Just to be clear, I used Spark 1.6 with Hive 2.
You will need sbt or Maven to build a project for this purpose, but it works.
Here is sample code in Scala fetching just under 1 million rows from an
Oracle table.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
//
object Create_and_populate_table_JDBC {
  def main(args: Array[String]) {
    //
    val conf = new SparkConf().
                 setAppName("Create_and_populate_table_JDBC").
                 setMaster("local[12]").
                 set("spark.driver.allowMultipleContexts", "true").
                 set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
    //
    var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb"
    var _username : String = "sh"
    var _password : String = "xxxxxx"
    //
    // Get Oracle table data via JDBC. NUMBER columns are converted with
    // TO_CHAR to avoid "Overflowed precision" errors on decimal(38,10)
    val s = sqlContext.load("jdbc",
      Map("url" -> _ORACLEserver,
          "dbtable" -> "(SELECT to_char(PROD_ID) AS PROD_ID, to_char(CUST_ID) AS CUST_ID, to_char(TIME_ID) AS TIME_ID, to_char(CHANNEL_ID) AS CHANNEL_ID, to_char(PROMO_ID) AS PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD FROM sh.sales)",
          "user" -> _username,
          "password" -> _password))
    //
    s.registerTempTable("tmp")
    //
    // Create and populate target ORC table sales in database test in Hive
    //
    sqlContext.sql("use test")
    //
    // Drop and create table sales in Hive test database
    //
    sqlContext.sql("DROP TABLE IF EXISTS test.sales")
    var sqltext : String = ""
    sqltext = """
    CREATE TABLE test.sales
    (
      PROD_ID        bigint,
      CUST_ID        bigint,
      TIME_ID        timestamp,
      CHANNEL_ID     bigint,
      PROMO_ID       bigint,
      QUANTITY_SOLD  decimal(10),
      AMOUNT_SOLD    decimal(10)
    )
    CLUSTERED BY (PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID) INTO 256 BUCKETS
    STORED AS ORC
    TBLPROPERTIES ( "orc.compress"="SNAPPY",
                    "orc.create.index"="true",
                    "orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
                    "orc.bloom.filter.fpp"="0.05",
                    "orc.stripe.size"="268435456",
                    "orc.row.index.stride"="10000" )
    """
    sqlContext.sql(sqltext)
    //
    // Put data in Hive table
    //
    sqltext = """
    INSERT INTO TABLE test.sales
    SELECT * FROM tmp
    """
    sqlContext.sql(sqltext)
    sqlContext.sql("select count(1) from test.sales").collect.foreach(println)
    println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
  }
}
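For what it is worth, the load("jdbc", Map(...)) call is deprecated as of
Spark 1.6; the same read can be expressed with the DataFrameReader API. A
sketch using the same connection settings as above:

// Equivalent JDBC read via the non-deprecated DataFrameReader API
val s = sqlContext.read.format("jdbc").
  option("url", _ORACLEserver).
  option("dbtable", "(SELECT to_char(PROD_ID) AS PROD_ID, to_char(CUST_ID) AS CUST_ID, to_char(TIME_ID) AS TIME_ID, to_char(CHANNEL_ID) AS CHANNEL_ID, to_char(PROMO_ID) AS PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD FROM sh.sales)").
  option("user", _username).
  option("password", _password).
  load()

The rest of the program is unchanged either way.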
Started at
[06/04/2016 23:17:49.49]
[918843]
Finished at
[06/04/2016 23:18:05.05]
HTH
Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com