You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Jianshi Huang <ji...@gmail.com> on 2014/09/28 10:55:59 UTC

How to do broadcast join in SparkSQL

I cannot find it in the documentation. And I have a dozen dimension tables
to (left) join...


Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Michael Armbrust <mi...@databricks.com>.
In Spark 1.3, parquet tables that are created through the datasources API
will automatically calculate the sizeInBytes, which is used to broadcast.

On Thu, Feb 12, 2015 at 12:46 PM, Dima Zhiyanov <di...@hotmail.com>
wrote:

> Hello
>
> Has Spark implemented computing statistics for Parquet files? Or is there
> any other way I can enable broadcast joins between parquet file RDDs in
> Spark Sql?
>
> Thanks
> Dima
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21632.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: How to do broadcast join in SparkSQL

Posted by Dima Zhiyanov <di...@hotmail.com>.
Hello 

Has Spark implemented computing statistics for Parquet files? Or is there
any other way I can enable broadcast joins between parquet file RDDs in
Spark Sql? 

Thanks 
Dima



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21632.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to do broadcast join in SparkSQL

Posted by Dima Zhiyanov <di...@hotmail.com>.
Hello 

Has Spark implemented computing statistics for Parquet files? Or is there
any other way I can enable broadcast joins between parquet file RDDs in
Spark Sql? 

Thanks 
Dima 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21611.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Oh, I found a explanation from
http://cmenguy.github.io/blog/2013/10/30/using-hive-with-parquet-format-in-cdh-4-dot-3/

The error here is a bit misleading, what it really means is that the class
parquet.hive.DeprecatedParquetOutputFormat isn’t in the classpath for Hive.
Sure enough, doing a ls /usr/lib/hive/lib doesn’t show any of the parquet
jars, but ls /usr/lib/impala/lib shows the jar we’re looking for as
parquet-hive-1.0.jar
Is it removed from latest Spark?

Jianshi


On Wed, Nov 26, 2014 at 2:13 PM, Jianshi Huang <ji...@gmail.com>
wrote:

