Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2016/06/20 20:35:02 UTC
Saving data using tempTable versus save() method
Hi,
I have a DataFrame (DF) based on a Hive table, sorted as shown below.
This works fine: when I register it as a temp table I can populate the underlying
table sales2 in Hive. sales2 is an ORC table.
val s = HiveContext.table("sales_staging")
val sorted = s.sort("prod_id","cust_id","time_id","channel_id","promo_id")
sorted.registerTempTable("tmp")
sqltext = """
INSERT INTO TABLE oraclehadoop.sales2
SELECT
PROD_ID
, CUST_ID
, TIME_ID
, CHANNEL_ID
, PROMO_ID
, QUANTITY_SOLD
, AMOUNT_SOLD
FROM tmp
"""
HiveContext.sql(sqltext)
HiveContext.sql("select count(1) from oraclehadoop.sales2").show
HiveContext.sql("truncate table oraclehadoop.sales2")
sorted.save("oraclehadoop.sales2")
HiveContext.sql("select count(1) from oraclehadoop.sales2").show
When I truncate the Hive table and use sorted.save("oraclehadoop.sales2") instead,
it does not save any data.
Started at
[20/06/2016 21:21:57.57]
+------+
|   _c0|
+------+
|918843| // This works
+------+
[Stage 7:============================================> (3 + 1) / 4]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+---+
|_c0|
+---+
|  0| // This does not
+---+
Finished at
[20/06/2016 21:22:30.30]
Has anyone seen this before?
The issue is saving the data. Saving through the temp table works, but saving
with save() does not.
Thanks
Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
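A note on the question above, offered as a likely explanation rather than something confirmed in this thread. In Spark 1.x, DataFrame.save(String) takes a path, not a table name. It writes files in the default data source format to that path, so the Hive table would stay empty. The sketch below assumes Spark 1.6 and reuses the table and column names from the post. It appends to the existing Hive table with DataFrameWriter.insertInto instead. The object name InsertIntoSketch and the explicit select are illustrative additions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

// Hypothetical object name; assumes Spark 1.6 with Hive support on the classpath
object InsertIntoSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("InsertIntoSketch"))
    val hiveContext = new HiveContext(sc)

    hiveContext.sql("use oraclehadoop")

    // insertInto matches columns by position, not by name, so the columns
    // are selected explicitly in the order of the target table definition
    val sorted = hiveContext.table("sales_staging")
      .sort("prod_id", "cust_id", "time_id", "channel_id", "promo_id")
      .select("prod_id", "cust_id", "time_id", "channel_id", "promo_id",
              "quantity_sold", "amount_sold")

    // Append rows to the existing Hive table (resolved in the current
    // database) rather than writing files to a path as save(path) does
    sorted.write.mode(SaveMode.Append).insertInto("sales2")

    hiveContext.sql("select count(1) from oraclehadoop.sales2").show()
    sc.stop()
  }
}
```

If this assumption holds, no explicit commit is involved: the second count returns 0 simply because the rows were written somewhere other than the table.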
Re: Saving data using tempTable versus save() method
Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks Robin.
This is data going from Hive (source) to Hive (target) via Spark. The database in
Hive is called oraclehadoop (mainly used to import data from Oracle in the
first place).
I am very sceptical of these Spark methods for storing data in a
Hive database. In all probability they just store it on HDFS somewhere.
With SQL itself one is in control. You can create the target table the
way you want it and store the data there. With these methods it is all a
black box.
So I was just trying to explore another implementation method if I could.
Maybe the way it expects the target ORC table is different from the way I
have tailored it:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ETL_sales2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("ETL_sales2").
      set("spark.driver.allowMultipleContexts", "true")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
    HiveContext.sql("use oraclehadoop")
    val s = HiveContext.table("sales_staging")
    val sorted = s.sort("prod_id","cust_id","time_id","channel_id","promo_id")
    HiveContext.sql("DROP TABLE IF EXISTS oraclehadoop.sales2")
    // clean up the files in HDFS directory first if exist
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://rhes564:9000"), hadoopConf)
    val output1 = "hdfs://rhes564:9000/user/hduser/oraclehadoop.sales2"
    try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch { case _ : Throwable => { } }
    var sqltext: String = ""
    // Create the target table as a bucketed ORC table
    sqltext =
      """
      CREATE TABLE IF NOT EXISTS oraclehadoop.sales2
      (
        PROD_ID bigint ,
        CUST_ID bigint ,
        TIME_ID timestamp ,
        CHANNEL_ID bigint ,
        PROMO_ID bigint ,
        QUANTITY_SOLD decimal(10) ,
        AMOUNT_SOLD decimal(10)
      )
      CLUSTERED BY (PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID) INTO 256 BUCKETS
      STORED AS ORC
      TBLPROPERTIES ( "orc.compress"="SNAPPY",
      "orc.create.index"="true",
      "orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
      "orc.bloom.filter.fpp"="0.05",
      "orc.stripe.size"="268435456",
      "orc.row.index.stride"="10000")
      """
    HiveContext.sql(sqltext)
    //sorted.count
    // Method 1: register a temp table and populate the target with INSERT ... SELECT
    sorted.registerTempTable("tmp")
    sqltext = """
      INSERT INTO TABLE oraclehadoop.sales2
      SELECT
        PROD_ID
      , CUST_ID
      , TIME_ID
      , CHANNEL_ID
      , PROMO_ID
      , QUANTITY_SOLD
      , AMOUNT_SOLD
      FROM tmp
      """
    HiveContext.sql(sqltext)
    HiveContext.sql("select count(1) from oraclehadoop.sales2").show
    HiveContext.sql("truncate table oraclehadoop.sales2")
    // Method 2: the save() call in question; the count after it comes back as 0
    sorted.save("oraclehadoop.sales2")
    HiveContext.sql("select count(1) from oraclehadoop.sales2").show
    println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
    sys.exit()
  }
}
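The suspicion that these methods "just store it on HDFS somewhere" can be checked directly. The sketch below assumes that save("oraclehadoop.sales2") treated its argument as a path relative to the user's HDFS home directory. That would be the same directory the script above deletes before it runs. The NameNode URI and directory are copied from that script. The object name WhereDidSaveWrite is hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper; run it after the ETL job has finished
object WhereDidSaveWrite {
  def main(args: Array[String]): Unit = {
    // NameNode URI and directory taken from the script above; adjust for another cluster
    val fs = FileSystem.get(new java.net.URI("hdfs://rhes564:9000"), new Configuration())
    val target = new Path("hdfs://rhes564:9000/user/hduser/oraclehadoop.sales2")

    if (fs.exists(target)) {
      // List whatever files are in the directory, with their sizes in bytes
      fs.listStatus(target).foreach { status =>
        println(s"${status.getPath}  ${status.getLen} bytes")
      }
    } else {
      println(s"$target does not exist")
    }
  }
}
```

If data files show up under that directory after the job, the rows were written to a plain HDFS path and never reached the Hive table, which would be consistent with the count of 0.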
On 21 June 2016 at 10:03, Robin East <ro...@xense.co.uk> wrote:
> if you are able to trace the underlying oracle session you can see whether
> a commit has been called or not.
Re: Saving data using tempTable versus save() method
Posted by Robin East <ro...@xense.co.uk>.
If you are able to trace the underlying Oracle session, you can see whether a commit has been called or not.
> On 21 Jun 2016, at 09:57, Robin East <ro...@xense.co.uk> wrote:
>
> I’m not sure - I don’t know what those APIs do under the hood. It simply rang a bell with something I have fallen foul of in the past (not with Spark though) - have wasted many hours forgetting to commit and then scratching my head as why my data is not persisting.
Re: Saving data using tempTable versus save() method
Posted by Robin East <ro...@xense.co.uk>.
I’m not sure. I don’t know what those APIs do under the hood. It simply rang a bell with something I have fallen foul of in the past (not with Spark though): I have wasted many hours forgetting to commit and then scratching my head as to why my data was not persisting.
> On 21 Jun 2016, at 09:20, Mich Talebzadeh <mi...@gmail.com> wrote:
>
> that is a very interesting point. I am not sure. how can I do that with
>
> sorted.save("oraclehadoop.sales2")
>
> like .. commit?
>
> thanks
Re: Saving data using tempTable versus save() method
Posted by Mich Talebzadeh <mi...@gmail.com>.
That is a very interesting point. I am not sure. How can I do that with
sorted.save("oraclehadoop.sales2")
Is there something like a commit?
Thanks
On 21 June 2016 at 08:56, Robin East <ro...@xense.co.uk> wrote:
> random thought - do you need an explicit commit with the 2nd method?
Re: Saving data using tempTable versus save() method
Posted by Robin East <ro...@xense.co.uk>.
Random thought: do you need an explicit commit with the second method?