Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2018/09/22 10:11:43 UTC

[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/22524

    [SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs

    ## What changes were proposed in this pull request?
    
    This issue was discovered during https://github.com/apache/spark/pull/21738 .
    
    It turns out that limit is not whole-stage-codegened correctly and always consumes all the inputs.
    
    This patch fixes limit's whole-stage codegen. Some nodes, such as hash aggregate and range, have loop structures that don't properly check the condition to stop early. They are fixed to stop consuming inputs once the limit is reached.
    
    ## How was this patch tested?
    
    Added tests.
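
The early-stop behaviour described in the pull request above can be illustrated with a minimal sketch in plain Java. This is not Spark's generated code: the class `EarlyStopSketch`, its fields, and the `checkFlag` switch are hypothetical names introduced only for illustration. The sketch assumes a producer loop that pulls rows from an iterator and a limit that raises a flag once it has let through the requested number of rows.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

// Hypothetical stand-in for a generated whole-stage loop; not Spark code.
final class EarlyStopSketch {
    private boolean stopEarly = false; // raised by the limit once it is satisfied
    private int count = 0;             // rows the limit has let through so far
    int rowsRead = 0;                  // rows pulled from the input
    int rowsEmitted = 0;               // rows passed to downstream operators

    // Limit logic: count the row, raise the flag on the last allowed row, then emit.
    private void limitConsume(int row, int limit) {
        if (count < limit) {
            count += 1;
            if (count == limit) {
                stopEarly = true;
            }
            rowsEmitted += 1; // stands in for the downstream consume(...)
        }
    }

    // Producer loop; `checkFlag` switches the early-exit test on or off for comparison.
    void run(Iterator<Integer> input, int limit, boolean checkFlag) {
        while (input.hasNext() && !(checkFlag && stopEarly)) {
            int row = input.next();
            rowsRead += 1;
            limitConsume(row, limit);
        }
    }

    public static void main(String[] args) {
        EarlyStopSketch unchecked = new EarlyStopSketch();
        unchecked.run(IntStream.range(0, 100).iterator(), 5, false);
        System.out.println(unchecked.rowsRead + " read, " + unchecked.rowsEmitted + " emitted");

        EarlyStopSketch checked = new EarlyStopSketch();
        checked.run(IntStream.range(0, 100).iterator(), 5, true);
        System.out.println(checked.rowsRead + " read, " + checked.rowsEmitted + " emitted");
    }
}
```

With 100 input rows and a limit of 5, the loop that ignores the flag prints `100 read, 5 emitted`, while the loop that checks it prints `5 read, 5 emitted`. The result is the same in both cases; only the amount of input consumed differs.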

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 SPARK-25497

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22524.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22524
    
----
commit 12703bded143002be417ffa247eef4a970ffd54c
Author: Liang-Chi Hsieh <vi...@...>
Date:   2018-09-22T09:34:41Z

    limit operation within whole stage codegen should not consume all the inputs.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3400/
    Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96860 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96860/testReport)** for PR 22524 at commit [`1b2ab61`](https://github.com/apache/spark/commit/1b2ab6106f52558c48c8a82af6bda507b5189f64).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    retest this please.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220044271
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }
     
       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --
    
    shall we do this after `consume`?


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96807/
    Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96807 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96807/testReport)** for PR 22524 at commit [`ed2c269`](https://github.com/apache/spark/commit/ed2c26928bb9f4ecf634245331a9be366a0642d5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Is Jenkins down now?


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    We can always tune `if (count < given_limit)` to consume one more or one fewer record, can't we?


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Got it, same with me :(


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96499 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96499/testReport)** for PR 22524 at commit [`a09e60f`](https://github.com/apache/spark/commit/a09e60f1e026504657f3de7669eb79cc0b4c2c8c).


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96499/
    Test FAILed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96807 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96807/testReport)** for PR 22524 at commit [`ed2c269`](https://github.com/apache/spark/commit/ed2c26928bb9f4ecf634245331a9be366a0642d5).


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96675 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96675/testReport)** for PR 22524 at commit [`6d95b65`](https://github.com/apache/spark/commit/6d95b65c7897479151e912d919e04a96c932fe72).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    @viirya my only concern about this is that this relies on resetting the flag to false in many places. So the maintenance may become difficult. The same for `stopEarly()`, I am afraid we may forget in the future to maintain it properly and we may miss some places. But I am not sure if there is a better solution. Is there a way we can achieve the same by adding a global boolean when there is a limit operation? If there is not and other people agree with this change, I am fine with it anyway, as I don't have a better suggestion. Thanks.


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96501 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96501/testReport)** for PR 22524 at commit [`a09e60f`](https://github.com/apache/spark/commit/a09e60f1e026504657f3de7669eb79cc0b4c2c8c).


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r219667461
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -84,9 +84,10 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     $stopEarly = true;
    +       |   }
            |   ${consume(ctx, input)}
    -       | } else {
    --- End diff --
    
    Do we need to remove this? Isn't it safer to leave it here?


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    @viirya thanks for adding the explanation! I think it's very clear and helpful. By reading this, I have a new idea.
    
    It seems to me that limit mostly exists to make upstream operators stop producing data earlier, so the code template should look like
    ```
    while (iterator.hasNext() && !stopEarly()) {
      // upstream operators
      ...
      if (count < given_limit) {
        count += 1;
        consume... // downstream operators
      } else {
        setStopEarly(true);
      }
      ...
    }
    ```
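
To see how many records this template pulls before the loop exits, it can be rendered as plain Java. This is only a sketch: `TemplateSketch`, `pulled`, and `consumed` are hypothetical names, the iterator of strings stands in for the upstream operators, and incrementing `consumed` stands in for the downstream `consume...` call.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical rendering of the template above as plain Java; not generated Spark code.
final class TemplateSketch {
    private boolean stopEarly = false;
    int pulled = 0;   // records taken from the iterator
    int consumed = 0; // records handed to downstream operators

    boolean stopEarly() { return stopEarly; }
    void setStopEarly(boolean value) { stopEarly = value; }

    void processNext(Iterator<String> iterator, int givenLimit) {
        int count = 0;
        while (iterator.hasNext() && !stopEarly()) {
            iterator.next();   // upstream operators produce one record
            pulled += 1;
            if (count < givenLimit) {
                count += 1;
                consumed += 1; // downstream operators
            } else {
                setStopEarly(true);
            }
        }
    }

    public static void main(String[] args) {
        TemplateSketch sketch = new TemplateSketch();
        sketch.processNext(Arrays.asList("a", "b", "c", "d", "e").iterator(), 2);
        System.out.println("pulled=" + sketch.pulled + ", consumed=" + sketch.consumed);
    }
}
```

For five input records and a limit of 2, this prints `pulled=3, consumed=2`: the flag is only set in the `else` branch, so one record beyond the limit is pulled from upstream before the loop condition sees it.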
    



---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96675/
    Test FAILed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3599/
    Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    > It will be great to explain how limit works in whole stage codegen, in general. This part is a little hard to understand and I believe many operators need to deal with limit as well.
    
    Ok. Let me add more explanation into the PR description.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3631/
    Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Oh, I see. The code template may cause some confusion, so I'll change it. The downstream code is wrapped inside an if block, and I didn't clearly show how downstream operators work in codegen. Let me update it.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220046213
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -73,14 +78,21 @@ public void append(InternalRow row) {
         currentRows.add(row);
       }
     
    +  /**
    +   * Sets the flag of stopping the query execution early.
    +   */
    +  public void setStopEarly(boolean value) {
    --- End diff --
    
    Your comment also reminds me that we should reset the stop-early flag in the sort exec node too. I will add that and a related test.
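
One plausible reading of why a blocking operator such as sort needs to reset the flag can be sketched in plain Java. The model below is an assumption made for illustration, not Spark's generated code: `SortResetSketch` and the `resetAfterSort` switch are hypothetical, and the two loops stand in for a `limit -> sort -> limit` pipeline in which both limits share a single stop-early flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of limit -> sort -> limit sharing one stop-early flag; not Spark code.
final class SortResetSketch {
    private boolean stopEarly = false;

    List<Integer> run(List<Integer> input, int firstLimit, int secondLimit,
                      boolean resetAfterSort) {
        // Phase 1: rows pass the first limit and are buffered by the sort.
        List<Integer> buffer = new ArrayList<>();
        int i = 0;
        while (i < input.size() && !stopEarly) {
            buffer.add(input.get(i));
            i += 1;
            if (buffer.size() == firstLimit) {
                stopEarly = true; // first limit satisfied
            }
        }
        Collections.sort(buffer);
        if (resetAfterSort) {
            stopEarly = false; // the flag only described the first limit
        }

        // Phase 2: sorted rows are emitted through the second limit.
        List<Integer> output = new ArrayList<>();
        int j = 0;
        while (j < buffer.size() && !stopEarly) {
            output.add(buffer.get(j));
            j += 1;
            if (output.size() == secondLimit) {
                stopEarly = true; // second limit satisfied
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(5, 3, 9, 1, 7, 2, 8);
        System.out.println(new SortResetSketch().run(input, 4, 2, false));
        System.out.println(new SortResetSketch().run(input, 4, 2, true));
    }
}
```

Without the reset, the flag left behind by the first limit makes the output loop exit immediately and the sketch prints `[]`. With the reset, it prints `[1, 3]`, the two smallest of the four buffered rows.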


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220039084
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -38,6 +38,11 @@
     
       protected int partitionIndex = -1;
     
    +  // This indicates whether the query execution should be stopped even the input rows are still
    +  // available. This is used in limit operator. When it reaches the given number of rows to limit,
    +  // this flag is set and the execution should be stopped.
    +  protected boolean isStopEarly = false;
    --- End diff --
    
    what if there are 2 limits in the query? 


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96860/
    Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    It will be great to explain how limit works in whole stage codegen, in general. This part is a little hard to understand.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3532/
    Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96683 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96683/testReport)** for PR 22524 at commit [`6d95b65`](https://github.com/apache/spark/commit/6d95b65c7897479151e912d919e04a96c932fe72).


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96501/
    Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Hmm, with the approach suggested above, isn't `setStopEarly(true)` only called when consuming the (`given_limit` + 1)-th record? Or am I misunderstanding it?


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    > Does this mean you got "Reason: Error during SSL Handshake with remote server" after opening the Jenkins link?
    
    Yes.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r221520640
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         result.rdd.isEmpty
       }
     
    +  test("SPARK-25497: limit operation within whole stage codegen should not " +
    +    "consume all the inputs") {
    +
    +    val aggDF = spark.range(0, 100, 1, 1)
    +      .groupBy("id")
    +      .count().limit(1).filter('count > 0)
    +    aggDF.collect()
    +    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    // The first hash aggregate node outputs 100 records.
    +    // The second hash aggregate before local limit outputs 1 record.
    +    assert(aggNumRecords == 101)
    +
    +    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
    +      .groupBy()
    +      .count().limit(1).filter('count > 0)
    +    aggNoGroupingDF.collect()
    +    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    assert(aggNoGroupingNumRecords == 2)
    +
    +    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
    +    // be planned as `TakeOrderedAndProject` node.
    +    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
    +      val sortDF = spark.range(0, 100, 1, 1)
    +        .filter('id >= 0)
    +        .limit(10)
    +        .sortWithinPartitions("id")
    +        // use non-deterministic expr to prevent filter be pushed down.
    +        .selectExpr("rand() + id as id2")
    +        .filter('id2 >= 0)
    +        .limit(5)
    +        .selectExpr("1 + id2 as id3")
    +      sortDF.collect()
    +      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
    +        case l@LocalLimitExec(_, f: FilterExec) => f
    +      }.map { filterNode =>
    +        filterNode.metrics("numOutputRows").value
    +      }
    +      assert(sortNumRecords.sorted === Seq(5, 10))
    +    }
    +
    +    val filterDF = spark.range(0, 100, 1, 1).filter('id >= 0)
    +      .selectExpr("id + 1 as id2").limit(1).filter('id > 50)
    +    filterDF.collect()
    +    val filterNumRecords = filterDF.queryExecution.sparkPlan.collect {
    +      case f@FilterExec(_, r: RangeExec) => f
    +    }.map { case filterNode =>
    +      filterNode.metrics("numOutputRows").value
    +    }.head
    +    assert(filterNumRecords == 1)
    +
    +    val twoLimitsDF = spark.range(0, 100, 1, 1)
    +      .filter('id >= 0)
    +      .limit(1)
    +      .selectExpr("id + 1 as id2")
    +      .limit(2)
    +      .filter('id2 >= 0)
    +    twoLimitsDF.collect()
    +    val twoLimitsDFNumRecords = twoLimitsDF.queryExecution.sparkPlan.collect {
    +      case f@FilterExec(_, _: RangeExec) => f
    --- End diff --
    
    nit: spaces


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96546 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96546/testReport)** for PR 22524 at commit [`2f4d356`](https://github.com/apache/spark/commit/2f4d356872438b609a55f177ead1ee00ea441350).


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    @viirya As @shaneknapp replied on the mailing list, you can try https://hadrian.ist.berkeley.edu/jenkins/. Thanks @shaneknapp :)


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220044584
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    +      | while (true && !stopEarly()) {
           |   long $range = $batchEnd - $number;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
           |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
           |       long $value = ((long)$localIdx * ${step}L) + $number;
    +      |       $numOutput.add(1);
    --- End diff --
    
    OK. Then I should revert the `numOutput` change, if it is acceptable for the number of records to be slightly inaccurate.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220044149
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    +      | while (true && !stopEarly()) {
           |   long $range = $batchEnd - $number;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
           |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
           |       long $value = ((long)$localIdx * ${step}L) + $number;
    +      |       $numOutput.add(1);
    --- End diff --
    
    This is very likely to cause a perf regression, since it's not a tight loop anymore.
    
    We want the range operator to stop earlier for better performance, but that doesn't mean the range operator must return exactly the `limit` number of records. Since the range operator already returns data in batches, I think we can stop early at batch granularity.
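
A minimal sketch of the batch-granularity idea, not Spark's generated code: the producer checks a stop flag only between batches, so the inner loop stays tight, and the limit still keeps exactly `limit` rows even though the producer may generate up to one extra batch. The class `BatchStopSketch`, its fields, and the batch size are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchStopSketch {
    private boolean stopEarly = false;  // hypothetical flag, set by the limit
    private int count = 0;              // rows accepted by the limit so far
    long produced = 0;                  // rows generated by the range loop
    final List<Long> output = new ArrayList<>();

    // Limit consumer: accept rows until `limit` is reached, then request a stop.
    private void consume(long value, int limit) {
        if (count < limit) {
            count++;
            output.add(value);
            if (count == limit) {
                stopEarly = true;
            }
        }
    }

    // Range producer: the flag is checked once per batch, never in the inner loop.
    void run(long end, int batchSize, int limit) {
        long next = 0;
        while (!stopEarly && next < end) {
            long batchEnd = Math.min(next + batchSize, end);
            for (long v = next; v < batchEnd; v++) {  // tight loop, no flag check
                produced++;
                consume(v, limit);
            }
            next = batchEnd;
        }
    }

    public static void main(String[] args) {
        BatchStopSketch s = new BatchStopSketch();
        s.run(100, 10, 3);
        System.out.println(s.output.size() + " rows kept, " + s.produced + " rows produced");
    }
}
```

With a range of 100, a batch size of 10, and a limit of 3, the producer finishes its first batch and then stops, so the work wasted is bounded by one batch rather than the whole input.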


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3524/
    Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3378/
    Test PASSed.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220043421
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -73,14 +78,21 @@ public void append(InternalRow row) {
         currentRows.add(row);
       }
     
    +  /**
    +   * Sets the flag of stopping the query execution early.
    +   */
    +  public void setStopEarly(boolean value) {
    --- End diff --
    
    Can we have more documentation about how to use it? For now I see 2 use cases:
    1. the limit operator should call it with `true` when the limit is hit
    2. blocking operators (sort, agg, etc.) should call it with `false` to reset it.
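
The two use cases can be sketched as follows. This is an illustration under stated assumptions, not the patch itself: `StopEarlyResetSketch` and its `run` method are hypothetical, and only the names `setStopEarly` and `stopEarly` come from the diff. A limit sets the flag to `true`; a blocking operator (here, a sort) resets it to `false` once it has buffered its input, because otherwise its own output loop would see the stale flag and emit nothing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class StopEarlyResetSketch {
    private boolean stopEarly = false;

    void setStopEarly(boolean value) { stopEarly = value; }
    boolean stopEarly() { return stopEarly; }

    // Models: range -> limit(limit) -> sort (descending) -> output.
    List<Integer> run(int rangeEnd, int limit, boolean resetInSort) {
        List<Integer> sortBuffer = new ArrayList<>();
        int count = 0;

        // Upstream loop: stops producing once the limit has raised the flag.
        for (int v = 0; v < rangeEnd && !stopEarly(); v++) {
            if (count < limit) {
                count++;
                sortBuffer.add(v);
                if (count == limit) {
                    setStopEarly(true);   // use case 1: limit hit
                }
            }
        }

        // Blocking operator: input is fully buffered, now sort it.
        sortBuffer.sort(Collections.reverseOrder());
        if (resetInSort) {
            setStopEarly(false);          // use case 2: reset before emitting
        }

        // Output loop of the blocking operator also honors the flag.
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < sortBuffer.size() && !stopEarly(); i++) {
            out.add(sortBuffer.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(new StopEarlyResetSketch().run(100, 3, true));
        System.out.println(new StopEarlyResetSketch().run(100, 3, false));
    }
}
```

In this model, skipping the reset makes the sort's output loop exit immediately, which is why every blocking operator would need to know about the flag.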


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220054697
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }
     
       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --
    
    Actually, looking at the query again, there should not be a `stopEarly` check inside `consume` that prevents us from consuming the last record, because the check should be in the outer while loop.
    
    The cases that have a `stopEarly` check inside `consume` are blocking operators like sort and aggregate; for them we need to reset the flag.
    
    But for safety, I think I will also move this after `consume`.
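
A small sketch of why the ordering can matter. It assumes a hypothetical downstream that checks the flag before accepting a row; whether any real operator does so inside `consume` is exactly the open question in this thread. `LimitOrderingSketch` and its methods are illustrative names, not Spark APIs.

```java
import java.util.ArrayList;
import java.util.List;

final class LimitOrderingSketch {
    private boolean stopEarly = false;
    private int count = 0;
    final List<Integer> out = new ArrayList<>();

    // Hypothetical downstream that refuses rows once the flag is set.
    private void downstream(int row) {
        if (!stopEarly) {
            out.add(row);
        }
    }

    // Limit logic with the flag set either before or after the downstream call.
    void limitConsume(int row, int limit, boolean setFlagAfterConsume) {
        if (count < limit) {
            count++;
            boolean last = (count == limit);
            if (last && !setFlagAfterConsume) {
                stopEarly = true;   // set before consume: last row may be dropped
            }
            downstream(row);
            if (last && setFlagAfterConsume) {
                stopEarly = true;   // set after consume: last row is kept
            }
        }
    }

    static List<Integer> run(int limit, boolean setFlagAfterConsume) {
        LimitOrderingSketch s = new LimitOrderingSketch();
        // Outer loop: the only place the producer checks the flag.
        for (int row = 0; row < 100 && !s.stopEarly; row++) {
            s.limitConsume(row, limit, setFlagAfterConsume);
        }
        return s.out;
    }

    public static void main(String[] args) {
        System.out.println(run(3, true));
        System.out.println(run(3, false));
    }
}
```

Under this assumption, setting the flag after the downstream call keeps all `limit` rows, while setting it first loses the last one.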



---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    retest this please.


---



[GitHub] spark pull request #22524: [WIP][SPARK-25497][SQL] Limit operation within wh...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r219690413
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    +      | while (true && !stopEarly()) {
           |   long $range = $batchEnd - $number;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
           |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
           |       long $value = ((long)$localIdx * ${step}L) + $number;
    +      |       $numOutput.add(1);
    --- End diff --
    
    I'm not worried about it, since it is a simple op.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96471 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96471/testReport)** for PR 22524 at commit [`12703bd`](https://github.com/apache/spark/commit/12703bded143002be417ffa247eef4a970ffd54c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96471/
    Test FAILed.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220046724
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }
     
       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --
    
    `if ($countTerm == $limit)` means this is the last record, and we should still consume it?


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r219667410
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    +      | while (true && !stopEarly()) {
           |   long $range = $batchEnd - $number;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
           |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
           |       long $value = ((long)$localIdx * ${step}L) + $number;
    +      |       $numOutput.add(1);
    --- End diff --
    
    can this introduce a perf regression?


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Thanks @mgaido91. I think resetting a flag in blocking operators is a feasible solution for now to the issue that the limit operator consumes all inputs. For the current blocking operators, we should add enough regression tests. I think we won't frequently add new operators, although I agree that maintenance might be a potential issue.
    
    I'm not sure a global boolean is significantly different from the current flag. This flag is already a global boolean variable.
    
    Fortunately, I think this might not be so urgent to fix (is it?), so we may be able to wait for more options.
    
    
    



---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220044740
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }
     
       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --
    
    Won't `shouldStop` be called inside `consume`? If it is, `stopEarly` will not be set.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    cc @cloud-fan 


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Closing this in favor of #22630.


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    @xuanyuanking Thanks.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220046092
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -73,14 +78,21 @@ public void append(InternalRow row) {
         currentRows.add(row);
       }
     
    +  /**
    +   * Sets the flag of stopping the query execution early.
    +   */
    +  public void setStopEarly(boolean value) {
    --- End diff --
    
    Ok. Let me add it.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96683 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96683/testReport)** for PR 22524 at commit [`6d95b65`](https://github.com/apache/spark/commit/6d95b65c7897479151e912d919e04a96c932fe72).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96860 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96860/testReport)** for PR 22524 at commit [`1b2ab61`](https://github.com/apache/spark/commit/1b2ab6106f52558c48c8a82af6bda507b5189f64).


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    I didn't look into it, but we can change `if (count < given_limit)` to `if (count < given_limit - 1)` if you are right.
    
    My focus is the code template: without the `if else`, how can the downstream operators stop consuming data?


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96499 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96499/testReport)** for PR 22524 at commit [`a09e60f`](https://github.com/apache/spark/commit/a09e60f1e026504657f3de7669eb79cc0b4c2c8c).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22524: [WIP][SPARK-25497][SQL] Limit operation within wh...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r219731695
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---
    @@ -556,7 +556,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
           Seq(Row(1, 2, Seq("a", "b")), Row(3, 2, Seq("c", "c", "d"))))
       }
     
    -  test("SPARK-18004 limit + aggregates") {
    +  test("SPARK-18528 limit + aggregates") {
    --- End diff --
    
    This JIRA number is wrong.


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    ```
    Is jenkins down now?
    ```
    Does this mean you got a `Reason: Error during SSL Handshake with remote server` after opening the Jenkins link?


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96683/
    Test FAILed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22524: [WIP][SPARK-25497][SQL] Limit operation within wh...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r219690438
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -84,9 +84,10 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     $stopEarly = true;
    +       |   }
            |   ${consume(ctx, input)}
    -       | } else {
    --- End diff --
    
    I don't think we ever execute that branch. If we do, there must be a bug.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r221520772
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         result.rdd.isEmpty
       }
     
    +  test("SPARK-25497: limit operation within whole stage codegen should not " +
    +    "consume all the inputs") {
    +
    +    val aggDF = spark.range(0, 100, 1, 1)
    +      .groupBy("id")
    +      .count().limit(1).filter('count > 0)
    +    aggDF.collect()
    +    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    // The first hash aggregate node outputs 100 records.
    +    // The second hash aggregate before local limit outputs 1 record.
    +    assert(aggNumRecords == 101)
    +
    +    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
    +      .groupBy()
    +      .count().limit(1).filter('count > 0)
    +    aggNoGroupingDF.collect()
    +    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    assert(aggNoGroupingNumRecords == 2)
    +
    +    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
    +    // be planned as `TakeOrderedAndProject` node.
    +    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
    +      val sortDF = spark.range(0, 100, 1, 1)
    +        .filter('id >= 0)
    +        .limit(10)
    +        .sortWithinPartitions("id")
    +        // use non-deterministic expr to prevent filter be pushed down.
    +        .selectExpr("rand() + id as id2")
    +        .filter('id2 >= 0)
    +        .limit(5)
    +        .selectExpr("1 + id2 as id3")
    +      sortDF.collect()
    +      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
    +        case l@LocalLimitExec(_, f: FilterExec) => f
    --- End diff --
    
    nit: spaces


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya closed the pull request at:

    https://github.com/apache/spark/pull/22524


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220048264
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }
     
       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --
    
    Oh, I see. And I think `shouldStop` shouldn't be called inside `consume`.


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r221520801
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         result.rdd.isEmpty
       }
     
    +  test("SPARK-25497: limit operation within whole stage codegen should not " +
    +    "consume all the inputs") {
    +
    +    val aggDF = spark.range(0, 100, 1, 1)
    +      .groupBy("id")
    +      .count().limit(1).filter('count > 0)
    +    aggDF.collect()
    +    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    // The first hash aggregate node outputs 100 records.
    +    // The second hash aggregate before local limit outputs 1 record.
    +    assert(aggNumRecords == 101)
    +
    +    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
    +      .groupBy()
    +      .count().limit(1).filter('count > 0)
    +    aggNoGroupingDF.collect()
    +    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    assert(aggNoGroupingNumRecords == 2)
    +
    +    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
    +    // be planned as `TakeOrderedAndProject` node.
    +    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
    +      val sortDF = spark.range(0, 100, 1, 1)
    +        .filter('id >= 0)
    +        .limit(10)
    +        .sortWithinPartitions("id")
    +        // use non-deterministic expr to prevent filter be pushed down.
    +        .selectExpr("rand() + id as id2")
    +        .filter('id2 >= 0)
    +        .limit(5)
    +        .selectExpr("1 + id2 as id3")
    +      sortDF.collect()
    +      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
    +        case l@LocalLimitExec(_, f: FilterExec) => f
    +      }.map { filterNode =>
    +        filterNode.metrics("numOutputRows").value
    +      }
    +      assert(sortNumRecords.sorted === Seq(5, 10))
    +    }
    +
    +    val filterDF = spark.range(0, 100, 1, 1).filter('id >= 0)
    +      .selectExpr("id + 1 as id2").limit(1).filter('id > 50)
    +    filterDF.collect()
    +    val filterNumRecords = filterDF.queryExecution.sparkPlan.collect {
    +      case f@FilterExec(_, r: RangeExec) => f
    --- End diff --
    
    nit: spaces


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    ping @cloud-fan @mgaido91 Any more comments or questions on this change?


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3431/


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96546/


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Doesn't this approach consume one more record than the given limit?
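
The suggestion being replied to is not quoted in this message, so the following Java sketch only illustrates the general off-by-one concern, not any specific proposal from the thread. It compares a loop that notices the limit only when the next row arrives with one that raises a flag as soon as the limit-th row is accepted. The class and method names are hypothetical.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

// Hypothetical illustration; none of these names come from Spark.
final class LimitOffByOneSketch {

    // Notices that the limit was met only when another row arrives,
    // so one extra row has already been pulled from the input.
    static int pulledWhenCheckingOnNextRow(Iterator<Integer> input, int limit) {
        int pulled = 0;
        int count = 0;
        while (input.hasNext()) {
            input.next();
            pulled++;
            if (count >= limit) {
                break; // too late: this row was already pulled
            }
            count++;
        }
        return pulled;
    }

    // Raises the stop flag right after accepting the limit-th row,
    // so the loop header prevents any further pull.
    static int pulledWhenFlaggingAtLimit(Iterator<Integer> input, int limit) {
        int pulled = 0;
        int count = 0;
        boolean stopEarly = limit <= 0;
        while (!stopEarly && input.hasNext()) {
            input.next();
            pulled++;
            if (count < limit) {
                count++;
                if (count == limit) {
                    stopEarly = true;
                }
            }
        }
        return pulled;
    }

    public static void main(String[] args) {
        // prints 6
        System.out.println(pulledWhenCheckingOnNextRow(IntStream.range(0, 100).iterator(), 5));
        // prints 5
        System.out.println(pulledWhenFlaggingAtLimit(IntStream.range(0, 100).iterator(), 5));
    }
}
```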


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96471 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96471/testReport)** for PR 22524 at commit [`12703bd`](https://github.com/apache/spark/commit/12703bded143002be417ffa247eef4a970ffd54c).


---



[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220040370
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -38,6 +38,11 @@
     
       protected int partitionIndex = -1;
     
    +  // This indicates whether the query execution should be stopped even the input rows are still
    +  // available. This is used in limit operator. When it reaches the given number of rows to limit,
    +  // this flag is set and the execution should be stopped.
    +  protected boolean isStopEarly = false;
    --- End diff --
    
    I've added a test for 2 limits.
    
    When either of the two limits sets `isStopEarly`, I think execution should stop. Is there any case where that would be wrong?
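
A minimal Java sketch of the two-limit case, assuming both limits run inside the same pipelined loop and share one flag. The class `TwoLimitsSketch` and its members are hypothetical. The sketch does not model a buffering operator (such as a sort) between the limits, so it does not answer whether a counterexample exists there.

```java
// Hypothetical model of two limits in one pipelined loop sharing a stop flag.
final class TwoLimitsSketch {
    private final int innerLimit;   // limit closer to the input
    private final int outerLimit;   // limit closer to the output
    private int innerCount = 0;
    private int outerCount = 0;
    private boolean isStopEarly;    // shared by both limits
    int rowsRead = 0;
    int rowsEmitted = 0;

    TwoLimitsSketch(int innerLimit, int outerLimit) {
        this.innerLimit = innerLimit;
        this.outerLimit = outerLimit;
        this.isStopEarly = innerLimit <= 0 || outerLimit <= 0;
    }

    // Producer loop: stops as soon as either limit has raised the flag.
    void run(int inputSize) {
        for (int i = 0; i < inputSize && !isStopEarly; i++) {
            rowsRead++;
            consumeInner(i);
        }
    }

    private void consumeInner(int row) {
        if (innerCount < innerLimit) {
            innerCount++;
            if (innerCount == innerLimit) {
                isStopEarly = true;
            }
            consumeOuter(row);
        }
    }

    private void consumeOuter(int row) {
        if (outerCount < outerLimit) {
            outerCount++;
            if (outerCount == outerLimit) {
                isStopEarly = true;
            }
            rowsEmitted++;
        }
    }

    public static void main(String[] args) {
        TwoLimitsSketch s = new TwoLimitsSketch(10, 5);
        s.run(100);
        System.out.println(s.rowsRead + " " + s.rowsEmitted); // prints 5 5
    }
}
```

In this model the smaller limit always wins: with limits (10, 5) the loop reads 5 rows, and with (3, 5) it reads 3.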
    



---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96675 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96675/testReport)** for PR 22524 at commit [`6d95b65`](https://github.com/apache/spark/commit/6d95b65c7897479151e912d919e04a96c932fe72).


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3398/


---



[GitHub] spark issue #22524: [WIP][SPARK-25497][SQL] Limit operation within whole sta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96501 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96501/testReport)** for PR 22524 at commit [`a09e60f`](https://github.com/apache/spark/commit/a09e60f1e026504657f3de7669eb79cc0b4c2c8c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22524: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22524
  
    **[Test build #96546 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96546/testReport)** for PR 22524 at commit [`2f4d356`](https://github.com/apache/spark/commit/2f4d356872438b609a55f177ead1ee00ea441350).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