> Hi,
>
> Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet
> support. I got the following exceptions:
>
> org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must
> implement HiveOutputFormat, otherwise it should be either
> IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
>         at
> org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
>         at
> org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
>         at
> org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
>         at
> org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)
>
> Using the same DDL and Analyze script above.
>
> Jianshi
>
>
> On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> It works fine, thanks for the help Michael.
>>
>> Liancheng also told me a trick, using a subquery with LIMIT n. It works
>> in latest 1.2.0
>>
>> BTW, looks like the broadcast optimization won't be recognized if I do a
>> left join instead of a inner join. Is that true? How can I make it work for
>> left joins?
>>
>> Cheers,
>> Jianshi
>>
>> On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust <mi...@databricks.com>
>> wrote:
>>
>>> Thanks for the input.  We purposefully made sure that the config option
>>> did not make it into a release as it is not something that we are willing
>>> to support long term.  That said we'll try and make this easier in the
>>> future either through hints or better support for statistics.
>>>
>>> In this particular case you can get what you want by registering the
>>> tables as external tables and setting an flag.  Here's a helper function to
>>> do what you need.
>>>
>>> /**
>>>  * Sugar for creating a Hive external table from a parquet path.
>>>  */
>>> def createParquetTable(name: String, file: String): Unit = {
>>>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>>>
>>>   val rdd = parquetFile(file)
>>>   val schema = rdd.schema.fields.map(f => s"${f.name}
>>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>>   val ddl = s"""
>>>     |CREATE EXTERNAL TABLE $name (
>>>     |  $schema
>>>     |)
>>>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>>>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>>>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>>>     |LOCATION '$file'""".stripMargin
>>>   sql(ddl)
>>>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
>>> }
>>>
>>> You'll also need to run this to populate the statistics:
>>>
>>> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>>>
>>>
>>> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> Ok, currently there's cost-based optimization however Parquet
>>>> statistics is not implemented...
>>>>
>>>> What's the good way if I want to join a big fact table with several
>>>> tiny dimension tables in Spark SQL (1.1)?
>>>>
>>>> I wish we can allow user hint for the join.
>>>>
>>>> Jianshi
>>>>
>>>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
>>>> wrote:
>>>>
>>>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>>>> merged into master?
>>>>>
>>>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>>>> it's in the following patch.
>>>>>
>>>>>
>>>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>>>
>>>>>
>>>>> Jianshi
>>>>>
>>>>>
>>>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <
>>>>> jianshi.huang@gmail.com> wrote:
>>>>>
>>>>>> Yes, looks like it can only be controlled by the
>>>>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>>>>> to me.
>>>>>>
>>>>>> How am I suppose to know the exact bytes of a table? Let me specify
>>>>>> the join algorithm is preferred I think.
>>>>>>
>>>>>> Jianshi
>>>>>>
>>>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>>
>>>>>>> Have you looked at SPARK-1800 ?
>>>>>>>
>>>>>>> e.g. see
>>>>>>> sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <
>>>>>>> jianshi.huang@gmail.com> wrote:
>>>>>>>
>>>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>>>> tables to (left) join...
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> --
>>>>>>>> Jianshi Huang
>>>>>>>>
>>>>>>>> LinkedIn: jianshi
>>>>>>>> Twitter: @jshuang
>>>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jianshi Huang
>>>>>>
>>>>>> LinkedIn: jianshi
>>>>>> Twitter: @jshuang
>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Oh, I found a explanation from
http://cmenguy.github.io/blog/2013/10/30/using-hive-with-parquet-format-in-cdh-4-dot-3/

The error here is a bit misleading, what it really means is that the class
parquet.hive.DeprecatedParquetOutputFormat isn’t in the classpath for Hive.
Sure enough, doing a ls /usr/lib/hive/lib doesn’t show any of the parquet
jars, but ls /usr/lib/impala/lib shows the jar we’re looking for as
parquet-hive-1.0.jar
Is it removed from latest Spark?

Jianshi


On Wed, Nov 26, 2014 at 2:13 PM, Jianshi Huang <ji...@gmail.com>
wrote:

> Hi,
>
> Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet
> support. I got the following exceptions:
>
> org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must
> implement HiveOutputFormat, otherwise it should be either
> IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
>         at
> org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
>         at
> org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
>         at
> org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
>         at
> org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)
>
> Using the same DDL and Analyze script above.
>
> Jianshi
>
>
> On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> It works fine, thanks for the help Michael.
>>
>> Liancheng also told me a trick, using a subquery with LIMIT n. It works
>> in latest 1.2.0
>>
>> BTW, looks like the broadcast optimization won't be recognized if I do a
>> left join instead of a inner join. Is that true? How can I make it work for
>> left joins?
>>
>> Cheers,
>> Jianshi
>>
>> On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust <mi...@databricks.com>
>> wrote:
>>
>>> Thanks for the input.  We purposefully made sure that the config option
>>> did not make it into a release as it is not something that we are willing
>>> to support long term.  That said we'll try and make this easier in the
>>> future either through hints or better support for statistics.
>>>
>>> In this particular case you can get what you want by registering the
>>> tables as external tables and setting an flag.  Here's a helper function to
>>> do what you need.
>>>
>>> /**
>>>  * Sugar for creating a Hive external table from a parquet path.
>>>  */
>>> def createParquetTable(name: String, file: String): Unit = {
>>>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>>>
>>>   val rdd = parquetFile(file)
>>>   val schema = rdd.schema.fields.map(f => s"${f.name}
>>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>>   val ddl = s"""
>>>     |CREATE EXTERNAL TABLE $name (
>>>     |  $schema
>>>     |)
>>>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>>>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>>>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>>>     |LOCATION '$file'""".stripMargin
>>>   sql(ddl)
>>>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
>>> }
>>>
>>> You'll also need to run this to populate the statistics:
>>>
>>> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>>>
>>>
>>> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> Ok, currently there's cost-based optimization however Parquet
>>>> statistics is not implemented...
>>>>
>>>> What's the good way if I want to join a big fact table with several
>>>> tiny dimension tables in Spark SQL (1.1)?
>>>>
>>>> I wish we can allow user hint for the join.
>>>>
>>>> Jianshi
>>>>
>>>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
>>>> wrote:
>>>>
>>>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>>>> merged into master?
>>>>>
>>>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>>>> it's in the following patch.
>>>>>
>>>>>
>>>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>>>
>>>>>
>>>>> Jianshi
>>>>>
>>>>>
>>>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <
>>>>> jianshi.huang@gmail.com> wrote:
>>>>>
>>>>>> Yes, looks like it can only be controlled by the
>>>>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>>>>> to me.
>>>>>>
>>>>>> How am I suppose to know the exact bytes of a table? Let me specify
>>>>>> the join algorithm is preferred I think.
>>>>>>
>>>>>> Jianshi
>>>>>>
>>>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>>
>>>>>>> Have you looked at SPARK-1800 ?
>>>>>>>
>>>>>>> e.g. see
>>>>>>> sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <
>>>>>>> jianshi.huang@gmail.com> wrote:
>>>>>>>
>>>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>>>> tables to (left) join...
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> --
>>>>>>>> Jianshi Huang
>>>>>>>>
>>>>>>>> LinkedIn: jianshi
>>>>>>>> Twitter: @jshuang
>>>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jianshi Huang
>>>>>>
>>>>>> LinkedIn: jianshi
>>>>>> Twitter: @jshuang
>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Hi,

Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support.
I got the following exceptions:

org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must
implement HiveOutputFormat, otherwise it should be either
IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
        at
org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
        at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
        at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
        at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)

