Posted to dev@spark.apache.org by Jianshi Huang <ji...@gmail.com> on 2014/11/26 07:13:55 UTC

Re: How to do broadcast join in SparkSQL

Hi,

It looks like the latest SparkSQL with Hive 0.12 has a bug in its Parquet
support. I got the following exception:

org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
        at org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
        at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
        at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
        at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)

This is with the same DDL and ANALYZE script from earlier in this thread
(quoted below).

Jianshi


On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
wrote:

> It works fine, thanks for the help Michael.
>
> Liancheng also told me a trick: using a subquery with LIMIT n. It works in
> the latest 1.2.0.
>
> BTW, it looks like the broadcast optimization isn't applied if I do a
> left join instead of an inner join. Is that true? How can I make it work for
> left joins?
>
> Cheers,
> Jianshi
>
> On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
>> Thanks for the input.  We purposefully made sure that the config option
>> did not make it into a release as it is not something that we are willing
>> to support long term.  That said we'll try and make this easier in the
>> future either through hints or better support for statistics.
>>
>> In this particular case you can get what you want by registering the
>> tables as external tables and setting a flag.  Here's a helper function to
>> do what you need.
>>
>> /**
>>  * Sugar for creating a Hive external table from a parquet path.
>>  */
>> def createParquetTable(name: String, file: String): Unit = {
>>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>>
>>   val rdd = parquetFile(file)
>>   val schema = rdd.schema.fields
>>     .map(f => s"${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}")
>>     .mkString(",\n")
>>   val ddl = s"""
>>     |CREATE EXTERNAL TABLE $name (
>>     |  $schema
>>     |)
>>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>>     |LOCATION '$file'""".stripMargin
>>   sql(ddl)
>>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
>> }
>>
>> You'll also need to run this to populate the statistics:
>>
>> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>>
>>
>> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> OK, so there is cost-based optimization currently, but Parquet statistics
>>> are not implemented...
>>>
>>> What's a good way to join a big fact table with several tiny
>>> dimension tables in Spark SQL (1.1)?
>>>
>>> I wish we could allow user hints for the join.
>>>
>>> Jianshi
>>>
>>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>>> merged into master?
>>>>
>>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>>> it's in the following patch.
>>>>
>>>>
>>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>>
>>>>
>>>> Jianshi
>>>>
>>>>
>>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <jianshi.huang@gmail.com> wrote:
>>>>
>>>>> Yes, it looks like it can only be controlled by the parameter
>>>>> spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>>>> to me.
>>>>>
>>>>> How am I supposed to know the exact size of a table in bytes? I think
>>>>> letting me specify the join algorithm would be preferable.
>>>>>
>>>>> Jianshi
>>>>>
>>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>
>>>>>> Have you looked at SPARK-1800 ?
>>>>>>
>>>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>>> Cheers
>>>>>>
>>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <jianshi.huang@gmail.com> wrote:
>>>>>>
>>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>>> tables to (left) join...
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> --
>>>>>>> Jianshi Huang
>>>>>>>
>>>>>>> LinkedIn: jianshi
>>>>>>> Twitter: @jshuang
>>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
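
The helper quoted above builds its DDL by string interpolation. As a minimal sketch of just that step, the following builds the same statement in plain Scala with no Spark or Hive dependency, so it can be inspected before it is passed to sql(). The names Column and ParquetDdl are hypothetical, and the column types are assumed to already be Hive type strings (the quoted helper gets them from HiveMetastoreTypes.toMetastoreType). It does no quoting or escaping of the table name or path.

```scala
// Hypothetical stand-in for a schema field: a column name plus a Hive type string.
final case class Column(name: String, hiveType: String)

object ParquetDdl {
  /** Builds a CREATE EXTERNAL TABLE statement for a Parquet location. */
  def build(table: String, location: String, columns: Seq[Column]): String = {
    // Each "name type" pair stays inside a single string literal,
    // so a mail client's line wrapping cannot split it.
    val schema = columns.map(c => s"${c.name} ${c.hiveType}").mkString(",\n  ")
    s"""CREATE EXTERNAL TABLE $table (
       |  $schema
       |)
       |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
       |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
       |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
       |LOCATION '$location'""".stripMargin
  }
}
```

For example, ParquetDdl.build("dim_country", "/data/dim_country", Seq(Column("id", "int"), Column("name", "string"))) returns the statement as a string; the table name and path here are made up for illustration.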

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Oh, I found an explanation at
http://cmenguy.github.io/blog/2013/10/30/using-hive-with-parquet-format-in-cdh-4-dot-3/

The error here is a bit misleading, what it really means is that the class
parquet.hive.DeprecatedParquetOutputFormat isn’t in the classpath for Hive.
Sure enough, doing a ls /usr/lib/hive/lib doesn’t show any of the parquet
jars, but ls /usr/lib/impala/lib shows the jar we’re looking for as
parquet-hive-1.0.jar

Is it removed from the latest Spark?

Jianshi
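
One way to test the classpath explanation is to ask the JVM directly whether it can load the class. This is a rough sketch, and ClasspathCheck is a hypothetical name. It only reports what the class loader of the JVM it runs in can see (for instance a spark-shell session), which is assumed, not guaranteed, to match what Hive's validation sees.

```scala
object ClasspathCheck {
  /** True if the named class can be located by this object's class loader. */
  def isOnClasspath(className: String): Boolean =
    try {
      // initialize = false: only locate the class, do not run its static initializers.
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
    }
}
```

Calling ClasspathCheck.isOnClasspath("parquet.hive.DeprecatedParquetOutputFormat") in the session that issues the DDL would show whether the output format class is visible there.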


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
