Posted to reviews@spark.apache.org by sujith71955 <gi...@git.apache.org> on 2018/10/17 20:49:40 UTC

[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

GitHub user sujith71955 opened a pull request:

    https://github.com/apache/spark/pull/22758

    [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join has selected when restart spark-shell/spark-JDBC for hive provider

    ## What changes were proposed in this pull request?
    Problem:
    Run the steps below in sequence to reproduce the issue.
    ```
    a. Create a parquet table with a STORED AS clause.
    b. Run an insert statement => this does not update the Hive stats.
    c. Run a select query that needs stats (or an explain cost select statement) => the stats are evaluated from HadoopFsRelation.
    d. Since the correct stats (sizeInBytes) are available, the plan selects a broadcast node when the table is joined with another table.
    e. Exit => (leave the shell)
    
    f. Run the **step c** query again. The plan now shows wrong stats (sizeInBytes is assigned the default value), because they are calculated from HiveCatalogTable, which has no stats since none were recorded in **step b**.
    g. Since incorrect stats (the default sizeInBytes) are used, the plan selects a SortMergeJoin node when the table is joined with another table.
    h. Run an insert statement again => this updates the Hive stats.
    i. Run the **step c** query again. The plan now shows the correct stats (sizeInBytes), because it can read the Hive stats updated in **step h**.
    j. From now on the stats are always available, so the plan shows the correct stats and always picks a broadcast join node (based on the threshold size).
    ```
    Solution:
    The main problem is that the Hive stats are not recorded after the insert command. This is caused by the condition "if (table.stats.nonEmpty)" in updateTableStats(), which is executed as part of InsertIntoHadoopFsRelationCommand.
    As part of the fix, we initialize a default value for the CatalogTable stats if there is no cached LogicalRelation for the table.
    Also, in the test case "test statistics of LogicalRelation converted from Hive serde tables" in StatisticsSuite, both orc and parquet are convertible, yet the test expects only orc to get/record Hive stats and not parquet. Since both relations are convertible, they should have the same expectation. This is corrected in this PR.
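    
    A minimal sketch of the guard described above, written against simplified stand-in types rather than Spark's real classes. `Stats`, `Table`, `withInitialStats` and the seed value 0 are hypothetical and chosen only for illustration; the only part taken from the description is the `stats.nonEmpty` condition.
    
    ```scala
    // Stand-in types for illustration only; not Spark's CatalogTable/CatalogStatistics.
    object StatsGuardSketch {
      final case class Stats(sizeInBytes: BigInt)
      final case class Table(name: String, stats: Option[Stats])
    
      // Same guard as the quoted updateTableStats(): a table without stats is skipped.
      // The body is simplified; the real method does more than overwrite the size.
      def updateTableStats(table: Table, sizeOnDisk: BigInt): Table =
        if (table.stats.nonEmpty) table.copy(stats = Some(Stats(sizeOnDisk)))
        else table
    
      // Hypothetical version of the proposed fix: seed an initial value
      // (0 is an assumption) so that the guard above passes on the first insert.
      def withInitialStats(table: Table): Table =
        if (table.stats.isEmpty) table.copy(stats = Some(Stats(BigInt(0))))
        else table
    }
    ```
    
    With these stand-ins, calling `updateTableStats` on a table whose `stats` is `None` leaves it unchanged, while seeding the table first lets the size be recorded.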
    
    ## How was this patch tested?
    Manually tested (snapshots attached). Also corrected a UT, as mentioned in the description above, which complements the changes in this PR.
    Step 1: 
    Login to spark-shell => create 2 tables => Run insert command => Explain and check the plan => Plan contains Broadcast join => Exit
    
    ![step-1_spark-25332](https://user-images.githubusercontent.com/12999161/47113009-83db6400-d275-11e8-8439-0b9cba0cb413.PNG)
    
    Step 2:
    Relaunch spark-shell => Run explain command for the particular select statement => verify the plan => Plan contains SortMergeJoin - This is an incorrect result.
    
    ![step-2_spark-25332](https://user-images.githubusercontent.com/12999161/47113119-d288fe00-d275-11e8-9c8c-971f02fddda7.PNG)
    
    Step 3:
    Run the insert command again => Run explain command for the particular select statement => verify the plan
    We can observe that the node has changed to BroadcastJoin - this makes the flow inconsistent.
    
    **After Fix**
    Step 1:
    Login to spark-shell => create 2 tables => Run insert command => Explain and check the plan => Plan contains Broadcast join => Exit
    
    ![step-1-afterfix-spark-25332](https://user-images.githubusercontent.com/12999161/47113323-52af6380-d276-11e8-9eb9-71d1076d7e38.PNG)
    
    Step 2:
    Relaunch spark-shell => Run explain command for the particular select statement => verify the plan => Plan still contains Broadcast join, since after the fix the Hive stats are available for the table.
    ![step-2-afterfix-spark-25332](https://user-images.githubusercontent.com/12999161/47113407-94400e80-d276-11e8-99c1-66fa0c333beb.PNG)
    
    Step 3:
    Run the insert command again => Run explain command for the particular select statement => verify the plan
    We can observe that the plan still retains BroadcastJoin - from now on the results are always consistent.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sujith71955/spark master_statistics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22758
    
----
commit 469f3693b641dae161bbb673599f55f20a60767b
Author: s71955 <su...@...>
Date:   2018-10-17T19:43:39Z

    [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join has selected when restart spark-shell/spark-JDBC for hive provider
    
    ## What changes were proposed in this pull request?
    Problem:
    Run the steps below in sequence to reproduce the issue.
    a. Create a parquet table with a STORED AS clause.
    b. Run an insert statement => this does not update the Hive stats.
    c. Run a select query that needs stats (or an explain cost select statement) -> the stats are evaluated from HadoopFsRelation.
    d. Since the correct stats (sizeInBytes) are available, the plan selects a broadcast node when the table is joined with another table.
    e. Exit; (leave the shell)
    
    f. Run the step c query again. The plan now shows wrong stats (sizeInBytes is assigned the default value), because they are calculated from HiveCatalogTable, which has no stats since none were recorded in step b.
    g. Since incorrect stats (the default sizeInBytes) are used, the plan selects a SortMergeJoin node when the table is joined with another table.
    h. Run an insert statement again => this updates the Hive stats.
    i. Run the step c query again. The plan now shows the correct stats (sizeInBytes), because it can read the Hive stats updated in step h.
    j. From now on the stats are always available, so the plan shows the correct stats and always picks a broadcast join node (based on the threshold size).
    
    Solution:
    The main problem is that the Hive stats are not recorded after the insert command. This is caused by the condition "if (table.stats.nonEmpty)" in updateTableStats(), which is executed as part of InsertIntoHadoopFsRelationCommand.
    As part of the fix, we initialize a default value for the CatalogTable stats if there is no cached LogicalRelation for the table.
    
    Also, in the test case "test statistics of LogicalRelation converted from Hive serde tables" in StatisticsSuite, both orc and parquet are convertible, yet the test expects only orc to get/record Hive stats and not parquet. Since both relations are convertible, they should have the same expectation. This is corrected in this PR.
    
    How this patch was tested:
    Manually tested; the test snapshots are added, and the UT is corrected so that it verifies the PR scenario.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    cc @srowen 


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226515799
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    You are right, it's about considering the default size, but I am not sure whether we should invalidate the cache. I will explain my understanding below.


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    Thanks for the comment, Sean. There are certain areas where I found
    inconsistencies. If I get some input from the experts, I think I can update
    the PR. If we are planning to tackle this issue via another PR, then I will
    close this PR.
    
    Currently this issue causes performance degradation: the node is
    converted to SortMergeJoin, which triggers an unnecessary shuffle, even
    though it could use a broadcast join because the data size is less than
    the broadcast threshold.
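    
    The effect of a defaulted size on join selection can be sketched as follows. This is a simplified model, not Spark's planner: `chooseJoin` is a hypothetical helper, and the two constants assume the default values of `spark.sql.autoBroadcastJoinThreshold` (10 MB) and `spark.sql.defaultSizeInBytes` (`Long.MaxValue`). The real planner also takes the join type and hints into account.
    
    ```scala
    object JoinChoiceSketch {
      // Assumed defaults: spark.sql.autoBroadcastJoinThreshold = 10 MB,
      // spark.sql.defaultSizeInBytes = Long.MaxValue.
      val autoBroadcastJoinThreshold: BigInt = BigInt(10L * 1024 * 1024)
      val defaultSizeInBytes: BigInt = BigInt(Long.MaxValue)
    
      // Simplified rule: broadcast only when the estimated size fits the threshold.
      def chooseJoin(sizeInBytes: BigInt): String =
        if (sizeInBytes <= autoBroadcastJoinThreshold) "BroadcastHashJoin"
        else "SortMergeJoin"
    }
    ```
    
    Under these assumptions a small table whose real size is known is broadcast, while the same table reported with the default size can never fit the threshold and falls back to a sort merge join.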
    
    On Fri, 2 Nov 2018 at 12:33 AM, Sean Owen <no...@github.com> wrote:
    
    > I don't know this code well enough to review. I think there is skepticism
    > from people who know this code about whether this change is correct and
    > beneficial. If there's doubt, I think it should be closed.



---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226203075
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    > > but after create table command, when we do insert command within the same session Hive statistics is not getting updated
    > 
    > This is the thing I don't understand. Like I said before, even if table has no stats, Spark will still get a stats via the `DetermineTableStats` rule.
    
    Right, but I think DefaultStatistics will return the default value for the stats.
    I think this rule always returns the default stats unless session.sessionState.conf.fallBackToHdfsForStatsEnabled is set to true. I will reconfirm this behaviour.
    
    ![image](https://user-images.githubusercontent.com/12999161/47139545-b3bc5300-d2d9-11e8-9ae9-b13ee0dac970.png)



---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    Do we need to handle this scenario? Do we have any PR for handling this issue?



---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    @srowen @cloud-fan @HyukjinKwon @felixcheung
    @wangyum I think this PR also solves the problem mentioned in SPARK-25403.
    Please review and share any suggestions. Thanks, all.



---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    In order to make this flow consistent, either
    a) we need to record the Hive stats in the insert command flow and always use these stats while computing
     OR
    b) as mentioned in the snapshot above, we always estimate the data size from files for convertible relations.


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226199284
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    @sujith71955 What if `spark.sql.statistics.size.autoUpdate.enabled=false` or `hive.stats.autogather=false`? Does it still update the stats?


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226150341
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
    @@ -1051,7 +1051,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     
       test("test statistics of LogicalRelation converted from Hive serde tables") {
         Seq("orc", "parquet").foreach { format =>
    -      Seq(true, false).foreach { isConverted =>
    +      // Botth parquet and orc will have Hivestatistics, both are convertable to Logical Relation.
    +      Seq(true, true).foreach { isConverted =>
    --- End diff --
    
    This is to test when the conversion is on and off. We shouldn't change it.


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r228758621
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    > > but after create table command, when we do insert command within the same session Hive statistics is not getting updated
    > 
    > This is the thing I don't understand. Like I said before, even if table has no stats, Spark will still get a stats via the `DetermineTableStats` rule.
    
    @cloud-fan DetermineTableStats just initializes the stats if they are not set. Only if session.sessionState.conf.fallBackToHdfsForStatsEnabled is true does the rule derive the stats from the file system and update them, as shown in the code snippet below. In the insert flow this condition is never executed, so the stats are still None.
    ![image](https://user-images.githubusercontent.com/12999161/47619998-e3096600-db0a-11e8-9315-fa0d18be0860.png)
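    
    The behaviour discussed in this comment can be sketched as a small pure function. This is a simplified model under stated assumptions, not the rule's actual source: `resolveSize` and its parameters are hypothetical names, and the fallback flag stands in for `fallBackToHdfsForStatsEnabled`.
    
    ```scala
    object DetermineStatsSketch {
      // sizeOnDisk is passed by name so the (potentially expensive) file listing
      // only runs when the fallback is actually taken.
      def resolveSize(
          catalogStats: Option[BigInt],
          fallBackToHdfs: Boolean,
          sizeOnDisk: => BigInt,
          defaultSize: BigInt): BigInt =
        catalogStats.getOrElse(if (fallBackToHdfs) sizeOnDisk else defaultSize)
    }
    ```
    
    In this model a table without catalog stats only gets its real size when the fallback flag is enabled; otherwise it is assigned the default size.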



---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    @cloud-fan @HyukjinKwon @srowen 
    As a result of my observations above:
    a) If we expect the stats to be estimated from the file sizes, then why does the insert flow contain a statement that updates the Hive stats?
    
    b) If we have a mechanism to read the stats from Hive, then why should we estimate the data size from files?
    
    Please let me know your suggestions. I feel there is an inconsistency in this flow.


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    @cloud-fan Shall I update this PR based on the second approach? Would that be fine? I also tested with the second approach, and the use cases mentioned in this JIRA work fine. Please let me know your view, or whether we are going to continue with the approach mentioned in #22721.
    ![image](https://user-images.githubusercontent.com/12999161/47302893-3af22980-d640-11e8-8ba9-1ad39ba2804e.png)
    
    If any clarification is required regarding this flow, please let me know. I will try my best to explain, as the scenarios are confusing.



---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    cc @wzhfy


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226203589
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    The table created in the current session does not have stats. In this situation, it gets `sizeInBytes` from
    https://github.com/apache/spark/blob/1ff4a77be498615ee7216fd9cc2d510ecbd43b27/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L42-L46
    https://github.com/apache/spark/blob/25c2776dd9ae3f9792048c78be2cbd958fd99841/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala#L88-L91
    .
    That is the real size, which is why the plan uses a `broadcast join`. In fact, we should invalidate this table so that `DetermineTableStats` takes effect. I am doing it here: https://github.com/apache/spark/pull/22721


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    I don't know this code well enough to review. I think there is skepticism from people who know this code about whether this change is correct and beneficial. If there's doubt, I think it should be closed.


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    I think this issue should not be in the improvement category. It should be a critical bug, since it affects the performance of normal join queries. I hope we can address this issue.
    
    "The insert query flow does not update the table stats because the stats are empty, and hence it fails to refresh the table. So I thought of initializing the stats in this flow as well, so that after an insert query the flow also refreshes the table and invalidates the respective relation cache."
    
    ![image](https://user-images.githubusercontent.com/12999161/48186580-65d9be80-e35e-11e8-9feb-b4c0604d7eac.png)
    
    ![image](https://user-images.githubusercontent.com/12999161/48186601-738f4400-e35e-11e8-8876-dd5b55623a9c.png)
    
    



---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226151421
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    I don't quite understand why the table must have stats. For both file sources and Hive tables, we estimate the data size from files if the table doesn't have stats.


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226192210
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    Thanks for your valuable feedback.
    My observations:
    1) In the insert flow we always try to update the Hive stats, as per the statement below in InsertIntoHadoopFsRelationCommand.
    ```
          if (catalogTable.nonEmpty) {
            CommandUtils.updateTableStats(sparkSession, catalogTable.get)
          }
    
    ```
    But after a create table command, when we run an insert command within the same session, the Hive statistics are not updated because of the validation below, where the condition expects the stats to be non-empty:
      
    ```
    def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
        if (table.stats.nonEmpty) { 
    ```
    But if we relaunch spark-shell and run an insert command, the Hive statistics are saved, and from then on the stats are taken from the Hive stats and the flow never tries to estimate the data size from files.
    
    2) Currently the system does not always estimate the data size from files when we execute the insert command. As I said above, if we launch the query from a new context, the system tries to read the stats from Hive. I think there is a problem with behavioral consistency. Also, if we can always get the stats from Hive, do we need to recalculate the stats from files every time?
    
     >> I think we may need to update the flow so that it always reads the data size from files and never depends on the Hive stats,
     >> or, if we are recording the Hive stats, it should read the Hive stats every time.
    Please let me know whether I am going in the right direction, and let me know if any clarification is needed.



---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    > I think the cost of getting the stats from `HadoopFileSystem` may be quite high.
    
    Then we should always depend on the Hive stats to get the statistics, which already happens now, but only partially, and I think this PR solves that problem. What I said earlier was based on cloud-fan's expectation:
    
    ![image](https://user-images.githubusercontent.com/12999161/47195764-f3874700-d37a-11e8-9b93-e3c1cb228c54.png)



---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    I think the cost of getting the stats from `HadoopFileSystem` may be quite high.


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    @cloud-fan @HyukjinKwon @wangyum Any suggestions on this issue? Because of this defect we are facing performance issues in our customer environment. I request you all to please have another look and share any suggestions so that I can handle it.
    @wangyum I am not sure whether invalidating the cache will solve my problem.


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    @cloud-fan 
    One solution I can think of: in the DetermineTableStats flow we can add one more condition so that the stats are not updated for convertible relations, since we always get the stats from HadoopFileSystem for convertible relations. This should solve all the problems we are facing. Please let me know your suggestions, and I will update this PR according to this logic.


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226201756
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    Yes, it is the default setting, which means false, but I think it should be fine to keep the default setting in this scenario.


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226197174
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
    @@ -1051,7 +1051,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     
       test("test statistics of LogicalRelation converted from Hive serde tables") {
         Seq("orc", "parquet").foreach { format =>
    -      Seq(true, false).foreach { isConverted =>
    +      // Botth parquet and orc will have Hivestatistics, both are convertable to Logical Relation.
    +      Seq(true, true).foreach { isConverted =>
    --- End diff --
    
    Right. I think there was some misunderstanding on my side; I will recheck this. Thanks.


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    > In order to make this flow consistent, either
    > a) we need to record HiveStats in the insert command flow and always consider these stats while computing,
    > OR
    > b) as mentioned above in the snapshot, we always estimate the data size from the files for convertible relations.
    
    Just a suggestion; let me know your thoughts ;) Thanks all for your valuable time.


---



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22758#discussion_r226198591
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None)
             val logicalRelation = cached.getOrElse {
               val updatedTable = inferIfNeeded(relation, options, fileFormat)
    +          // Intialize the catalogTable stats if its not defined.An intial value has to be defined
    --- End diff --
    
    > but after create table command, when we do insert command within the same session Hive statistics is not getting updated
    
    This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the `DetermineTableStats` rule.
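
As a rough illustration of that fallback, the sketch below models how a size can still be chosen when the catalog has no stats. It is a simplified model, not Spark's code: `estimateSize` and `sizeOnDisk` are hypothetical names, and the only behavior assumed from Spark is that `spark.sql.statistics.fallBackToHdfs` defaults to false and `spark.sql.defaultSizeInBytes` defaults to `Long.MaxValue`.

```scala
object FallbackSizeSketch {
  // Default of spark.sql.defaultSizeInBytes: large enough to rule out a broadcast join.
  val DefaultSizeInBytes: Long = Long.MaxValue

  // catalogStats:   size recorded in the metastore, if any
  // fallBackToHdfs: models spark.sql.statistics.fallBackToHdfs (false by default)
  // sizeOnDisk:     hypothetical callback that sums the table's file sizes
  def estimateSize(
      catalogStats: Option[Long],
      fallBackToHdfs: Boolean,
      sizeOnDisk: () => Long): Long =
    catalogStats.getOrElse {
      if (fallBackToHdfs) {
        // Scanning the file system can be expensive and may fail; use the default on error.
        try sizeOnDisk()
        catch { case _: java.io.IOException => DefaultSizeInBytes }
      } else {
        DefaultSizeInBytes
      }
    }
}
```

Under the default setting the table therefore always has some size, but that size is the very large default rather than a measured one.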


---



[GitHub] spark issue #22758: [SPARK-25332][SQL] Instead of broadcast hash join ,Sort ...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on the issue:

    https://github.com/apache/spark/pull/22758
  
    @cloud-fan Please find my understanding of the flow below; it's a bit tricky :)
    Let's walk through the flow in detail; that may bring out more suggestions.
    
    Step 1 : insert command
    
    insert command --> In DetermineTableStats the condition is not met, since it is a OneTableRelation in HiveStrategies (so the CatalogTable stats are None) --> In convertToLogicalRelation(), the relation is saved with None stats, as per the HiveMetastoreCatalog flow --> HiveStats are not set, since the CatalogTable stats are None, as per the InsertIntoHadoopFsRelationCommand flow.
    
    Step 2 : Execute select statement
    
    Select flow --> In DetermineTableStats the HiveTableRelation condition is met, so the stats are updated with the default value --> In convertToLogicalRelation(), the LogicalRelation is fetched from the cache with stats as None (set in the insert command flow) --> While computing the LogicalRelation stats, since the CatalogTable stats are None, the stats are calculated from HadoopFsRelation based on the file source.
    
    Result: we get a plan with a broadcast join, as expected.
    
    Step 3 : 
    
    Exit; ( come out of spark-shell)
    
    Step 4: Execute select statement
    
    Select flow --> In DetermineTableStats the HiveTableRelation condition is met, so the stats are updated with the default value --> Now no cached LogicalRelation is available in the convertToLogicalRelation() flow, so the CatalogTable with the updated stats (set by DetermineTableStats) is added to the cache --> While computing the LogicalRelation stats, since no HiveStats are available, the default stats of the LogicalRelation (set by DetermineTableStats) are used.
    
    Result: we get a plan with SortMergeJoin, which is wrong.
    
    Step 5: Run the insert command again in the same session
    
    insert command --> In DetermineTableStats the condition is not met, since it is a OneTableRelation in HiveStrategies (so the CatalogTable stats are None) --> In convertToLogicalRelation(), the relation is already present in the cache with the default value (set in the select flow of the previous step), as per the HiveMetastoreCatalog flow --> This time HiveStats are recorded, since the CatalogTable stats are not None (the default value is already set), as per the InsertIntoHadoopFsRelationCommand flow.
    
    Step 6 : 
    
    exit; ( come out of spark-shell).
    
    Step 7 : 
    
    Run Select --> In DetermineTableStats the HiveTableRelation condition is met, so the stats are updated with the default value --> Now no cached LogicalRelation is available in the convertToLogicalRelation() flow, so the CatalogTable with the updated stats (set by DetermineTableStats) is added to the cache --> While computing the LogicalRelation stats, since HiveStats are available (set by the insert command in step 5), the HiveStats are used.
    
    Result: this time we get a plan with a broadcast join, as expected.
    
    So overall I can see an inconsistency in the flow, caused by the DetermineTableStats flow setting default stats in the CatalogTable even for convertible relations.
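
The seven steps above can be replayed with a small toy model. This is only a sketch of the flow as described in this comment, not Spark code: `Warehouse`, `Session`, `insert`, and `selectJoinStrategy` are hypothetical names, the 10 MB threshold mirrors the default of `spark.sql.autoBroadcastJoinThreshold`, and the model assumes the default size is filled in only when the metastore holds no stats.

```scala
object StatsFlowSketch {
  val DefaultSizeInBytes: Long = Long.MaxValue          // spark.sql.defaultSizeInBytes default
  val BroadcastThreshold: Long = 10L * 1024 * 1024      // autoBroadcastJoinThreshold default

  // State that survives a shell restart: metastore stats and the files on disk.
  final class Warehouse {
    var metastoreStats: Option[Long] = None
    var fileSizeInBytes: Long = 0L
  }

  // State that is lost on restart: the cached relation and its catalog stats.
  final class Session(warehouse: Warehouse) {
    // Outer None = nothing cached yet; inner Option = stats of the cached relation.
    private var cachedCatalogStats: Option[Option[Long]] = None

    // Models convertToLogicalRelation(): reuse the cached relation, or cache this one.
    private def convertToLogicalRelation(catalogStats: Option[Long]): Option[Long] =
      cachedCatalogStats.getOrElse {
        cachedCatalogStats = Some(catalogStats)
        catalogStats
      }

    // Insert flow: no default stats are filled in; metastore stats are
    // recorded only if the (possibly cached) relation already carries stats.
    def insert(bytes: Long): Unit = {
      val relationStats = convertToLogicalRelation(warehouse.metastoreStats)
      warehouse.fileSizeInBytes += bytes
      if (relationStats.nonEmpty) warehouse.metastoreStats = Some(warehouse.fileSizeInBytes)
    }

    // Select flow: missing stats get the default value, then the relation is looked up.
    // A relation without stats falls back to the size of the files.
    def selectJoinStrategy(): String = {
      val catalogStats = warehouse.metastoreStats.orElse(Some(DefaultSizeInBytes))
      val relationStats = convertToLogicalRelation(catalogStats)
      val sizeInBytes = relationStats.getOrElse(warehouse.fileSizeInBytes)
      if (sizeInBytes <= BroadcastThreshold) "BroadcastHashJoin" else "SortMergeJoin"
    }
  }

  // Replays steps 1 to 7; each new Session models a shell restart.
  def run(): Seq[String] = {
    val warehouse = new Warehouse
    val first = new Session(warehouse)
    first.insert(1024L)                         // step 1
    val step2 = first.selectJoinStrategy()      // step 2
    val second = new Session(warehouse)         // step 3: restart
    val step4 = second.selectJoinStrategy()     // step 4
    second.insert(1024L)                        // step 5
    val third = new Session(warehouse)          // step 6: restart
    val step7 = third.selectJoinStrategy()      // step 7
    Seq(step2, step4, step7)
  }
}
```

In this model `StatsFlowSketch.run()` yields a broadcast join in step 2, a sort merge join in step 4, and a broadcast join again in step 7, so the chosen join depends on whether an insert or a select populated the session cache first.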


---