Using the same DDL and Analyze script above.

Jianshi


On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
wrote:

> It works fine, thanks for the help Michael.
>
> Liancheng also told me a trick, using a subquery with LIMIT n. It works in
> latest 1.2.0
>
> BTW, looks like the broadcast optimization won't be recognized if I do a
> left join instead of a inner join. Is that true? How can I make it work for
> left joins?
>
> Cheers,
> Jianshi
>
> On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
>> Thanks for the input.  We purposefully made sure that the config option
>> did not make it into a release as it is not something that we are willing
>> to support long term.  That said we'll try and make this easier in the
>> future either through hints or better support for statistics.
>>
>> In this particular case you can get what you want by registering the
>> tables as external tables and setting an flag.  Here's a helper function to
>> do what you need.
>>
>> /**
>>  * Sugar for creating a Hive external table from a parquet path.
>>  */
>> def createParquetTable(name: String, file: String): Unit = {
>>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>>
>>   val rdd = parquetFile(file)
>>   val schema = rdd.schema.fields.map(f => s"${f.name}
>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>   val ddl = s"""
>>     |CREATE EXTERNAL TABLE $name (
>>     |  $schema
>>     |)
>>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>>     |LOCATION '$file'""".stripMargin
>>   sql(ddl)
>>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
>> }
>>
>> You'll also need to run this to populate the statistics:
>>
>> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>>
>>
>> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> Ok, currently there's cost-based optimization however Parquet statistics
>>> is not implemented...
>>>
>>> What's the good way if I want to join a big fact table with several tiny
>>> dimension tables in Spark SQL (1.1)?
>>>
>>> I wish we can allow user hint for the join.
>>>
>>> Jianshi
>>>
>>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>>> merged into master?
>>>>
>>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>>> it's in the following patch.
>>>>
>>>>
>>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>>
>>>>
>>>> Jianshi
>>>>
>>>>
>>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <jianshi.huang@gmail.com
>>>> > wrote:
>>>>
>>>>> Yes, looks like it can only be controlled by the
>>>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>>>> to me.
>>>>>
>>>>> How am I suppose to know the exact bytes of a table? Let me specify
>>>>> the join algorithm is preferred I think.
>>>>>
>>>>> Jianshi
>>>>>
>>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>
>>>>>> Have you looked at SPARK-1800 ?
>>>>>>
>>>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>>> Cheers
>>>>>>
>>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <
>>>>>> jianshi.huang@gmail.com> wrote:
>>>>>>
>>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>>> tables to (left) join...
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> --
>>>>>>> Jianshi Huang
>>>>>>>
>>>>>>> LinkedIn: jianshi
>>>>>>> Twitter: @jshuang
>>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Hi,

Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support.
I got the following exceptions:

org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must
implement HiveOutputFormat, otherwise it should be either
IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
        at
org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
        at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
        at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
        at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)

Using the same DDL and Analyze script above.

Jianshi


On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
wrote:

