Posted to user@hive.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2016/04/07 00:03:31 UTC

How to use Spark JDBC to read from RDBMS table, create Hive ORC table and save RDBMS data in it

Hi,

There was a question on the merits of using Sqoop to ingest data from an
Oracle table into Hive.

The issue is that Sqoop falls back to MapReduce when loading data into Hive,
which is not that great. IMO one can do better by using a JDBC connection
(which is essentially what Sqoop does anyway) but crucially with Spark's
faster processing. So I did this test.

Basically


   1. Create a sqlContext based on HiveContext
   2. Use JDBC to get the Oracle table data
   3. Register the data as a temporary table
   4. Ensure that you cater for compatibility issues. For example, an Oracle
   column of type NUMBER is translated to decimal(38,10) over JDBC, which
   causes an "Overflowed precision" error in Spark! Convert it with TO_CHAR
   in the JDBC query (see the sketch after this list and the full code below)
   5. Create an ORC table in a given Hive database
   6. Insert/select from the temp table into the ORC table
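
To make point 4 concrete, here is a minimal sketch. It reuses the connection
details from the full code further down and uses the read.format("jdbc") form
of the same call; reading the NUMBER column as-is, Spark infers decimal(38,10),
whereas pushing TO_CHAR into the Oracle-side query brings it across as a string:

// Sketch only; assumes the same sc and sqlContext (HiveContext) setup as in the full code below
val jdbcOpts = Map(
  "url"      -> "jdbc:oracle:thin:@rhes564:1521:mydb",
  "user"     -> "sh",
  "password" -> "xxxxxx")
// Oracle NUMBER read as-is arrives as decimal(38,10) and can overflow on wide values
val raw = sqlContext.read.format("jdbc").
            options(jdbcOpts + ("dbtable" -> "(SELECT PROD_ID FROM sh.sales)")).load()
raw.printSchema()    // PROD_ID: decimal(38,10)
// Workaround: TO_CHAR on the Oracle side so the column arrives as a plain string
val safe = sqlContext.read.format("jdbc").
            options(jdbcOpts + ("dbtable" -> "(SELECT TO_CHAR(PROD_ID) AS PROD_ID FROM sh.sales)")).load()
safe.printSchema()   // PROD_ID: string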

Contrary to common belief, from Spark you can create an ORC table in Hive and
it will work fine. You can also choose which Hive database to create the table
in. Just to be clear, I used Spark 1.6 with Hive 2.

I had to use sbt (or Maven) to build a project for this purpose, but it works.
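
For reference, a minimal build.sbt along the lines below is enough. The
versions and the handling of the Oracle driver are assumptions to adapt to
your own environment (ojdbc is not on Maven Central, so it usually goes in
as an unmanaged jar under lib/ or via --jars at submit time):

name := "Create_and_populate_table_JDBC"

version := "1.0"

scalaVersion := "2.10.5"

// Spark 1.6 artifacts, marked "provided" because the cluster supplies them at runtime
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.6.1" % "provided"
)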

Here is sample code in Scala that fetches just under 1 million rows from an
Oracle table.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
//
object Create_and_populate_table_JDBC {
  def main(args: Array[String]) {
//
  val conf = new SparkConf().
               setAppName("Create_and_populate_table_JDBC").
               setMaster("local[12]").
               set("spark.driver.allowMultipleContexts", "true").
               set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  println ("\nStarted at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)
//
  var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb"
  var _username : String = "sh"
  var _password : String = "xxxxxx"
//
  val s = HiveContext.load("jdbc",
    Map("url" -> _ORACLEserver,
        "dbtable" -> "(SELECT to_char(PROD_ID) AS PROD_ID, to_char(CUST_ID) AS CUST_ID, to_char(TIME_ID) AS TIME_ID, to_char(CHANNEL_ID) AS CHANNEL_ID, to_char(PROMO_ID) AS PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD FROM sh.sales)",
        "user" -> _username,
        "password" -> _password))
//
  s.registerTempTable("tmp")
// Need to create and populate target ORC table sales in database test in Hive
//
  HiveContext.sql("use test")
//
// Drop and create table sales in Hive test database
//
  HiveContext.sql("DROP TABLE IF EXISTS test.sales")
  var sqltext : String = ""
  sqltext = """
  CREATE TABLE test.sales
 (
  PROD_ID        bigint                       ,
  CUST_ID        bigint                       ,
  TIME_ID        timestamp                    ,
  CHANNEL_ID     bigint                       ,
  PROMO_ID       bigint                       ,
  QUANTITY_SOLD  decimal(10)                  ,
  AMOUNT_SOLD    decimal(10)
)
CLUSTERED BY (PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="SNAPPY",
"orc.create.index"="true",
"orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
"orc.bloom.filter.fpp"="0.05",
"orc.stripe.size"="268435456",
"orc.row.index.stride"="10000" )
"""
  HiveContext.sql(sqltext)
// Put data in Hive table.
//
  sqltext = """
INSERT INTO TABLE test.sales
SELECT
        *
FROM tmp
"""
  HiveContext.sql(sqltext)

  HiveContext.sql("select count(1) from
test.sales").collect.foreach(println)
  println ("\nFinished at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)
  }
}
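
To run it standalone, a submit along these lines does the job (the jar name is
whatever your build produces and the path to the Oracle driver is a placeholder):

spark-submit \
  --class Create_and_populate_table_JDBC \
  --master local[12] \
  --jars /path/to/ojdbc6.jar \
  target/scala-2.10/create_and_populate_table_jdbc_2.10-1.0.jar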

Started at
[06/04/2016 23:17:49.49]
[918843]
Finished at
[06/04/2016 23:18:05.05]


HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com