Posted to reviews@spark.apache.org by Dooyoung-Hwang <gi...@git.apache.org> on 2018/08/24 12:25:48 UTC

[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

GitHub user Dooyoung-Hwang opened a pull request:

    https://github.com/apache/spark/pull/22219

    [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer memory management

    ## What changes were proposed in this pull request?
    
    Spark SQL has only two options for managing thriftserver memory: enabling spark.sql.thriftServer.incrementalCollect or leaving it disabled.
    
    ### The case of enabling spark.sql.thriftServer.incrementalCollect
       - Pros
            - thriftserver can handle large output without OOM.
       - Cons
            - Performance degrades because tasks are executed partition by partition.
            - Queries with a row-count limit are handled inefficiently because all partitions are executed.
              (By contrast, executeTake stops scanning once it has collected the requested number of rows.)
            - The result is not cached for FETCH_FIRST.
    
    ### The case of disabling spark.sql.thriftServer.incrementalCollect
       - Pros
            - Good performance for small output
       - Cons
            - Peak memory usage is too large because decompressed and deserialized rows are allocated all at once, as a batch, so OOM can occur for large output.
            - It is difficult to measure the peak memory usage of a query, so choosing a value for spark.driver.maxResultSize is very difficult.
            - If decompressed and deserialized rows fill up the Eden space of the JVM heap, they are promoted to the old generation, which increases the likelihood of a stop-the-world "Full GC".
     
    ### The proposed improvement for these problems is described below.
       - Dataset does not decompress and deserialize the result; it just returns the total row count and an iterator to the SQL executor. That way, only compressed data resides in memory, so memory usage is not only much lower than before but can also be controlled with the spark.driver.maxResultSize config.
    
       - After the SQL executor gets the total row count and the iterator from Dataset, it can decide, based on the returned row count, whether to collect the rows as a batch (appropriate for a small row count) or to deserialize and send them iteratively (appropriate for a large row count).
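
A minimal sketch of the idea above, assuming rows are held as encoded bytes and decoded only while iterating. The names `CountAndIteratorSketch`, `encode`, and `decode` are hypothetical, the method signature is simplified, and UTF-8 strings merely stand in for Spark's compressed row format; this is not the code in the pull request.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for the idea described above; none of this is Spark API.
object CountAndIteratorSketch {
  // Pretend that an "encoded" row is just UTF-8 bytes.
  def encode(row: String): Array[Byte] = row.getBytes(UTF_8)
  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)

  // Returns the row count plus an iterator that decodes one row per next() call.
  // Until the caller iterates, only the encoded byte arrays are held in memory.
  def collectCountAndIterator(encoded: Seq[Array[Byte]]): (Long, Iterator[String]) =
    (encoded.length.toLong, encoded.iterator.map(decode))

  def main(args: Array[String]): Unit = {
    val encoded = Seq("a", "b", "c").map(encode)
    val (count, rows) = collectCountAndIterator(encoded)
    println(s"count=$count")
    rows.foreach(println) // each row is decoded only when it is requested
  }
}
```

Because the count is available before any row is decoded, the caller can choose between materializing everything and streaming row by row.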
    
    ## How was this patch tested?
    Existing tests


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Dooyoung-Hwang/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22219
    
----
commit 44f40c3399665fe68630a349cad5956ca5278663
Author: Dooyoung Hwang <do...@...>
Date:   2018-08-24T11:32:21Z

    Add collectCountAndIterator api to DataSet.
    
    When the result is collected with this API, only compressed rows reside in heap memory,
    so that the heap usage of collection can be controlled adaptively by the caller.

commit ec43fee8c6c68d03089a80a2987f79cab014f0ea
Author: Dooyoung Hwang <do...@...>
Date:   2018-08-24T11:41:45Z

    Add executeTakeIterator to SparkPlan
    
    1. executeTakeIterator returns an iterator of decoded rows, but only
    encoded rows reside in heap space.
    2. executeTake now uses executeTakeIterator, so it decodes
    exactly n rows.
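
The two points in this commit message can be sketched roughly as follows. This is an illustration under assumptions, not the patch itself: partitions are modelled as in-memory sequences of UTF-8 byte arrays, the returned count is assumed to be capped at `n`, and `decodeCalls` is instrumentation added only to show how many rows get decoded.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch; the method names mirror the commit message but the signatures are simplified.
object TakeIteratorSketch {
  var decodeCalls = 0 // instrumentation only

  def decode(bytes: Array[Byte]): String = {
    decodeCalls += 1
    new String(bytes, UTF_8)
  }

  // Scans partitions until at least n encoded rows are buffered, then returns
  // the row count and an iterator that decodes lazily.
  def executeTakeIterator(n: Int, partitions: Seq[Seq[Array[Byte]]]): (Long, Iterator[String]) =
    if (n <= 0) (0L, Iterator.empty)
    else {
      val encodedBuf = new ArrayBuffer[Array[Byte]]
      var partsScanned = 0
      while (encodedBuf.size < n && partsScanned < partitions.length) {
        encodedBuf ++= partitions(partsScanned)
        partsScanned += 1
      }
      val count = math.min(n, encodedBuf.size).toLong
      (count, encodedBuf.iterator.take(n).map(decode))
    }

  // executeTake materializes the iterator, so at most n rows are ever decoded.
  def executeTake(n: Int, partitions: Seq[Seq[Array[Byte]]]): Array[String] =
    executeTakeIterator(n, partitions)._2.toArray
}
```

In this sketch, surplus rows buffered from the last scanned partition stay encoded and are never decoded.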

commit df762f0df7dd1a868e32b521987d7685c46ba39f
Author: Dooyoung Hwang <do...@...>
Date:   2018-08-24T11:44:41Z

    Thriftserver use collectCountAndIterator() instead of collect().
    
    - This lets Thriftserver decide, based on the total row count, whether to decompress
    and deserialize all rows together or to do it incrementally.
    If the total row count is bigger than the configured threshold
    (spark.sql.thriftServer.batchCollectionLimit), rows are decoded incrementally before
    they are sent to the client. Otherwise, they are collected all together for performance.
    
    - Add a setting to SQLConf for configuring the result row threshold.
    spark.sql.thriftServer.batchCollectionLimit: when the result row count exceeds this,
    result rows are collected incrementally. Only valid for non-incremental collection.
    The default is no limit.
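
A rough sketch of the threshold decision described in this commit message. All names here (`BatchLimitSketch`, `Collected`, `collectWithLimit`, `NoLimit`) are hypothetical, and "no limit" is modelled as `Long.MaxValue`, which is an assumption rather than something the patch states.

```scala
// Hypothetical illustration of choosing batch versus incremental collection.
object BatchLimitSketch {
  sealed trait Collected[+A]
  final case class Batch[A](rows: Vector[A]) extends Collected[A]         // all rows decoded up front
  final case class Incremental[A](rows: Iterator[A]) extends Collected[A] // rows decoded on demand

  // Assumption: "no limit" means every result is collected as a batch.
  val NoLimit: Long = Long.MaxValue

  def collectWithLimit[A](
      totalRowCount: Long,
      rows: Iterator[A],
      batchCollectionLimit: Long = NoLimit): Collected[A] =
    if (totalRowCount > batchCollectionLimit) Incremental(rows) // large result: keep it lazy
    else Batch(rows.toVector)                                   // small result: materialize now
}
```

With the default limit, a call such as `BatchLimitSketch.collectWithLimit(3L, Iterator(1, 2, 3))` materializes the rows; passing a limit of `2L` leaves the iterator untouched.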

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97363/
    Test PASSed.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212704387
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -348,30 +350,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
             // Otherwise, interpolate the number of partitions we need to try, but overestimate
             // it by 50%. We also cap the estimation in the end.
             val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
    -        if (buf.isEmpty) {
    +        if (scannedRowCount == 0) {
               numPartsToTry = partsScanned * limitScaleUpFactor
             } else {
    -          val left = n - buf.size
    +          val left = n - scannedRowCount
               // As left > 0, numPartsToTry is always >= 1
    -          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
    +          numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
    --- End diff --
    
    Do you mean scannedRowCount? scannedRowCount is updated at line 369, so it is difficult to define it as a val.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97530 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97530/testReport)** for PR 22219 at commit [`136a4f9`](https://github.com/apache/spark/commit/136a4f9b4d205b0c94ae71f465c4790b759e6f1d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214825542
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -329,17 +337,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        *
        * This is modeled after `RDD.take` but never runs any job locally on the driver.
        */
    -  def executeTake(n: Int): Array[InternalRow] = {
    +  def executeTake(n: Int): Array[InternalRow] = executeTakeSeqView(n)._2.force
    +
    +  /**
    +   * Runs this query returning the tuple of the row count and the SeqView of first `n` rows.
    +   *
    +   * This is modeled to execute decodeUnsafeRows lazily to reduce peak memory usage of
    +   * decoding rows. Only compressed byte arrays consume memory after return.
    +   */
    +  private[spark] def executeTakeSeqView(
    --- End diff --
    
    For example, do `collectCountAndSeqView` and `executeTakeSeqView` depend on each other? If not, please split them into separate PRs.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212854255
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    And add a description for it.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r213682460
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -289,6 +289,14 @@ private[hive] class SparkExecuteStatementOperation(
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }
       }
    +
    +  private def getResultIterator(): Iterator[SparkRow] = {
    +    val (totalRowCount, iterResult) = result.collectCountAndIterator()
    +    val batchCollectLimit =
    +      sqlContext.getConf(SQLConf.THRIFTSERVER_BATCH_COLLECTION_LIMIT.key).toLong
    +    resultList = if (totalRowCount < batchCollectLimit) Some(iterResult.toArray) else None
    +    if (resultList.isDefined) resultList.get.iterator else iterResult
    --- End diff --
    
    I will try to cache it. Thank you for the reply.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Dear reviewers (cc : @dongjoon-hyun )
    
    I have made the following updates.
     1. There are no behavior changes if the new config is off, so [PR SPARK-25353](https://github.com/apache/spark/pull/22347) is no longer required for this PR.
     2. Applied the review comments (defined the config as a boolean and did not add a function to Dataset).
     3. Added a test case for fetch_next and fetch_first.



---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97492 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97492/testReport)** for PR 22219 at commit [`9158edc`](https://github.com/apache/spark/commit/9158edc52405b4c31127311735dbf41dbe6d17f7).


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214908763
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  /**
    +   * Returns the tuple of the row count and an SeqView that contains all rows in this Dataset.
    +   *
    +   * The SeqView will consume as much memory as the total size of serialized results which can be
    +   * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows
    +   * with iterator of returned SeqView. Whether to collect all deserialized rows or to iterate them
    +   * incrementally can be decided with considering total rows count and driver memory.
    +   */
    +  private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) =
    +    withAction("collectCountAndSeqView", queryExecution) { plan =>
    +      // This projection writes output to a `InternalRow`, which means applying this projection is
    +      // not thread-safe. Here we create the projection inside this method to make `Dataset`
    +      // thread-safe.
    +      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    +      val (totalRowCount, internalRowsView) = plan.executeCollectSeqView()
    +      (totalRowCount, internalRowsView.map { row =>
    +        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
    +        // parameter of its `get` method, so it's safe to use null here.
    +        objProj(row).get(0, null).asInstanceOf[T]
    +      }.asInstanceOf[SeqView[T, Array[T]]])
    +    }
    --- End diff --
    
    Yes, that's what I mean. I thought that 'deserializer' is declared private, so there is no way to get 'deserializer' out of Dataset.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r213608853
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -289,6 +289,14 @@ private[hive] class SparkExecuteStatementOperation(
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }
       }
    +
    +  private def getResultIterator(): Iterator[SparkRow] = {
    +    val (totalRowCount, iterResult) = result.collectCountAndIterator()
    +    val batchCollectLimit =
    +      sqlContext.getConf(SQLConf.THRIFTSERVER_BATCH_COLLECTION_LIMIT.key).toLong
    +    resultList = if (totalRowCount < batchCollectLimit) Some(iterResult.toArray) else None
    +    if (resultList.isDefined) resultList.get.iterator else iterResult
    --- End diff --
    
    I think we should try to cache the encoded result if the row count exceeds `THRIFTSERVER_BATCH_COLLECTION_LIMIT` when incremental collect is disabled. That sounds closer to what that mode is meant to do.
    
    Otherwise, its behavior looks close to incremental collect mode, since it re-executes the query. Besides, it collects all data back to the driver in encoded format.
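
One way to picture the suggestion above is a cache that keeps only the encoded bytes and hands out a fresh decoding iterator on every rewind. This is a sketch under assumptions: `EncodedCacheSketch` and `CachedResult` are hypothetical names, and UTF-8 strings stand in for Spark's encoded rows.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical sketch of caching the encoded result instead of the decoded rows.
object EncodedCacheSketch {
  final class CachedResult(encodedRows: Vector[Array[Byte]]) {
    val rowCount: Long = encodedRows.length.toLong

    // Every call starts again from the first cached row, so a FETCH_FIRST-style
    // rewind can be served without re-running the query.
    def iterator: Iterator[String] =
      encodedRows.iterator.map(bytes => new String(bytes, UTF_8))
  }

  def main(args: Array[String]): Unit = {
    val cached = new CachedResult(Vector("r1", "r2").map(_.getBytes(UTF_8)))
    println(cached.iterator.toList) // first fetch
    println(cached.iterator.toList) // rewind: decoded again from the cached bytes
  }
}
```

The trade-off in this sketch is that rows are decoded again on each rewind, in exchange for never holding the fully decoded result in memory.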
    
    
    



---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    cc @srinathshankar @yuchenhuo 


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97893 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97893/testReport)** for PR 22219 at commit [`136a4f9`](https://github.com/apache/spark/commit/136a4f9b4d205b0c94ae71f465c4790b759e6f1d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by tooptoop4 <gi...@git.apache.org>.
Github user tooptoop4 commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    @HyukjinKwon  can this be merged?


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214818164
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -329,17 +337,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        *
        * This is modeled after `RDD.take` but never runs any job locally on the driver.
        */
    -  def executeTake(n: Int): Array[InternalRow] = {
    +  def executeTake(n: Int): Array[InternalRow] = executeTakeSeqView(n)._2.force
    +
    +  /**
    +   * Runs this query returning the tuple of the row count and the SeqView of first `n` rows.
    +   *
    +   * This is modeled to execute decodeUnsafeRows lazily to reduce peak memory usage of
    +   * decoding rows. Only compressed byte arrays consume memory after return.
    +   */
    +  private[spark] def executeTakeSeqView(
    --- End diff --
    
    Do you mean splitting this PR into a SQL part and a thriftserver part?


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214809444
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -120,10 +120,11 @@ private[hive] class SparkExecuteStatementOperation(
             resultList = None
             result.toLocalIterator.asScala
           } else {
    -        if (resultList.isEmpty) {
    -          resultList = Some(result.collect())
    --- End diff --
    
    Can you keep the current behavior? Then, please implement a `SeqView` iteration model that can be turned on or off by a new option.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97530 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97530/testReport)** for PR 22219 at commit [`136a4f9`](https://github.com/apache/spark/commit/136a4f9b4d205b0c94ae71f465c4790b759e6f1d).


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212664064
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    +    withAction("collectCountAndIterator", queryExecution) { plan =>
    +      // This projection writes output to a `InternalRow`, which means applying this projection is
    +      // not thread-safe. Here we create the projection inside this method to make `Dataset`
    +      // thread-safe.
    +    val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    +      val (totalRowCount, iterInternalRows) = plan.executeCollectIterator()
    +      (totalRowCount, iterInternalRows.map { row =>
    +        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
    +        // parameter of its `get` method, so it's safe to use null here.
    +      objProj(row).get(0, null).asInstanceOf[T]
    --- End diff --
    
    nit: two more spaces for indentation?


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r215115685
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  /**
    +   * Returns the tuple of the row count and an SeqView that contains all rows in this Dataset.
    +   *
    +   * The SeqView will consume as much memory as the total size of serialized results which can be
    +   * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows
    +   * with iterator of returned SeqView. Whether to collect all deserialized rows or to iterate them
    +   * incrementally can be decided with considering total rows count and driver memory.
    +   */
    +  private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) =
    +    withAction("collectCountAndSeqView", queryExecution) { plan =>
    +      // This projection writes output to a `InternalRow`, which means applying this projection is
    +      // not thread-safe. Here we create the projection inside this method to make `Dataset`
    +      // thread-safe.
    +      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    +      val (totalRowCount, internalRowsView) = plan.executeCollectSeqView()
    +      (totalRowCount, internalRowsView.map { row =>
    +        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
    +        // parameter of its `get` method, so it's safe to use null here.
    +        objProj(row).get(0, null).asInstanceOf[T]
    +      }.asInstanceOf[SeqView[T, Array[T]]])
    +    }
    --- End diff --
    
    How about changing private to private[sql] and then implementing this based on the deserializer?
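
For readers less familiar with qualified access modifiers: in Scala, `private[X]` makes a member visible anywhere inside the enclosing package, class, or object `X`. The sketch below uses a hypothetical object `sqlSketch` in place of the real `org.apache.spark.sql` package, and a toy `String => Int` function in place of the real deserializer; it only illustrates the visibility rule, not the actual change.

```scala
// Hypothetical sketch: `sqlSketch` plays the role of the `sql` package.
object sqlSketch {
  final class Dataset(encoded: Vector[String]) {
    // Visible anywhere inside `sqlSketch`, but not from code outside it.
    private[sqlSketch] val deserializer: String => Int = _.toInt
    private[sqlSketch] def encodedRows: Vector[String] = encoded
  }

  // A nested component can build the lazy decoding itself,
  // without Dataset having to expose a new method for it.
  object thriftserver {
    def decodedIterator(ds: Dataset): Iterator[Int] =
      ds.encodedRows.iterator.map(ds.deserializer)
  }
}
```

Code outside `sqlSketch` can still call `sqlSketch.thriftserver.decodedIterator`, but referring to `deserializer` directly from there would not compile.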


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212834287
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    Would it be good to make it public?


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212866466
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -329,49 +329,52 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        *
        * This is modeled after `RDD.take` but never runs any job locally on the driver.
        */
    -  def executeTake(n: Int): Array[InternalRow] = {
    +  def executeTake(n: Int): Array[InternalRow] = executeTakeIterator(n)._2.toArray
    +
    +  private[spark] def executeTakeIterator(n: Int): (Long, Iterator[InternalRow]) = {
         if (n == 0) {
    -      return new Array[InternalRow](0)
    +      return (0, Iterator.empty)
         }
     
    -    val childRDD = getByteArrayRdd(n).map(_._2)
    -
    -    val buf = new ArrayBuffer[InternalRow]
    +    val childRDD = getByteArrayRdd(n)
    +    val encodedBuf = new ArrayBuffer[Array[Byte]]
         val totalParts = childRDD.partitions.length
    +    var scannedRowCount = 0L
         var partsScanned = 0
    -    while (buf.size < n && partsScanned < totalParts) {
    +    while (scannedRowCount < n && partsScanned < totalParts) {
           // The number of partitions to try in this iteration. It is ok for this number to be
           // greater than totalParts because we actually cap it at totalParts in runJob.
    -      var numPartsToTry = 1L
    -      if (partsScanned > 0) {
    +      val numPartsToTry = if (partsScanned > 0) {
             // If we didn't find any rows after the previous iteration, quadruple and retry.
             // Otherwise, interpolate the number of partitions we need to try, but overestimate
             // it by 50%. We also cap the estimation in the end.
             val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
    -        if (buf.isEmpty) {
    -          numPartsToTry = partsScanned * limitScaleUpFactor
    +        if (scannedRowCount == 0) {
    +          partsScanned * limitScaleUpFactor
             } else {
    -          val left = n - buf.size
    +          val left = n - scannedRowCount
               // As left > 0, numPartsToTry is always >= 1
    -          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
    -          numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
    +          Math.min(Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt,
    +            partsScanned * limitScaleUpFactor)
             }
    +      } else {
    +        1L
    --- End diff --
    
    @kiszk 
    Yeah, I'll prepare test cases.
    
    @viirya 
    The changes above make decodeUnsafeRows execute lazily to reduce peak memory. Changing numPartsToTry to a val may be a refactoring that can be separated from this patch. If reviewers want to revert [this refactoring](https://github.com/apache/spark/pull/22219/commits/91617caaab56760ea2f64f3da7486fbf445d7aa9), I can separate it and make another trivial pull request for it.
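
To make the partition estimate in the quoted diff easier to follow, here is a standalone sketch of the same arithmetic. `NumPartsSketch` is a hypothetical name, the scale-up factor is passed in as a plain argument rather than read from the SQL configuration, and the result is kept as a `Long` throughout, which differs slightly from the `.toInt` in the diff.

```scala
// Hypothetical standalone version of the numPartsToTry estimate.
object NumPartsSketch {
  def numPartsToTry(n: Int, scannedRowCount: Long, partsScanned: Int, limitScaleUpFactor: Int): Long =
    if (partsScanned == 0) 1L // first iteration: try a single partition
    else {
      val factor = math.max(limitScaleUpFactor, 2)
      if (scannedRowCount == 0) {
        // Nothing found so far: scale up the number of partitions and retry.
        partsScanned.toLong * factor
      } else {
        // Interpolate from the rows seen so far, overestimate by 50%, then cap.
        val left = n - scannedRowCount
        val estimate = math.ceil(1.5 * left * partsScanned / scannedRowCount).toLong
        math.min(estimate, partsScanned.toLong * factor)
      }
    }
}
```

As a worked example with a factor of 4: asking for 100 rows after finding 10 rows in 1 partition gives an estimate of ceil(1.5 * 90 * 1 / 10) = 14, which is capped at 1 * 4 = 4 partitions. After finding 80 rows in 4 partitions, the estimate is ceil(1.5 * 20 * 4 / 80) = 2, which is below the cap of 16.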


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97530/
    Test PASSed.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212854351
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -329,49 +329,52 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        *
        * This is modeled after `RDD.take` but never runs any job locally on the driver.
        */
    -  def executeTake(n: Int): Array[InternalRow] = {
    +  def executeTake(n: Int): Array[InternalRow] = executeTakeIterator(n)._2.toArray
    +
    +  private[spark] def executeTakeIterator(n: Int): (Long, Iterator[InternalRow]) = {
         if (n == 0) {
    -      return new Array[InternalRow](0)
    +      return (0, Iterator.empty)
         }
     
    -    val childRDD = getByteArrayRdd(n).map(_._2)
    -
    -    val buf = new ArrayBuffer[InternalRow]
    +    val childRDD = getByteArrayRdd(n)
    +    val encodedBuf = new ArrayBuffer[Array[Byte]]
         val totalParts = childRDD.partitions.length
    +    var scannedRowCount = 0L
         var partsScanned = 0
    -    while (buf.size < n && partsScanned < totalParts) {
    +    while (scannedRowCount < n && partsScanned < totalParts) {
           // The number of partitions to try in this iteration. It is ok for this number to be
           // greater than totalParts because we actually cap it at totalParts in runJob.
    -      var numPartsToTry = 1L
    -      if (partsScanned > 0) {
    +      val numPartsToTry = if (partsScanned > 0) {
             // If we didn't find any rows after the previous iteration, quadruple and retry.
             // Otherwise, interpolate the number of partitions we need to try, but overestimate
             // it by 50%. We also cap the estimation in the end.
             val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
    -        if (buf.isEmpty) {
    -          numPartsToTry = partsScanned * limitScaleUpFactor
    +        if (scannedRowCount == 0) {
    +          partsScanned * limitScaleUpFactor
             } else {
    -          val left = n - buf.size
    +          val left = n - scannedRowCount
               // As left > 0, numPartsToTry is always >= 1
    -          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
    -          numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
    +          Math.min(Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt,
    +            partsScanned * limitScaleUpFactor)
             }
    +      } else {
    +        1L
    --- End diff --
    
    Most of the change above looks like pure refactoring. It looks fine, but it could be avoided to reduce the diff.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97376/
    Test PASSed.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97388 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97388/testReport)** for PR 22219 at commit [`2a41b70`](https://github.com/apache/spark/commit/2a41b704f3aaa80aeb2d6c5b1118ee7faea5bf79).


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214785788
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  /**
    +   * Returns the tuple of the row count and an SeqView that contains all rows in this Dataset.
    +   *
    +   * The SeqView will consume as much memory as the total size of serialized results which can be
    +   * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows
    +   * with iterator of returned SeqView. Whether to collect all deserialized rows or to iterate them
    +   * incrementally can be decided with considering total rows count and driver memory.
    +   */
    +  private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) =
    +    withAction("collectCountAndSeqView", queryExecution) { plan =>
    +      // This projection writes output to a `InternalRow`, which means applying this projection is
    +      // not thread-safe. Here we create the projection inside this method to make `Dataset`
    +      // thread-safe.
    +      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    +      val (totalRowCount, internalRowsView) = plan.executeCollectSeqView()
    +      (totalRowCount, internalRowsView.map { row =>
    +        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
    +        // parameter of its `get` method, so it's safe to use null here.
    +        objProj(row).get(0, null).asInstanceOf[T]
    +      }.asInstanceOf[SeqView[T, Array[T]]])
    +    }
    --- End diff --
    
    If this is a thriftserver-specific issue, can we achieve the same thing by changing code only in the thriftserver package?
    IMHO it would be better to avoid modifying code in the sql package as much as possible.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212664399
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -348,30 +350,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
             // Otherwise, interpolate the number of partitions we need to try, but overestimate
             // it by 50%. We also cap the estimation in the end.
             val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
    -        if (buf.isEmpty) {
    +        if (scannedRowCount == 0) {
               numPartsToTry = partsScanned * limitScaleUpFactor
             } else {
    -          val left = n - buf.size
    +          val left = n - scannedRowCount
               // As left > 0, numPartsToTry is always >= 1
    -          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
    +          numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
    --- End diff --
    
    nit: Would it be better to define this variable as a `val`?
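
A minimal sketch of what the `val` form could look like, written as a standalone function so that it runs outside Spark. The object name `PartitionEstimate` and the parameter list are assumptions made for illustration; in the quoted diff these values come from the surrounding `executeTake` loop and from `sqlContext.conf.limitScaleUpFactor`. The sketch also uses `Long` arithmetic where the diff uses `.toInt`.

```scala
object PartitionEstimate {
  // Number of partitions to scan in the next iteration, as a single expression.
  // All inputs are passed in explicitly (an assumption for this sketch).
  def numPartsToTry(
      n: Int,
      scannedRowCount: Long,
      partsScanned: Int,
      limitScaleUpFactor: Int): Long = {
    if (partsScanned > 0) {
      // The scale-up factor is never allowed to drop below 2.
      val factor = math.max(limitScaleUpFactor, 2)
      if (scannedRowCount == 0) {
        // No rows found so far: multiply the number of partitions by the factor.
        partsScanned.toLong * factor
      } else {
        // Interpolate from the rows seen so far, overestimate by 50%, then cap.
        val left = n - scannedRowCount
        math.min(
          math.ceil(1.5 * left * partsScanned / scannedRowCount).toLong,
          partsScanned.toLong * factor)
      }
    } else {
      // First iteration: try a single partition.
      1L
    }
  }
}
```

Because every branch yields a value, the result can be bound with `val` and no reassignment is needed.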


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97893/
    Test PASSed.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97388/
    Test PASSed.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Would it be possible to prepare test cases? IIUC, this feature can be enabled without thriftServer by writing some test code.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    @Dooyoung-Hwang Would it be possible to add a test case that verifies the result with and without incrementalCollects by changing the value of `spark.sql.thriftServer.batchDeserializeLimit`?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97452 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97452/testReport)** for PR 22219 at commit [`e5baa50`](https://github.com/apache/spark/commit/e5baa50daa6cd531399fe3c36242c8c06879b120).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Nope, not yet. It needs some more review iterations.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214824489
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  /**
    +   * Returns the tuple of the row count and an SeqView that contains all rows in this Dataset.
    +   *
    +   * The SeqView will consume as much memory as the total size of serialized results which can be
    +   * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows
    +   * with iterator of returned SeqView. Whether to collect all deserialized rows or to iterate them
    +   * incrementally can be decided with considering total rows count and driver memory.
    +   */
    +  private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) =
    +    withAction("collectCountAndSeqView", queryExecution) { plan =>
    +      // This projection writes output to a `InternalRow`, which means applying this projection is
    +      // not thread-safe. Here we create the projection inside this method to make `Dataset`
    +      // thread-safe.
    +      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    +      val (totalRowCount, internalRowsView) = plan.executeCollectSeqView()
    +      (totalRowCount, internalRowsView.map { row =>
    +        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
    +        // parameter of its `get` method, so it's safe to use null here.
    +        objProj(row).get(0, null).asInstanceOf[T]
    +      }.asInstanceOf[SeqView[T, Array[T]]])
    +    }
    --- End diff --
    
    Do you mean there is no way to implement that functionality outside `Dataset`?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Did you verify this feature manually?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97452/
    Test PASSed.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214808257
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -329,17 +337,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        *
        * This is modeled after `RDD.take` but never runs any job locally on the driver.
        */
    -  def executeTake(n: Int): Array[InternalRow] = {
    +  def executeTake(n: Int): Array[InternalRow] = executeTakeSeqView(n)._2.force
    +
    +  /**
    +   * Runs this query returning the tuple of the row count and the SeqView of first `n` rows.
    +   *
    +   * This is modeled to execute decodeUnsafeRows lazily to reduce peak memory usage of
    +   * decoding rows. Only compressed byte arrays consume memory after return.
    +   */
    +  private[spark] def executeTakeSeqView(
    --- End diff --
    
    It would be better to split this PR into the two parts you proposed in the PR description.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r213597686
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -329,49 +329,52 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        *
        * This is modeled after `RDD.take` but never runs any job locally on the driver.
        */
    -  def executeTake(n: Int): Array[InternalRow] = {
    +  def executeTake(n: Int): Array[InternalRow] = executeTakeIterator(n)._2.toArray
    +
    +  private[spark] def executeTakeIterator(n: Int): (Long, Iterator[InternalRow]) = {
         if (n == 0) {
    -      return new Array[InternalRow](0)
    +      return (0, Iterator.empty)
         }
     
    -    val childRDD = getByteArrayRdd(n).map(_._2)
    -
    -    val buf = new ArrayBuffer[InternalRow]
    +    val childRDD = getByteArrayRdd(n)
    +    val encodedBuf = new ArrayBuffer[Array[Byte]]
         val totalParts = childRDD.partitions.length
    +    var scannedRowCount = 0L
         var partsScanned = 0
    -    while (buf.size < n && partsScanned < totalParts) {
    +    while (scannedRowCount < n && partsScanned < totalParts) {
           // The number of partitions to try in this iteration. It is ok for this number to be
           // greater than totalParts because we actually cap it at totalParts in runJob.
    -      var numPartsToTry = 1L
    -      if (partsScanned > 0) {
    +      val numPartsToTry = if (partsScanned > 0) {
             // If we didn't find any rows after the previous iteration, quadruple and retry.
             // Otherwise, interpolate the number of partitions we need to try, but overestimate
             // it by 50%. We also cap the estimation in the end.
             val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
    -        if (buf.isEmpty) {
    -          numPartsToTry = partsScanned * limitScaleUpFactor
    +        if (scannedRowCount == 0) {
    +          partsScanned * limitScaleUpFactor
             } else {
    -          val left = n - buf.size
    +          val left = n - scannedRowCount
               // As left > 0, numPartsToTry is always >= 1
    -          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
    -          numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
    +          Math.min(Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt,
    +            partsScanned * limitScaleUpFactor)
             }
    +      } else {
    +        1L
    --- End diff --
    
    @kiszk
    Should we revert [this commit](https://github.com/apache/spark/pull/22219/commits/91617caaab56760ea2f64f3da7486fbf445d7aa9) to reduce the diff?



---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Changed the access modifier of collectCountAndIterator to private[sql], and updated the documentation of the feature that I defined in ThriftServer.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212664660
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -289,6 +289,19 @@ private[hive] class SparkExecuteStatementOperation(
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }
       }
    +
    +  private def getResultIterator(): Iterator[SparkRow] = {
    +    val (totalRowCount, iterResult) = result.collectCountAndIterator()
    +    val batchCollectLimit =
    +      sqlContext.getConf(SQLConf.THRIFTSERVER_BATCH_COLLECTION_LIMIT.key).toLong
    +
    +    resultList = {
    +      if (totalRowCount < batchCollectLimit) Some(iterResult.toArray)
    +      else None
    +    }
    +    if (resultList.isDefined) resultList.get.iterator
    --- End diff --
    
    nit: Would it be better to put them on one line?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97376 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97376/testReport)** for PR 22219 at commit [`f05570c`](https://github.com/apache/spark/commit/f05570c2a8fc88d35085ee8ca377785acc400cdd).


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Yes, I verified the results of a variety of queries, as well as memory usage and performance.
    
    This patch passed all of our query tests, and there was no performance degradation in our test cases.
    
    Below is the result of the memory test.
    I checked **Old Gen memory utilization in the JVM heap** while executing a query that returns **2,481,284 rows**. (I ran "jstat -gc thriftserver-pid" and checked the OU field.)
     
    After patch : 283910.0KB -> 316108.3KB => increases by 31.44MB
    Before patch : 279425.6KB -> 1511834.2KB => increases by 1203.52MB
    
    The memory improvement is very large because the compressed result buffer is much smaller than I expected. **Decompressed InternalRows become garbage right after they are sent and are reclaimed by Young GC**, so Old Gen heap usage is much lower than before.
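
The reported increases can be re-derived from the raw `OU` readings. The snippet below is only a worked check of that arithmetic; the object and method names are made up for illustration, and it assumes the readings are in KB with 1 MB = 1024 KB.

```scala
object OldGenDelta {
  // Growth of Old Gen usage in MB, given two OU readings in KB.
  def growthMb(beforeKb: Double, afterKb: Double): Double =
    (afterKb - beforeKb) / 1024.0

  def main(args: Array[String]): Unit = {
    val afterPatch = growthMb(283910.0, 316108.3)    // roughly 31.44 MB
    val beforePatch = growthMb(279425.6, 1511834.2)  // roughly 1203.52 MB
    println(f"after patch:  $afterPatch%.2f MB")
    println(f"before patch: $beforePatch%.2f MB")
    println(f"ratio:        ${beforePatch / afterPatch}%.1f")
  }
}
```

With these readings, Old Gen growth for this query is roughly 38 times smaller after the patch.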


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r213597102
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -289,6 +289,14 @@ private[hive] class SparkExecuteStatementOperation(
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }
       }
    +
    +  private def getResultIterator(): Iterator[SparkRow] = {
    +    val (totalRowCount, iterResult) = result.collectCountAndIterator()
    +    val batchCollectLimit =
    +      sqlContext.getConf(SQLConf.THRIFTSERVER_BATCH_COLLECTION_LIMIT.key).toLong
    +    resultList = if (totalRowCount < batchCollectLimit) Some(iterResult.toArray) else None
    +    if (resultList.isDefined) resultList.get.iterator else iterResult
    --- End diff --
    
    @viirya
    Here is my idea for addressing the problem you pointed out.
    
    1. Change the return type of "collectCountAndIterator" to a tuple of (Long, SeqView).
    2. The SeqView is created from the encoded result array (the result of getByteArrayRdd().collect() in SparkPlan) and holds the deserializing operations defined in Dataset.
    3. Change the type of resultList in SparkExecuteStatementOperation to Option[Iterable[SparkRow]], because both Array (via implicit conversion) and SeqView can be treated as Iterable.
    4. ThriftServer checks whether the row count exceeds THRIFTSERVER_BATCH_COLLECTION_LIMIT and decides what to cache:
       -> if row count > THRIFTSERVER_BATCH_COLLECTION_LIMIT, resultList caches the SeqView.
       -> otherwise, resultList caches the Array collected from the SeqView.
    
    What do you think about this idea?
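
The four steps could be sketched roughly as follows. This is not the code from the pull request: `ResultCacheSketch`, `collectCountAndView`, `cacheResult`, and the `decode` parameter are hypothetical names, each byte array stands for one encoded row, a plain re-iterable `Iterable` stands in for `SeqView`, and a `Vector` stands in for the collected `Array`.

```scala
object ResultCacheSketch {
  // Steps 1 and 2: return the row count plus a lazy, re-iterable collection that
  // decodes rows only when it is iterated (stand-in for the proposed SeqView).
  def collectCountAndView[T](
      encoded: Array[Array[Byte]],
      decode: Array[Byte] => T): (Long, Iterable[T]) = {
    val lazyRows = new Iterable[T] {
      def iterator: Iterator[T] = encoded.iterator.map(decode)
    }
    (encoded.length.toLong, lazyRows)
  }

  // Steps 3 and 4: cache either the lazy collection or the fully decoded rows,
  // depending on how the row count compares with the batch limit.
  def cacheResult[T](
      encoded: Array[Array[Byte]],
      decode: Array[Byte] => T,
      batchLimit: Long): Iterable[T] = {
    val (rowCount, lazyRows) = collectCountAndView(encoded, decode)
    if (rowCount > batchLimit) lazyRows else lazyRows.toVector
  }
}
```

In this sketch a result above the limit keeps only the encoded bytes in memory and pays the decoding cost on every pass, while a result at or below the limit is decoded once and then served from the decoded copy.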


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97363 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97363/testReport)** for PR 22219 at commit [`ffafd62`](https://github.com/apache/spark/commit/ffafd62b6d5b916e286ee6870e0db168d36a09eb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97893 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97893/testReport)** for PR 22219 at commit [`136a4f9`](https://github.com/apache/spark/commit/136a4f9b4d205b0c94ae71f465c4790b759e6f1d).


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214818478
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -120,10 +120,11 @@ private[hive] class SparkExecuteStatementOperation(
             resultList = None
             result.toLocalIterator.asScala
           } else {
    -        if (resultList.isEmpty) {
    -          resultList = Some(result.collect())
    --- End diff --
    
    Yes, I'll change this feature to a boolean. Thank you for the review.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    @kiszk @viirya @HyukjinKwon @cloud-fan 
    Could you review this patch?


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212862913
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -289,6 +289,14 @@ private[hive] class SparkExecuteStatementOperation(
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }
       }
    +
    +  private def getResultIterator(): Iterator[SparkRow] = {
    +    val (totalRowCount, iterResult) = result.collectCountAndIterator()
    +    val batchCollectLimit =
    +      sqlContext.getConf(SQLConf.THRIFTSERVER_BATCH_COLLECTION_LIMIT.key).toLong
    +    resultList = if (totalRowCount < batchCollectLimit) Some(iterResult.toArray) else None
    +    if (resultList.isDefined) resultList.get.iterator else iterResult
    --- End diff --
    
    Yes, the case you mentioned (incremental collect disabled and FETCH_FIRST) does suffer a performance degradation. When the total row count is larger than batchCollectLimit, I thought caching decompressed rows was not appropriate because of the memory pressure. If FETCH_FIRST cached compressed rows (not decompressed rows) regardless of row count, results that exceed the batch limit could be cached too. However, an "Iterator" return type may not be a good choice for that. A Scala "View" is a better fit, because a new "Iterator" can be created from a "View" over the compressed rows whenever needed. That would require many more source changes, so I didn't do it.
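
The difference between the two return types can be illustrated with a small standalone sketch. The names here (`FetchFirstSketch`, `asIterator`, `asReiterable`) are hypothetical, rows are encoded as UTF-8 strings purely for the example, and a plain `Iterable` is used in place of a Scala view.

```scala
object FetchFirstSketch {
  // Stand-in for deserializing one encoded row.
  private def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")

  // Single-pass: once the rows have been consumed, nothing is left to replay.
  def asIterator(encoded: Array[Array[Byte]]): Iterator[String] =
    encoded.iterator.map(decode)

  // Re-iterable: every call to `iterator` decodes again from the retained
  // encoded rows, so a FETCH_FIRST style restart can obtain a fresh iterator.
  def asReiterable(encoded: Array[Array[Byte]]): Iterable[String] =
    new Iterable[String] {
      def iterator: Iterator[String] = encoded.iterator.map(decode)
    }
}
```

Only the encoded rows are retained in the second form, which is the trade-off described above: less memory held, at the cost of decoding again on each pass.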


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97363 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97363/testReport)** for PR 22219 at commit [`ffafd62`](https://github.com/apache/spark/commit/ffafd62b6d5b916e286ee6870e0db168d36a09eb).


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214819341
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  /**
    +   * Returns the tuple of the row count and an SeqView that contains all rows in this Dataset.
    +   *
    +   * The SeqView will consume as much memory as the total size of serialized results which can be
    +   * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows
    +   * with iterator of returned SeqView. Whether to collect all deserialized rows or to iterate them
    +   * incrementally can be decided with considering total rows count and driver memory.
    +   */
    +  private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) =
    +    withAction("collectCountAndSeqView", queryExecution) { plan =>
    +      // This projection writes output to a `InternalRow`, which means applying this projection is
    +      // not thread-safe. Here we create the projection inside this method to make `Dataset`
    +      // thread-safe.
    +      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    +      val (totalRowCount, internalRowsView) = plan.executeCollectSeqView()
    +      (totalRowCount, internalRowsView.map { row =>
    +        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
    +        // parameter of its `get` method, so it's safe to use null here.
    +        objProj(row).get(0, null).asInstanceOf[T]
    +      }.asInstanceOf[SeqView[T, Array[T]]])
    +    }
    --- End diff --
    
    Currently, there is no API in Dataset to deserialize results iteratively.
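
What iterative deserialization means here can be sketched without Spark. The names below (`collectSerialized`, `deserialize`, `collectCountAndRows`) are hypothetical and only mimic the shape of the method in the diff: the row count is known as soon as the serialized rows are collected, and each row is deserialized only when the consumer reaches it.

```scala
import java.nio.ByteBuffer

object LazyDeserialization {
  // Counts how many rows have actually been deserialized so far.
  var deserialized = 0

  // Hypothetical stand-in for collecting serialized rows on the driver.
  def collectSerialized(n: Int): Vector[Array[Byte]] =
    Vector.tabulate(n)(i => ByteBuffer.allocate(4).putInt(i).array())

  // Hypothetical stand-in for the projection that decodes one serialized row.
  def deserialize(bytes: Array[Byte]): Int = {
    deserialized += 1
    ByteBuffer.wrap(bytes).getInt()
  }

  // Returns the total row count and a factory for lazily deserializing iterators.
  def collectCountAndRows(n: Int): (Long, () => Iterator[Int]) = {
    val serialized = collectSerialized(n)
    (serialized.length.toLong, () => serialized.iterator.map(deserialize))
  }
}
```

In this sketch, taking three rows from a 1000-row result deserializes only those three rows, and the `deserialized` counter makes that observable.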


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97492 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97492/testReport)** for PR 22219 at commit [`9158edc`](https://github.com/apache/spark/commit/9158edc52405b4c31127311735dbf41dbe6d17f7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r215122865
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  /**
    +   * Returns the tuple of the row count and an SeqView that contains all rows in this Dataset.
    +   *
    +   * The SeqView will consume as much memory as the total size of serialized results which can be
    +   * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows
    +   * with iterator of returned SeqView. Whether to collect all deserialized rows or to iterate them
    +   * incrementally can be decided with considering total rows count and driver memory.
    +   */
    +  private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) =
    +    withAction("collectCountAndSeqView", queryExecution) { plan =>
    +      // This projection writes output to a `InternalRow`, which means applying this projection is
    +      // not thread-safe. Here we create the projection inside this method to make `Dataset`
    +      // thread-safe.
    +      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    +      val (totalRowCount, internalRowsView) = plan.executeCollectSeqView()
    +      (totalRowCount, internalRowsView.map { row =>
    +        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
    +        // parameter of its `get` method, so it's safe to use null here.
    +        objProj(row).get(0, null).asInstanceOf[T]
    +      }.asInstanceOf[SeqView[T, Array[T]]])
    +    }
    --- End diff --
    
    OK, so you would prefer not to add a function to Dataset. If withAction and deserializer are changed to private[sql], this implementation can be moved out. Would this function be useful for other SQL servers to reduce the memory usage of query execution? Still, I don't think moving it out looks good, because the Projection would then be created outside of Dataset.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212877431
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    Added a description to collectCountAndIterator, but I have not made it public or experimental yet.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    ok to test


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r225500776
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -120,10 +120,11 @@ private[hive] class SparkExecuteStatementOperation(
             resultList = None
             result.toLocalIterator.asScala
           } else {
    -        if (resultList.isEmpty) {
    -          resultList = Some(result.collect())
    --- End diff --
    
    Done


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212856186
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -289,6 +289,14 @@ private[hive] class SparkExecuteStatementOperation(
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }
       }
    +
    +  private def getResultIterator(): Iterator[SparkRow] = {
    +    val (totalRowCount, iterResult) = result.collectCountAndIterator()
    +    val batchCollectLimit =
    +      sqlContext.getConf(SQLConf.THRIFTSERVER_BATCH_COLLECTION_LIMIT.key).toLong
    +    resultList = if (totalRowCount < batchCollectLimit) Some(iterResult.toArray) else None
    +    if (resultList.isDefined) resultList.get.iterator else iterResult
    --- End diff --
    
    When incremental collect is disabled and users want to use `FETCH_FIRST`, we expect the returned rows to be cached so that iterators can be obtained again. Now `FETCH_FIRST` will trigger re-execution whether incremental collect is enabled or not. I think this may be a performance regression in some cases.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212706680
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -348,30 +350,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
             // Otherwise, interpolate the number of partitions we need to try, but overestimate
             // it by 50%. We also cap the estimation in the end.
             val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
    -        if (buf.isEmpty) {
    +        if (scannedRowCount == 0) {
               numPartsToTry = partsScanned * limitScaleUpFactor
             } else {
    -          val left = n - buf.size
    +          val left = n - scannedRowCount
               // As left > 0, numPartsToTry is always >= 1
    -          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
    +          numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
    --- End diff --
    
    Sorry, I meant `numPartsToTry`. I also realized that we can write it as follows, without `var`.
    
    ```
    val numPartsToTry = if (partsScanned > 0) {
      ...
    } else {
      1L
    }
    ```
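
For reference, the `val`-based form suggested above can be written out as a standalone function. This is a sketch under assumptions, not the code that was merged: `limitScaleUpFactor` is passed as a parameter instead of being read from `sqlContext.conf`, and the result is computed as a `Long` throughout.

```scala
object NumPartsToTry {
  // Number of partitions to scan in the next round of a take(n)-style loop.
  def numPartsToTry(
      n: Int,
      partsScanned: Int,
      scannedRowCount: Long,
      limitScaleUpFactor: Int): Long = {
    if (partsScanned > 0) {
      // The scale-up factor is never allowed to drop below 2.
      val factor = Math.max(limitScaleUpFactor, 2)
      if (scannedRowCount == 0) {
        // No rows found so far: scale up by the factor and retry.
        partsScanned.toLong * factor
      } else {
        // Interpolate from the rows seen so far, overestimate by 50%,
        // and cap the estimate at partsScanned * factor.
        val left = n - scannedRowCount
        Math.min(
          Math.ceil(1.5 * left * partsScanned / scannedRowCount).toLong,
          partsScanned.toLong * factor)
      }
    } else {
      // First round: try a single partition.
      1L
    }
  }
}
```

Because the whole computation is a single expression, no mutable `var` is needed, which is the point of the suggestion.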


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r213200433
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,28 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  /**
    +   * Returns the tuple of the row count and an iterator that contains all rows in this Dataset.
    +   *
    +   * The iterator will consume as much memory as the total size of serialized results which can be
    +   * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows
    +   * with returned iterator. Whether to collect all deserialized rows or to iterate them
    +   * incrementally can be decided with considering total rows count and driver memory.
    +   */
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    Ok. I agree with you.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212857783
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    Yeah, we need to consider this decision carefully. At the least, if we decide to make it public, would it be better to add `@experimental`?


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r213645236
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -329,49 +329,52 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        *
        * This is modeled after `RDD.take` but never runs any job locally on the driver.
        */
    -  def executeTake(n: Int): Array[InternalRow] = {
    +  def executeTake(n: Int): Array[InternalRow] = executeTakeIterator(n)._2.toArray
    +
    +  private[spark] def executeTakeIterator(n: Int): (Long, Iterator[InternalRow]) = {
         if (n == 0) {
    -      return new Array[InternalRow](0)
    +      return (0, Iterator.empty)
         }
     
    -    val childRDD = getByteArrayRdd(n).map(_._2)
    -
    -    val buf = new ArrayBuffer[InternalRow]
    +    val childRDD = getByteArrayRdd(n)
    +    val encodedBuf = new ArrayBuffer[Array[Byte]]
         val totalParts = childRDD.partitions.length
    +    var scannedRowCount = 0L
         var partsScanned = 0
    -    while (buf.size < n && partsScanned < totalParts) {
    +    while (scannedRowCount < n && partsScanned < totalParts) {
           // The number of partitions to try in this iteration. It is ok for this number to be
           // greater than totalParts because we actually cap it at totalParts in runJob.
    -      var numPartsToTry = 1L
    -      if (partsScanned > 0) {
    +      val numPartsToTry = if (partsScanned > 0) {
             // If we didn't find any rows after the previous iteration, quadruple and retry.
             // Otherwise, interpolate the number of partitions we need to try, but overestimate
             // it by 50%. We also cap the estimation in the end.
             val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
    -        if (buf.isEmpty) {
    -          numPartsToTry = partsScanned * limitScaleUpFactor
    +        if (scannedRowCount == 0) {
    +          partsScanned * limitScaleUpFactor
             } else {
    -          val left = n - buf.size
    +          val left = n - scannedRowCount
               // As left > 0, numPartsToTry is always >= 1
    -          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
    -          numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
    +          Math.min(Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt,
    +            partsScanned * limitScaleUpFactor)
             }
    +      } else {
    +        1L
    --- End diff --
    
    It is also fine to revert this.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97492/


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r213006794
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,28 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  /**
    +   * Returns the tuple of the row count and an iterator that contains all rows in this Dataset.
    +   *
    +   * The iterator will consume as much memory as the total size of serialized results which can be
    +   * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows
    +   * with returned iterator. Whether to collect all deserialized rows or to iterate them
    +   * incrementally can be decided with considering total rows count and driver memory.
    +   */
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    I don't think we should expose this as an API. This JIRA/PR doesn't target this API anyway, right? Shall we just leave it private?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97376 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97376/testReport)** for PR 22219 at commit [`f05570c`](https://github.com/apache/spark/commit/f05570c2a8fc88d35085ee8ca377785acc400cdd).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212854113
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -329,49 +329,52 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        *
        * This is modeled after `RDD.take` but never runs any job locally on the driver.
        */
    -  def executeTake(n: Int): Array[InternalRow] = {
    +  def executeTake(n: Int): Array[InternalRow] = executeTakeIterator(n)._2.toArray
    +
    +  private[spark] def executeTakeIterator(n: Int): (Long, Iterator[InternalRow]) = {
    --- End diff --
    
    Please add some description for this method.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    ok to test


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97388 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97388/testReport)** for PR 22219 at commit [`2a41b70`](https://github.com/apache/spark/commit/2a41b704f3aaa80aeb2d6c5b1118ee7faea5bf79).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    **[Test build #97452 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97452/testReport)** for PR 22219 at commit [`e5baa50`](https://github.com/apache/spark/commit/e5baa50daa6cd531399fe3c36242c8c06879b120).


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212850811
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    Yeah, why is it public?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212664629
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -289,6 +289,19 @@ private[hive] class SparkExecuteStatementOperation(
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }
       }
    +
    +  private def getResultIterator(): Iterator[SparkRow] = {
    +    val (totalRowCount, iterResult) = result.collectCountAndIterator()
    +    val batchCollectLimit =
    +      sqlContext.getConf(SQLConf.THRIFTSERVER_BATCH_COLLECTION_LIMIT.key).toLong
    +
    +    resultList = {
    +      if (totalRowCount < batchCollectLimit) Some(iterResult.toArray)
    --- End diff --
    
    nit: Would it be better to put them on one line?


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Add test cases.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212855096
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    I think it would be good to make it public. There is no "action" that decodes rows incrementally except toLocalIterator, and toLocalIterator performs poorly for data sources that have many partitions or for queries that select a limited number of rows. So it would be a good choice to provide another option for reducing the memory pressure of decompressing and deserializing result rows.


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r212856674
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
         files.toSet.toArray
       }
     
    +  def collectCountAndIterator(): (Long, Iterator[T]) =
    --- End diff --
    
    We can always make it public later if there is such a requirement. We should be careful about adding public APIs.


---



[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...

Posted by Dooyoung-Hwang <gi...@git.apache.org>.
Github user Dooyoung-Hwang commented on the issue:

    https://github.com/apache/spark/pull/22219
  
    I refactored the collectionResultAsSeqView function to use an implicit class.
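
As a rough illustration of the implicit-class approach, the sketch below adds a method to an existing class without touching its definition. Everything in it is hypothetical (`ResultSet`, `ResultSetOps`, `collectCountAndRows`); it shows only the general Scala pattern, not the refactoring in this pull request.

```scala
object DatasetExtensions {
  // Hypothetical stand-in for a class that should not gain new methods directly.
  final class ResultSet(val serializedRows: Vector[String])

  // The implicit class makes collectCountAndRows callable on any ResultSet
  // wherever DatasetExtensions._ is imported.
  implicit class ResultSetOps(private val rs: ResultSet) {
    def collectCountAndRows(): (Long, Iterator[Int]) =
      (rs.serializedRows.length.toLong, rs.serializedRows.iterator.map(_.toInt))
  }
}
```

With `import DatasetExtensions._` in scope, `new ResultSet(Vector("1", "2")).collectCountAndRows()` compiles as if the method were defined on `ResultSet` itself, which keeps the helper out of the original class.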


---



[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r214785499
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -641,6 +641,16 @@ object SQLConf {
         .intConf
         .createWithDefault(200)
     
    +  val THRIFTSERVER_BATCH_DESERIALIZE_LIMIT =
    +    buildConf("spark.sql.thriftServer.batchDeserializeLimit")
    +      .doc("The maximum number of result rows that can be deserialized at one time. " +
    +        "If the number of result rows exceeds this value, the Thrift Server will only use " +
    +        "'memory of serialized rows' + 'memory of the deserialized rows being fetched to the " +
    +        "client'. Only valid if spark.sql.thriftServer.incrementalCollect is false. " +
    --- End diff --
    
    nit: `s"client'. Only valid if ${THRIFTSERVER_INCREMENTAL_COLLECT.key} is false. " +`
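
The point of the nit is that the doc string should refer to the other option through its key constant rather than a hard-coded string, so that the text follows any future rename. A minimal sketch of the idea, using a hypothetical `ConfEntry` case class instead of Spark's config builder and placeholder doc text:

```scala
object ConfDocSketch {
  // Hypothetical, minimal stand-in for a config entry; not Spark's ConfigEntry.
  final case class ConfEntry(key: String, doc: String)

  val THRIFTSERVER_INCREMENTAL_COLLECT: ConfEntry = ConfEntry(
    "spark.sql.thriftServer.incrementalCollect",
    "Placeholder doc text for the incremental collect option.")

  val THRIFTSERVER_BATCH_DESERIALIZE_LIMIT: ConfEntry = ConfEntry(
    "spark.sql.thriftServer.batchDeserializeLimit",
    "The maximum number of result rows that can be deserialized at one time. " +
      // The key is interpolated, so the doc cannot drift from the real key.
      s"Only valid if ${THRIFTSERVER_INCREMENTAL_COLLECT.key} is false.")
}
```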


---
