Posted to user@spark.apache.org by Daniel Haviv <da...@veracity-group.com> on 2015/10/08 20:51:59 UTC

Insert via HiveContext is slow

Hi,
I'm inserting into a partitioned ORC table using an INSERT SQL statement
passed via HiveContext.
The performance I'm getting is pretty bad, and I was wondering if there are
ways to speed things up.
Would saving the DF like this
df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
be faster?
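
For reference, this is roughly the write path I have in mind (just a sketch,
not tested; "Tablename" and the "date" partition column are the placeholders
from my example, and df is whatever DataFrame I'm inserting):

import org.apache.spark.sql.SaveMode

// Append the DataFrame directly into the partitioned ORC table instead of
// going through an INSERT statement passed to HiveContext.sql(...)
df.write
  .mode(SaveMode.Append)
  .format("orc")
  .partitionBy("date")
  .saveAsTable("Tablename")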


Thank you.
Daniel

RE: Insert via HiveContext is slow

Posted by "Cheng, Hao" <ha...@intel.com>.
I think the DF API performs the same as the SQL API does for multi-inserts if you don’t use the cached table.

Hao

From: Daniel Haviv [mailto:daniel.haviv@veracity-group.com]
Sent: Friday, October 9, 2015 3:09 PM
To: Cheng, Hao
Cc: user
Subject: Re: Insert via HiveContext is slow

Thanks Hao.
That seems to be one issue.
The other issue, to me, seems to be the renaming of files at the end of the insert.
Would DF.save perform the task better?

Thanks,
Daniel

On Fri, Oct 9, 2015 at 3:35 AM, Cheng, Hao <ha...@intel.com> wrote:
I think that’s a known performance issue (compared to Hive) of Spark SQL with multi-inserts.
A workaround is to create a temp cached table for the projection first, and then do the multiple inserts based on the cached table.
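
Something along these lines, for example (an untested sketch built from your
statement; "exploded_events" is just an illustrative temp table name, and
sqlContext2 is the HiveContext from your snippet):

// Do the lateral-view projection once and cache it, so avro_events is not
// scanned and exploded separately for each insert.
val exploded = sqlContext2.sql("""
  select cmtstimestamp, datats, macaddress, cmtsid, usParamLine, dsParamLine
  from avro_events
  lateral view explode(usChnlList) usParamLine as usParamLine
  lateral view explode(dsChnlList) dsParamLine as dsParamLine""")

exploded.registerTempTable("exploded_events")
sqlContext2.cacheTable("exploded_events")

// Upstream insert with the same select list as before, but reading the
// cached temp table; the DwnStreamParam insert is analogous.
sqlContext2.sql("""
  insert into table UpStreamParam partition(day_ts, cmtsid)
  select cmtstimestamp, datats, macaddress,
       usParamLine['chnlidx'] chnlidx,
       usParamLine['modulation'] modulation,
       usParamLine['severity'] severity,
       usParamLine['rxpower'] rxpower,
       usParamLine['sigqnoise'] sigqnoise,
       usParamLine['noisedeviation'] noisedeviation,
       usParamLine['prefecber'] prefecber,
       usParamLine['postfecber'] postfecber,
       usParamLine['txpower'] txpower,
       usParamLine['txpowerdrop'] txpowerdrop,
       usParamLine['nmter'] nmter,
       usParamLine['premtter'] premtter,
       usParamLine['postmtter'] postmtter,
       usParamLine['unerroreds'] unerroreds,
       usParamLine['corrected'] corrected,
       usParamLine['uncorrectables'] uncorrectables,
       from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
       cmtsid
  from exploded_events""")

// After both inserts have run:
// sqlContext2.uncacheTable("exploded_events")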

We are actually working on a POC for some similar cases; hopefully it comes out soon.

Hao

From: Daniel Haviv [mailto:daniel.haviv@veracity-group.com]
Sent: Friday, October 9, 2015 3:08 AM
To: user
Subject: Re: Insert via HiveContext is slow

Forgot to mention that my insert is a multi-table insert:
sqlContext2.sql("""from avro_events
               lateral view explode(usChnlList) usParamLine as usParamLine
               lateral view explode(dsChnlList) dsParamLine as dsParamLine
               insert into table UpStreamParam partition(day_ts, cmtsid)
               select cmtstimestamp,datats,macaddress,
                    usParamLine['chnlidx'] chnlidx,
                    usParamLine['modulation'] modulation,
                    usParamLine['severity'] severity,
                    usParamLine['rxpower'] rxpower,
                    usParamLine['sigqnoise'] sigqnoise,
                    usParamLine['noisedeviation'] noisedeviation,
                    usParamLine['prefecber'] prefecber,
                    usParamLine['postfecber'] postfecber,
                    usParamLine['txpower'] txpower,
                    usParamLine['txpowerdrop'] txpowerdrop,
                    usParamLine['nmter'] nmter,
                    usParamLine['premtter'] premtter,
                    usParamLine['postmtter'] postmtter,
                    usParamLine['unerroreds'] unerroreds,
                    usParamLine['corrected'] corrected,
                    usParamLine['uncorrectables'] uncorrectables,
                    from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
                                                            cmtsid
               insert into table DwnStreamParam partition(day_ts, cmtsid)
               select  cmtstimestamp,datats,macaddress,
                    dsParamLine['chnlidx'] chnlidx,
                    dsParamLine['modulation'] modulation,
                    dsParamLine['severity'] severity,
                    dsParamLine['rxpower'] rxpower,
                    dsParamLine['sigqnoise'] sigqnoise,
                    dsParamLine['noisedeviation'] noisedeviation,
                    dsParamLine['prefecber'] prefecber,
                    dsParamLine['postfecber'] postfecber,
                    dsParamLine['sigqrxmer'] sigqrxmer,
                    dsParamLine['sigqmicroreflection'] sigqmicroreflection,
                    dsParamLine['unerroreds'] unerroreds,
                    dsParamLine['corrected'] corrected,
                    dsParamLine['uncorrectables'] uncorrectables,
                    from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
                                                            cmtsid
                                                                                    """)



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv <da...@veracity-group.com> wrote:
Hi,
I'm inserting into a partitioned ORC table using an INSERT SQL statement passed via HiveContext.
The performance I'm getting is pretty bad, and I was wondering if there are ways to speed things up.
Would saving the DF like this df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename") be faster?


Thank you.
Daniel



Re: Insert via HiveContext is slow

Posted by Daniel Haviv <da...@veracity-group.com>.
Thanks Hao.
That seems to be one issue.
The other issue, to me, seems to be the renaming of files at the end of the insert.
Would DF.save perform the task better?

Thanks,
Daniel

On Fri, Oct 9, 2015 at 3:35 AM, Cheng, Hao <ha...@intel.com> wrote:

> I think that’s a known performance issue (compared to Hive) of Spark SQL
> with multi-inserts.
>
> A workaround is to create a temp cached table for the projection first, and
> then do the multiple inserts based on the cached table.
>
>
>
> We are actually working on a POC for some similar cases; hopefully it
> comes out soon.
>
>
>
> Hao
>
>
>
> *From:* Daniel Haviv [mailto:daniel.haviv@veracity-group.com]
> *Sent:* Friday, October 9, 2015 3:08 AM
> *To:* user
> *Subject:* Re: Insert via HiveContext is slow
>
>
>
> Forgot to mention that my insert is a multi-table insert:
>
> sqlContext2.sql("""from avro_events
>
>                lateral view explode(usChnlList) usParamLine as usParamLine
>
>                lateral view explode(dsChnlList) dsParamLine as dsParamLine
>
>                insert into table UpStreamParam partition(day_ts, cmtsid)
>
>                select cmtstimestamp,datats,macaddress,
>
>                     usParamLine['chnlidx'] chnlidx,
>
>                     usParamLine['modulation'] modulation,
>
>                     usParamLine['severity'] severity,
>
>                     usParamLine['rxpower'] rxpower,
>
>                     usParamLine['sigqnoise'] sigqnoise,
>
>                     usParamLine['noisedeviation'] noisedeviation,
>
>                     usParamLine['prefecber'] prefecber,
>
>                     usParamLine['postfecber'] postfecber,
>
>                     usParamLine['txpower'] txpower,
>
>                     usParamLine['txpowerdrop'] txpowerdrop,
>
>                     usParamLine['nmter'] nmter,
>
>                     usParamLine['premtter'] premtter,
>
>                     usParamLine['postmtter'] postmtter,
>
>                     usParamLine['unerroreds'] unerroreds,
>
>                     usParamLine['corrected'] corrected,
>
>                     usParamLine['uncorrectables'] uncorrectables,
>
>                     from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
>
>                                                             cmtsid
>
>                insert into table DwnStreamParam partition(day_ts, cmtsid)
>
>                select  cmtstimestamp,datats,macaddress,
>
>                     dsParamLine['chnlidx'] chnlidx,
>
>                     dsParamLine['modulation'] modulation,
>
>                     dsParamLine['severity'] severity,
>
>                     dsParamLine['rxpower'] rxpower,
>
>                     dsParamLine['sigqnoise'] sigqnoise,
>
>                     dsParamLine['noisedeviation'] noisedeviation,
>
>                     dsParamLine['prefecber'] prefecber,
>
>                     dsParamLine['postfecber'] postfecber,
>
>                     dsParamLine['sigqrxmer'] sigqrxmer,
>
>                     dsParamLine['sigqmicroreflection'] sigqmicroreflection,
>
>                     dsParamLine['unerroreds'] unerroreds,
>
>                     dsParamLine['corrected'] corrected,
>
>                     dsParamLine['uncorrectables'] uncorrectables,
>
>                     from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
>
>                                                             cmtsid
>
>
> """)
>
>
>
>
>
>
>
> On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv <
> daniel.haviv@veracity-group.com> wrote:
>
> Hi,
>
> I'm inserting into a partitioned ORC table using an INSERT SQL statement
> passed via HiveContext.
>
> The performance I'm getting is pretty bad, and I was wondering if there are
> ways to speed things up.
>
> Would saving the DF like this df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
> be faster?
>
>
>
>
>
> Thank you.
>
> Daniel
>
>
>

RE: Insert via HiveContext is slow

Posted by "Cheng, Hao" <ha...@intel.com>.
I think that’s a known performance issue (compared to Hive) of Spark SQL with multi-inserts.
A workaround is to create a temp cached table for the projection first, and then do the multiple inserts based on the cached table.

We are actually working on a POC for some similar cases; hopefully it comes out soon.

Hao

From: Daniel Haviv [mailto:daniel.haviv@veracity-group.com]
Sent: Friday, October 9, 2015 3:08 AM
To: user
Subject: Re: Insert via HiveContext is slow

Forgot to mention that my insert is a multi-table insert:
sqlContext2.sql("""from avro_events
               lateral view explode(usChnlList) usParamLine as usParamLine
               lateral view explode(dsChnlList) dsParamLine as dsParamLine
               insert into table UpStreamParam partition(day_ts, cmtsid)
               select cmtstimestamp,datats,macaddress,
                    usParamLine['chnlidx'] chnlidx,
                    usParamLine['modulation'] modulation,
                    usParamLine['severity'] severity,
                    usParamLine['rxpower'] rxpower,
                    usParamLine['sigqnoise'] sigqnoise,
                    usParamLine['noisedeviation'] noisedeviation,
                    usParamLine['prefecber'] prefecber,
                    usParamLine['postfecber'] postfecber,
                    usParamLine['txpower'] txpower,
                    usParamLine['txpowerdrop'] txpowerdrop,
                    usParamLine['nmter'] nmter,
                    usParamLine['premtter'] premtter,
                    usParamLine['postmtter'] postmtter,
                    usParamLine['unerroreds'] unerroreds,
                    usParamLine['corrected'] corrected,
                    usParamLine['uncorrectables'] uncorrectables,
                    from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
                                                            cmtsid
               insert into table DwnStreamParam partition(day_ts, cmtsid)
               select  cmtstimestamp,datats,macaddress,
                    dsParamLine['chnlidx'] chnlidx,
                    dsParamLine['modulation'] modulation,
                    dsParamLine['severity'] severity,
                    dsParamLine['rxpower'] rxpower,
                    dsParamLine['sigqnoise'] sigqnoise,
                    dsParamLine['noisedeviation'] noisedeviation,
                    dsParamLine['prefecber'] prefecber,
                    dsParamLine['postfecber'] postfecber,
                    dsParamLine['sigqrxmer'] sigqrxmer,
                    dsParamLine['sigqmicroreflection'] sigqmicroreflection,
                    dsParamLine['unerroreds'] unerroreds,
                    dsParamLine['corrected'] corrected,
                    dsParamLine['uncorrectables'] uncorrectables,
                    from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
                                                            cmtsid
                                                                                    """)



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv <da...@veracity-group.com> wrote:
Hi,
I'm inserting into a partitioned ORC table using an INSERT SQL statement passed via HiveContext.
The performance I'm getting is pretty bad, and I was wondering if there are ways to speed things up.
Would saving the DF like this df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename") be faster?


Thank you.
Daniel


Re: Insert via HiveContext is slow

Posted by Daniel Haviv <da...@veracity-group.com>.
Forgot to mention that my insert is a multi-table insert:
sqlContext2.sql("""from avro_events
               lateral view explode(usChnlList) usParamLine as usParamLine
               lateral view explode(dsChnlList) dsParamLine as dsParamLine
               insert into table UpStreamParam partition(day_ts, cmtsid)
               select cmtstimestamp,datats,macaddress,
                    usParamLine['chnlidx'] chnlidx,
                    usParamLine['modulation'] modulation,
                    usParamLine['severity'] severity,
                    usParamLine['rxpower'] rxpower,
                    usParamLine['sigqnoise'] sigqnoise,
                    usParamLine['noisedeviation'] noisedeviation,
                    usParamLine['prefecber'] prefecber,
                    usParamLine['postfecber'] postfecber,
                    usParamLine['txpower'] txpower,
                    usParamLine['txpowerdrop'] txpowerdrop,
                    usParamLine['nmter'] nmter,
                    usParamLine['premtter'] premtter,
                    usParamLine['postmtter'] postmtter,
                    usParamLine['unerroreds'] unerroreds,
                    usParamLine['corrected'] corrected,
                    usParamLine['uncorrectables'] uncorrectables,
                    from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
                                                            cmtsid
               insert into table DwnStreamParam partition(day_ts, cmtsid)
               select  cmtstimestamp,datats,macaddress,
                    dsParamLine['chnlidx'] chnlidx,
                    dsParamLine['modulation'] modulation,
                    dsParamLine['severity'] severity,
                    dsParamLine['rxpower'] rxpower,
                    dsParamLine['sigqnoise'] sigqnoise,
                    dsParamLine['noisedeviation'] noisedeviation,
                    dsParamLine['prefecber'] prefecber,
                    dsParamLine['postfecber'] postfecber,
                    dsParamLine['sigqrxmer'] sigqrxmer,
                    dsParamLine['sigqmicroreflection'] sigqmicroreflection,
                    dsParamLine['unerroreds'] unerroreds,
                    dsParamLine['corrected'] corrected,
                    dsParamLine['uncorrectables'] uncorrectables,
                    from_unixtime(cast(datats/1000 as bigint),'yyyyMMdd') day_ts,
                                                            cmtsid
""")



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv <daniel.haviv@veracity-group.com> wrote:

> Hi,
> I'm inserting into a partitioned ORC table using an INSERT SQL statement
> passed via HiveContext.
> The performance I'm getting is pretty bad, and I was wondering if there are
> ways to speed things up.
> Would saving the DF like this df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
> be faster?
>
>
> Thank you.
> Daniel
>