> It works fine, thanks for the help Michael.
>
> Liancheng also told me a trick, using a subquery with LIMIT n. It works in
> latest 1.2.0
>
> BTW, looks like the broadcast optimization won't be recognized if I do a
> left join instead of a inner join. Is that true? How can I make it work for
> left joins?
>
> Cheers,
> Jianshi
>
> On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
>> Thanks for the input.  We purposefully made sure that the config option
>> did not make it into a release as it is not something that we are willing
>> to support long term.  That said we'll try and make this easier in the
>> future either through hints or better support for statistics.
>>
>> In this particular case you can get what you want by registering the
>> tables as external tables and setting an flag.  Here's a helper function to
>> do what you need.
>>
>> /**
>>  * Sugar for creating a Hive external table from a parquet path.
>>  */
>> def createParquetTable(name: String, file: String): Unit = {
>>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>>
>>   val rdd = parquetFile(file)
>>   val schema = rdd.schema.fields.map(f => s"${f.name}
>> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>>   val ddl = s"""
>>     |CREATE EXTERNAL TABLE $name (
>>     |  $schema
>>     |)
>>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>>     |LOCATION '$file'""".stripMargin
>>   sql(ddl)
>>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
>> }
>>
>> You'll also need to run this to populate the statistics:
>>
>> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>>
>>
>> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> Ok, currently there's cost-based optimization however Parquet statistics
>>> is not implemented...
>>>
>>> What's the good way if I want to join a big fact table with several tiny
>>> dimension tables in Spark SQL (1.1)?
>>>
>>> I wish we can allow user hint for the join.
>>>
>>> Jianshi
>>>
>>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>>> merged into master?
>>>>
>>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>>> it's in the following patch.
>>>>
>>>>
>>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>>
>>>>
>>>> Jianshi
>>>>
>>>>
>>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <jianshi.huang@gmail.com
>>>> > wrote:
>>>>
>>>>> Yes, looks like it can only be controlled by the
>>>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>>>> to me.
>>>>>
>>>>> How am I suppose to know the exact bytes of a table? Let me specify
>>>>> the join algorithm is preferred I think.
>>>>>
>>>>> Jianshi
>>>>>
>>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>
>>>>>> Have you looked at SPARK-1800 ?
>>>>>>
>>>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>>> Cheers
>>>>>>
>>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <
>>>>>> jianshi.huang@gmail.com> wrote:
>>>>>>
>>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>>> tables to (left) join...
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> --
>>>>>>> Jianshi Huang
>>>>>>>
>>>>>>> LinkedIn: jianshi
>>>>>>> Twitter: @jshuang
>>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
It works fine, thanks for the help Michael.

Liancheng also told me a trick, using a subquery with LIMIT n. It works in
latest 1.2.0

BTW, looks like the broadcast optimization won't be recognized if I do a
left join instead of a inner join. Is that true? How can I make it work for
left joins?

Cheers,
Jianshi

On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust <mi...@databricks.com>
wrote:

> Thanks for the input.  We purposefully made sure that the config option
> did not make it into a release as it is not something that we are willing
> to support long term.  That said we'll try and make this easier in the
> future either through hints or better support for statistics.
>
> In this particular case you can get what you want by registering the
> tables as external tables and setting an flag.  Here's a helper function to
> do what you need.
>
> /**
>  * Sugar for creating a Hive external table from a parquet path.
>  */
> def createParquetTable(name: String, file: String): Unit = {
>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>
>   val rdd = parquetFile(file)
>   val schema = rdd.schema.fields.map(f => s"${f.name}
> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>   val ddl = s"""
>     |CREATE EXTERNAL TABLE $name (
>     |  $schema
>     |)
>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>     |LOCATION '$file'""".stripMargin
>   sql(ddl)
>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
> }
>
> You'll also need to run this to populate the statistics:
>
> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>
>
> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> Ok, currently there's cost-based optimization however Parquet statistics
>> is not implemented...
>>
>> What's the good way if I want to join a big fact table with several tiny
>> dimension tables in Spark SQL (1.1)?
>>
>> I wish we can allow user hint for the join.
>>
>> Jianshi
>>
>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>> merged into master?
>>>
>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>> it's in the following patch.
>>>
>>>
>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>
>>>
>>> Jianshi
>>>
>>>
>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> Yes, looks like it can only be controlled by the
>>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>>> to me.
>>>>
>>>> How am I suppose to know the exact bytes of a table? Let me specify the
>>>> join algorithm is preferred I think.
>>>>
>>>> Jianshi
>>>>
>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>>> Have you looked at SPARK-1800 ?
>>>>>
>>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>> Cheers
>>>>>
>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <
>>>>> jianshi.huang@gmail.com> wrote:
>>>>>
>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>> tables to (left) join...
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> --
>>>>>> Jianshi Huang
>>>>>>
>>>>>> LinkedIn: jianshi
>>>>>> Twitter: @jshuang
>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
It works fine, thanks for the help Michael.

Liancheng also told me a trick, using a subquery with LIMIT n. It works in
latest 1.2.0

BTW, looks like the broadcast optimization won't be recognized if I do a
left join instead of a inner join. Is that true? How can I make it work for
left joins?

Cheers,
Jianshi

On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust <mi...@databricks.com>
wrote:

> Thanks for the input.  We purposefully made sure that the config option
> did not make it into a release as it is not something that we are willing
> to support long term.  That said we'll try and make this easier in the
> future either through hints or better support for statistics.
>
> In this particular case you can get what you want by registering the
> tables as external tables and setting an flag.  Here's a helper function to
> do what you need.
>
> /**
>  * Sugar for creating a Hive external table from a parquet path.
>  */
> def createParquetTable(name: String, file: String): Unit = {
>   import org.apache.spark.sql.hive.HiveMetastoreTypes
>
>   val rdd = parquetFile(file)
>   val schema = rdd.schema.fields.map(f => s"${f.name}
> ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
>   val ddl = s"""
>     |CREATE EXTERNAL TABLE $name (
>     |  $schema
>     |)
>     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
>     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
>     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
>     |LOCATION '$file'""".stripMargin
>   sql(ddl)
>   setConf("spark.sql.hive.convertMetastoreParquet", "true")
> }
>
> You'll also need to run this to populate the statistics:
>
> ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
>
>
> On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> Ok, currently there's cost-based optimization however Parquet statistics
>> is not implemented...
>>
>> What's the good way if I want to join a big fact table with several tiny
>> dimension tables in Spark SQL (1.1)?
>>
>> I wish we can allow user hint for the join.
>>
>> Jianshi
>>
>> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>>> merged into master?
>>>
>>> I cannot find spark.sql.hints.broadcastTables in latest master, but
>>> it's in the following patch.
>>>
>>>
>>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>>
>>>
>>> Jianshi
>>>
>>>
>>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> Yes, looks like it can only be controlled by the
>>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>>> to me.
>>>>
>>>> How am I suppose to know the exact bytes of a table? Let me specify the
>>>> join algorithm is preferred I think.
>>>>
>>>> Jianshi
>>>>
>>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>>> Have you looked at SPARK-1800 ?
>>>>>
>>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>>> Cheers
>>>>>
>>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <
>>>>> jianshi.huang@gmail.com> wrote:
>>>>>
>>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>>> tables to (left) join...
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> --
>>>>>> Jianshi Huang
>>>>>>
>>>>>> LinkedIn: jianshi
>>>>>> Twitter: @jshuang
>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Dima Zhiyanov <di...@hotmail.com>.
Thank you!

The Hive solution seemed more like a workaround. I was wondering if a native Spark Sql support for computing statistics for Parquet files would be available 

Dima



Sent from my iPhone

> On Feb 11, 2015, at 3:34 PM, Ted Yu <yu...@gmail.com> wrote:
> 
> See earlier thread:
> http://search-hadoop.com/m/JW1q5BZhf92
> 
>> On Wed, Feb 11, 2015 at 3:04 PM, Dima Zhiyanov <di...@gmail.com> wrote:
>> Hello
>> 
>> Has Spark implemented computing statistics for Parquet files? Or is there
>> any other way I can enable broadcast joins between parquet file RDDs in
>> Spark Sql?
>> 
>> Thanks
>> Dima
>> 
>> 
>> 
>> 
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21609.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
> 

Re: How to do broadcast join in SparkSQL

Posted by Ted Yu <yu...@gmail.com>.
See earlier thread:
http://search-hadoop.com/m/JW1q5BZhf92

On Wed, Feb 11, 2015 at 3:04 PM, Dima Zhiyanov <di...@gmail.com>
wrote:

> Hello
>
> Has Spark implemented computing statistics for Parquet files? Or is there
> any other way I can enable broadcast joins between parquet file RDDs in
> Spark Sql?
>
> Thanks
> Dima
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21609.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: How to do broadcast join in SparkSQL

Posted by Dima Zhiyanov <di...@gmail.com>.
Hello

Has Spark implemented computing statistics for Parquet files? Or is there
any other way I can enable broadcast joins between parquet file RDDs in
Spark Sql?

Thanks
Dima




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to do broadcast join in SparkSQL

Posted by Dima Zhiyanov <di...@hotmail.com>.
Hello 

Has Spark implemented computing statistics for Parquet files? Or is there
any other way I can enable broadcast joins between parquet file RDDs in
Spark Sql? 

Thanks 
Dima 




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21610.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to do broadcast join in SparkSQL

Posted by Michael Armbrust <mi...@databricks.com>.
Thanks for the input.  We purposefully made sure that the config option did
not make it into a release as it is not something that we are willing to
support long term.  That said we'll try and make this easier in the future
either through hints or better support for statistics.

In this particular case you can get what you want by registering the tables
as external tables and setting an flag.  Here's a helper function to do
what you need.

/**
 * Sugar for creating a Hive external table from a parquet path.
 */
def createParquetTable(name: String, file: String): Unit = {
  import org.apache.spark.sql.hive.HiveMetastoreTypes

  val rdd = parquetFile(file)
  val schema = rdd.schema.fields.map(f => s"${f.name}
${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
  val ddl = s"""
    |CREATE EXTERNAL TABLE $name (
    |  $schema
    |)
    |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
    |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
    |LOCATION '$file'""".stripMargin
  sql(ddl)
  setConf("spark.sql.hive.convertMetastoreParquet", "true")
}

You'll also need to run this to populate the statistics:

ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;


On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
wrote:

> Ok, currently there's cost-based optimization however Parquet statistics
> is not implemented...
>
> What's the good way if I want to join a big fact table with several tiny
> dimension tables in Spark SQL (1.1)?
>
> I wish we can allow user hint for the join.
>
> Jianshi
>
> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>> merged into master?
>>
>> I cannot find spark.sql.hints.broadcastTables in latest master, but it's
>> in the following patch.
>>
>>
>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>
>>
>> Jianshi
>>
>>
>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> Yes, looks like it can only be controlled by the
>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>> to me.
>>>
>>> How am I suppose to know the exact bytes of a table? Let me specify the
>>> join algorithm is preferred I think.
>>>
>>> Jianshi
>>>
>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>
>>>> Have you looked at SPARK-1800 ?
>>>>
>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>> Cheers
>>>>
>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <jianshi.huang@gmail.com
>>>> > wrote:
>>>>
>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>> tables to (left) join...
>>>>>
>>>>>
>>>>> Cheers,
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>

Re: How to do broadcast join in SparkSQL

Posted by Michael Armbrust <mi...@databricks.com>.
Thanks for the input.  We purposefully made sure that the config option did
not make it into a release as it is not something that we are willing to
support long term.  That said we'll try and make this easier in the future
either through hints or better support for statistics.

In this particular case you can get what you want by registering the tables
as external tables and setting an flag.  Here's a helper function to do
what you need.

/**
 * Sugar for creating a Hive external table from a parquet path.
 */
def createParquetTable(name: String, file: String): Unit = {
  import org.apache.spark.sql.hive.HiveMetastoreTypes

  val rdd = parquetFile(file)
  val schema = rdd.schema.fields.map(f => s"${f.name}
${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
  val ddl = s"""
    |CREATE EXTERNAL TABLE $name (
    |  $schema
    |)
    |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
    |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
    |LOCATION '$file'""".stripMargin
  sql(ddl)
  setConf("spark.sql.hive.convertMetastoreParquet", "true")
}

You'll also need to run this to populate the statistics:

ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;


On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang <ji...@gmail.com>
wrote:

> Ok, currently there's cost-based optimization however Parquet statistics
> is not implemented...
>
> What's the good way if I want to join a big fact table with several tiny
> dimension tables in Spark SQL (1.1)?
>
> I wish we can allow user hint for the join.
>
> Jianshi
>
> On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
>> merged into master?
>>
>> I cannot find spark.sql.hints.broadcastTables in latest master, but it's
>> in the following patch.
>>
>>
>> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>>
>>
>> Jianshi
>>
>>
>> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> Yes, looks like it can only be controlled by the
>>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>>> to me.
>>>
>>> How am I suppose to know the exact bytes of a table? Let me specify the
>>> join algorithm is preferred I think.
>>>
>>> Jianshi
>>>
>>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>>
>>>> Have you looked at SPARK-1800 ?
>>>>
>>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>>> Cheers
>>>>
>>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <jianshi.huang@gmail.com
>>>> > wrote:
>>>>
>>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>>> tables to (left) join...
>>>>>
>>>>>
>>>>> Cheers,
>>>>> --
>>>>> Jianshi Huang
>>>>>
>>>>> LinkedIn: jianshi
>>>>> Twitter: @jshuang
>>>>> Github & Blog: http://huangjs.github.com/
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Ok, currently there's cost-based optimization however Parquet statistics is
not implemented...

What's the good way if I want to join a big fact table with several tiny
dimension tables in Spark SQL (1.1)?

I wish we can allow user hint for the join.

Jianshi

On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
wrote:

> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged
> into master?
>
> I cannot find spark.sql.hints.broadcastTables in latest master, but it's
> in the following patch.
>
>
> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>
>
> Jianshi
>
>
> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> Yes, looks like it can only be controlled by the
>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>> to me.
>>
>> How am I suppose to know the exact bytes of a table? Let me specify the
>> join algorithm is preferred I think.
>>
>> Jianshi
>>
>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>
>>> Have you looked at SPARK-1800 ?
>>>
>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>> Cheers
>>>
>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>> tables to (left) join...
>>>>
>>>>
>>>> Cheers,
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Ok, currently there's cost-based optimization however Parquet statistics is
not implemented...

What's the good way if I want to join a big fact table with several tiny
dimension tables in Spark SQL (1.1)?

I wish we can allow user hint for the join.

Jianshi

On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang <ji...@gmail.com>
wrote:

> Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged
> into master?
>
> I cannot find spark.sql.hints.broadcastTables in latest master, but it's
> in the following patch.
>
>
> https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5
>
>
> Jianshi
>
>
> On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> Yes, looks like it can only be controlled by the
>> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
>> to me.
>>
>> How am I suppose to know the exact bytes of a table? Let me specify the
>> join algorithm is preferred I think.
>>
>> Jianshi
>>
>> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>>
>>> Have you looked at SPARK-1800 ?
>>>
>>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>>> Cheers
>>>
>>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> I cannot find it in the documentation. And I have a dozen dimension
>>>> tables to (left) join...
>>>>
>>>>
>>>> Cheers,
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged
into master?

I cannot find spark.sql.hints.broadcastTables in latest master, but it's in
the following patch.


https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5


Jianshi


On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <ji...@gmail.com>
wrote:

> Yes, looks like it can only be controlled by the
> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
> to me.
>
> How am I suppose to know the exact bytes of a table? Let me specify the
> join algorithm is preferred I think.
>
> Jianshi
>
> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Have you looked at SPARK-1800 ?
>>
>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>> Cheers
>>
>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> I cannot find it in the documentation. And I have a dozen dimension
>>> tables to (left) join...
>>>
>>>
>>> Cheers,
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged
into master?

I cannot find spark.sql.hints.broadcastTables in latest master, but it's in
the following patch.


https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5


Jianshi


On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang <ji...@gmail.com>
wrote:

> Yes, looks like it can only be controlled by the
> parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
> to me.
>
> How am I suppose to know the exact bytes of a table? Let me specify the
> join algorithm is preferred I think.
>
> Jianshi
>
> On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Have you looked at SPARK-1800 ?
>>
>> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
>> Cheers
>>
>> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <ji...@gmail.com>
>> wrote:
>>
>>> I cannot find it in the documentation. And I have a dozen dimension
>>> tables to (left) join...
>>>
>>>
>>> Cheers,
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Jianshi Huang <ji...@gmail.com>.
Yes, looks like it can only be controlled by the
parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
to me.

How am I suppose to know the exact bytes of a table? Let me specify the
join algorithm is preferred I think.

Jianshi

On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu <yu...@gmail.com> wrote:

> Have you looked at SPARK-1800 ?
>
> e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
> Cheers
>
> On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <ji...@gmail.com>
> wrote:
>
>> I cannot find it in the documentation. And I have a dozen dimension
>> tables to (left) join...
>>
>>
>> Cheers,
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: How to do broadcast join in SparkSQL

Posted by Ted Yu <yu...@gmail.com>.
Have you looked at SPARK-1800 ?

e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Cheers

On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang <ji...@gmail.com>
wrote:

> I cannot find it in the documentation. And I have a dozen dimension tables
> to (left) join...
>
>
> Cheers,
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>