Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2018/04/23 06:23:54 UTC

[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/21126

    [SPARK-24050][SS] Calculate input / processing rates correctly for DataSourceV2 streaming sources

    ## What changes were proposed in this pull request?
    
    In some streaming queries, the input and processing rates are not calculated at all (they show up as zero) because MicroBatchExecution fails to associate metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. The executed-plan-leaf-to-logical-source attribution works as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source, so we made a best-effort attempt to match logical and execution plan leaves when the number of leaves was the same. When the number of leaves differs, we just give up and report zero rates. An example where this may happen is as follows.
    
    ```
    val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
    val streamingInputDF = ...
    
    val query = streamingInputDF.join(cachedStaticDF).writeStream....
    ```
    In this case, the `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it has only one leaf because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves, causing the input rates to be computed as zero.
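    
    The mismatch can be illustrated with a minimal sketch in plain Scala. It does not use Spark: `LogicalLeaf`, `ExecLeaf` and `attributeByPosition` are hypothetical names invented for this illustration, and the row counts are made up. It only models the "match by position when the leaf counts agree, otherwise give up" behaviour described above.
    
    ```scala
    // Hypothetical stand-ins for plan leaves; none of these are Spark classes.
    final case class LogicalLeaf(name: String, source: Option[String])
    final case class ExecLeaf(name: String, numOutputRows: Long)
    
    // Best-effort attribution: pair leaves by position, but only when both
    // plans have the same number of leaves; otherwise report nothing.
    def attributeByPosition(
        logical: Seq[LogicalLeaf],
        exec: Seq[ExecLeaf]): Map[String, Long] = {
      if (logical.size != exec.size) {
        Map.empty // leaf counts differ, so the rates would show up as zero
      } else {
        logical.zip(exec)
          .collect { case (LogicalLeaf(_, Some(src)), e) => src -> e.numOutputRows }
          .groupBy(_._1)
          .map { case (src, rows) => src -> rows.map(_._2).sum }
      }
    }
    
    // The cached union has two logical leaves but a single executed leaf.
    val logical = Seq(
      LogicalLeaf("stream", Some("streamingInput")),
      LogicalLeaf("static1", None),
      LogicalLeaf("static2", None))
    val exec = Seq(
      ExecLeaf("streamScan", 3L),
      ExecLeaf("InMemoryTableScanExec", 4L))
    
    val rows = attributeByPosition(logical, exec)
    ```
    
    Here `rows` is empty because three logical leaves cannot be paired with two executed leaves, even though the streaming scan itself produced rows.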
    
    With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So it's easy to associate the metrics with the original streaming sources.
    
    In this PR, the solution is as follows. If all the streaming sources in a streaming query are V2 sources, then we use a new code path where the execution-metrics-to-source mapping is done directly. Otherwise we fall back to the existing mapping logic.
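    
    A rough sketch of that dispatch, again in plain Scala rather than Spark code. `Source`, `V1Source`, `V2Source`, `V2Scan`, `OtherLeaf` and `numInputRows` are hypothetical names used only for illustration; the actual patch works on Spark's plan classes.
    
    ```scala
    // Hypothetical, simplified model; these are not Spark's classes.
    sealed trait Source { def name: String }
    final case class V1Source(name: String) extends Source
    final case class V2Source(name: String) extends Source
    
    // An executed-plan leaf; a V2 scan keeps a reference to the source it reads.
    sealed trait ExecLeaf { def numOutputRows: Long }
    final case class V2Scan(source: V2Source, numOutputRows: Long) extends ExecLeaf
    final case class OtherLeaf(numOutputRows: Long) extends ExecLeaf
    
    // Sum up rows for each source.
    def sumRows(tuples: Seq[(Source, Long)]): Map[Source, Long] =
      tuples.groupBy(_._1).map { case (s, rows) => s -> rows.map(_._2).sum }
    
    def numInputRows(
        streamingSources: Seq[Source],
        execLeaves: Seq[ExecLeaf],
        fallback: () => Map[Source, Long]): Map[Source, Long] = {
      val onlyV2 = streamingSources.forall(_.isInstanceOf[V2Source])
      if (onlyV2) {
        // Direct mapping: each V2 scan already knows its source, so leaves
        // that are not V2 scans (e.g. a cached static table) are ignored.
        sumRows(execLeaves.collect { case V2Scan(src, n) => (src: Source) -> n })
      } else {
        fallback() // stands in for the existing leaf-count-based logic
      }
    }
    
    val a = V2Source("a")
    val b = V2Source("b")
    val leaves = Seq(V2Scan(a, 3L), OtherLeaf(4L), V2Scan(b, 0L))
    val result = numInputRows(Seq(a, b), leaves, () => Map.empty)
    ```
    
    In this sketch the extra non-streaming leaf no longer matters: `result` attributes 3 rows to `a` and 0 rows to `b` regardless of how many other leaves the executed plan has.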
    
    ## How was this patch tested?
    - New unit tests using V2 memory source
    - Existing unit tests using V1 source


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-24050

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21126.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21126
    
----
commit d485db8ec70a8bd8e2fff488e75be08d384ceef0
Author: Tathagata Das <ta...@...>
Date:   2018-04-23T06:20:14Z

    SPARK-24050

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2666/
    Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89834 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89834/testReport)** for PR 21126 at commit [`a56707f`](https://github.com/apache/spark/commit/a56707f18989c76582c1e5238f2a80bd8a13b2a0).


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183559926
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
           return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
         }
     
    -    // We want to associate execution plan leaves to sources that generate them, so that we match
    -    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    -    // Consider the translation from the streaming logical plan to the final executed plan.
    -    //
    -    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    -    //
    -    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
    -    //    - Each logical plan leaf will be associated with a single streaming source.
    -    //    - There can be multiple logical plan leaves associated with a streaming source.
    -    //    - There can be leaves not associated with any streaming source, because they were
    -    //      generated from a batch source (e.g. stream-batch joins)
    -    //
    -    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
    -    //    the trigger logical plan, we associate executed plan leaves with corresponding
    -    //    streaming sources.
    -    //
    -    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    -    //
    -    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    -      logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +    val numInputRows = extractSourceToNumInputRows()
    +
    +    val eventTimeStats = lastExecution.executedPlan.collect {
    +      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
    +        val stats = e.eventTimeStats.value
    +        Map(
    +          "max" -> stats.max,
    +          "min" -> stats.min,
    +          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
    +    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
    +
    +    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
    +  }
    +
    +  /** Extract number of input sources for each streaming source in plan */
    +  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
    +
    +    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
    +      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
         }
    -    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    -    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    -    val numInputRows: Map[BaseStreamingSource, Long] =
    +
    +    val onlyDataSourceV2Sources = {
    +      // Check whether the streaming query's logical plan has only V2 data sources
    +      val allStreamingLeaves =
    +        logicalPlan.collect { case s: StreamingExecutionRelation => s }
    +      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
    --- End diff --
    
    Maybe I can make it work for continuous processing as well with a small tweak.



---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89707 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89707/testReport)** for PR 21126 at commit [`d485db8`](https://github.com/apache/spark/commit/d485db8ec70a8bd8e2fff488e75be08d384ceef0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89702/
    Test FAILed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89766 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89766/testReport)** for PR 21126 at commit [`855e24d`](https://github.com/apache/spark/commit/855e24d3edc9b3febb9e117cfaad1f16ecc1be5d).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    lgtm


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89702 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89702/testReport)** for PR 21126 at commit [`d485db8`](https://github.com/apache/spark/commit/d485db8ec70a8bd8e2fff488e75be08d384ceef0).


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89702 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89702/testReport)** for PR 21126 at commit [`d485db8`](https://github.com/apache/spark/commit/d485db8ec70a8bd8e2fff488e75be08d384ceef0).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2658/
    Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89777 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89777/testReport)** for PR 21126 at commit [`6d80cfd`](https://github.com/apache/spark/commit/6d80cfd9c70f9be83a7c551750f791947bd3387f).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89766/
    Test FAILed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2576/
    Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89707 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89707/testReport)** for PR 21126 at commit [`d485db8`](https://github.com/apache/spark/commit/d485db8ec70a8bd8e2fff488e75be08d384ceef0).


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89811 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89811/testReport)** for PR 21126 at commit [`a56707f`](https://github.com/apache/spark/commit/a56707f18989c76582c1e5238f2a80bd8a13b2a0).


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89825 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89825/testReport)** for PR 21126 at commit [`a56707f`](https://github.com/apache/spark/commit/a56707f18989c76582c1e5238f2a80bd8a13b2a0).


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89810/
    Test FAILed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183559395
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(progress.sources(0).numInputRows === 10)
       }
     
    +
    +  test("input row calculation with trigger having data for one of two V2 sources") {
    +    val streamInput1 = MemoryStream[Int]
    +    val streamInput2 = MemoryStream[Int]
    +
    +    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
    +      AddData(streamInput1, 1, 2, 3),
    +      CheckAnswer(1, 2, 3),
    +      AssertOnQuery { q =>
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 2)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        assert(lastProgress.get.sources(1).numInputRows == 0)
    +        true
    +      }
    +    )
    +  }
    +
    +  test("input row calculation with mixed batch and streaming V2 sources") {
    +
    +    val streamInput = MemoryStream[Int]
    +    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
    +
    +    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
    +      AddData(streamInput, 1, 2, 3),
    +      AssertOnQuery { q =>
    +        q.processAllAvailable()
    +
    +        // The number of leaves in the trigger's logical plan should be same as the executed plan.
    +        require(
    +          q.lastExecution.logical.collectLeaves().length ==
    +            q.lastExecution.executedPlan.collectLeaves().length)
    +
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 1)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        true
    +      }
    +    )
    +
    +    val streamInput2 = MemoryStream[Int]
    +    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
    +
    +    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
    --- End diff --
    
    Then there will be two `DataSourceV2ScanExec`s reading from the same location. So we will be reading the data twice, and the counts will reflect that. But yes, I should add a test for that.
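    
    As a small illustration of that double counting (plain Scala, not the test from this PR; the source name and row counts are made up), assume two scans each report 3 rows for the same source and the per-source total is obtained by summing:
    
    ```scala
    // Hypothetical model: each pair is (source name, rows emitted by one scan).
    val scans = Seq("memoryStream" -> 3L, "memoryStream" -> 3L)
    
    // Sum up rows for each source, in the spirit of sumRows in the diff above.
    val perSource: Map[String, Long] =
      scans.groupBy(_._1).map { case (s, rs) => s -> rs.map(_._2).sum }
    ```
    
    Under that assumption `perSource` reports 6 rows for `memoryStream`, even though only 3 rows were added to it.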


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183533297
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
           return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
         }
     
    -    // We want to associate execution plan leaves to sources that generate them, so that we match
    -    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    -    // Consider the translation from the streaming logical plan to the final executed plan.
    -    //
    -    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    -    //
    -    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
    -    //    - Each logical plan leaf will be associated with a single streaming source.
    -    //    - There can be multiple logical plan leaves associated with a streaming source.
    -    //    - There can be leaves not associated with any streaming source, because they were
    -    //      generated from a batch source (e.g. stream-batch joins)
    -    //
    -    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
    -    //    the trigger logical plan, we associate executed plan leaves with corresponding
    -    //    streaming sources.
    -    //
    -    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    -    //
    -    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    -      logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +    val numInputRows = extractSourceToNumInputRows()
    +
    +    val eventTimeStats = lastExecution.executedPlan.collect {
    +      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
    +        val stats = e.eventTimeStats.value
    +        Map(
    +          "max" -> stats.max,
    +          "min" -> stats.min,
    +          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
    +    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
    +
    +    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
    +  }
    +
    +  /** Extract number of input sources for each streaming source in plan */
    +  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
    +
    +    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
    +      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
         }
    -    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    -    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    -    val numInputRows: Map[BaseStreamingSource, Long] =
    +
    +    val onlyDataSourceV2Sources = {
    +      // Check whether the streaming query's logical plan has only V2 data sources
    +      val allStreamingLeaves =
    +        logicalPlan.collect { case s: StreamingExecutionRelation => s }
    +      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
    --- End diff --
    
    Do we not have a way to track these for continuous processing at the moment?


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183559165
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(progress.sources(0).numInputRows === 10)
       }
     
    +
    +  test("input row calculation with trigger having data for one of two V2 sources") {
    +    val streamInput1 = MemoryStream[Int]
    +    val streamInput2 = MemoryStream[Int]
    +
    +    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
    +      AddData(streamInput1, 1, 2, 3),
    +      CheckAnswer(1, 2, 3),
    +      AssertOnQuery { q =>
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 2)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        assert(lastProgress.get.sources(1).numInputRows == 0)
    +        true
    +      }
    +    )
    +  }
    +
    +  test("input row calculation with mixed batch and streaming V2 sources") {
    +
    +    val streamInput = MemoryStream[Int]
    +    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
    +
    +    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
    +      AddData(streamInput, 1, 2, 3),
    +      AssertOnQuery { q =>
    +        q.processAllAvailable()
    +
    +        // The number of leaves in the trigger's logical plan should be same as the executed plan.
    +        require(
    +          q.lastExecution.logical.collectLeaves().length ==
    +            q.lastExecution.executedPlan.collectLeaves().length)
    +
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 1)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        true
    +      }
    +    )
    +
    +    val streamInput2 = MemoryStream[Int]
    +    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
    --- End diff --
    
    It really doesn't matter, as the test suite will shut down the SparkContext anyway.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2614/
    Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89777/
    Test FAILed.


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183660795
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
           return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
         }
     
    -    // We want to associate execution plan leaves to sources that generate them, so that we match
    -    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    -    // Consider the translation from the streaming logical plan to the final executed plan.
    -    //
    -    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    -    //
    -    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
    -    //    - Each logical plan leaf will be associated with a single streaming source.
    -    //    - There can be multiple logical plan leaves associated with a streaming source.
    -    //    - There can be leaves not associated with any streaming source, because they were
    -    //      generated from a batch source (e.g. stream-batch joins)
    -    //
    -    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
    -    //    the trigger logical plan, we associate executed plan leaves with corresponding
    -    //    streaming sources.
    -    //
    -    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    -    //
    -    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    -      logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +    val numInputRows = extractSourceToNumInputRows()
    +
    +    val eventTimeStats = lastExecution.executedPlan.collect {
    +      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
    +        val stats = e.eventTimeStats.value
    +        Map(
    +          "max" -> stats.max,
    +          "min" -> stats.min,
    +          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
    +    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
    +
    +    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
    +  }
    +
    +  /** Extract number of input sources for each streaming source in plan */
    +  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
    +
    +    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
    +      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
         }
    -    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    -    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    -    val numInputRows: Map[BaseStreamingSource, Long] =
    +
    +    val onlyDataSourceV2Sources = {
    +      // Check whether the streaming query's logical plan has only V2 data sources
    +      val allStreamingLeaves =
    +        logicalPlan.collect { case s: StreamingExecutionRelation => s }
    +      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
    --- End diff --
    
    Yeah. This code path is not used by continuous processing. 
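
The removed comment block in the diff above describes the older best-effort attribution: executed-plan leaves are paired with logical-plan leaves by position, and only when both plans have the same number of leaves. The following is a minimal plain-Scala model of that idea, not the Spark implementation. `Source`, `LogicalLeaf`, `ExecLeaf`, and `attributeByPosition` are hypothetical stand-ins introduced here for illustration.

```scala
object LeafMatchingSketch {
  // Hypothetical stand-ins for Spark's streaming source, logical-plan leaf,
  // and executed-plan leaf (with its numOutputRows metric).
  final case class Source(name: String)
  final case class LogicalLeaf(id: Int)
  final case class ExecLeaf(numOutputRows: Long)

  /** Pair leaves by position, but only when both plans have the same leaf count. */
  def attributeByPosition(
      leafToSource: Map[LogicalLeaf, Source],
      logicalLeaves: Seq[LogicalLeaf],
      execLeaves: Seq[ExecLeaf]): Map[Source, Long] = {
    if (logicalLeaves.size != execLeaves.size) {
      // Leaf counts differ (e.g. a cached subplan collapsed into one leaf):
      // give up, so no rows are attributed to any source.
      Map.empty
    } else {
      val tuples = logicalLeaves.zip(execLeaves).flatMap { case (logical, exec) =>
        // Leaves that came from a batch source have no entry and are skipped.
        leafToSource.get(logical).map(source => source -> exec.numOutputRows)
      }
      // Sum the rows of all leaves that belong to the same source.
      tuples.groupBy(_._1).map { case (source, rows) => source -> rows.map(_._2).sum }
    }
  }

  def main(args: Array[String]): Unit = {
    val stream = Source("stream")
    val streamLeaf = LogicalLeaf(0)
    val staticA = LogicalLeaf(1)
    val staticB = LogicalLeaf(2)
    val mapping = Map(streamLeaf -> stream)

    // Stream-batch join: two logical leaves, two executed leaves.
    println(attributeByPosition(
      mapping, Seq(streamLeaf, staticA), Seq(ExecLeaf(3), ExecLeaf(2))))

    // Cached union of two static inputs: three logical leaves, two executed leaves.
    println(attributeByPosition(
      mapping, Seq(streamLeaf, staticA, staticB), Seq(ExecLeaf(3), ExecLeaf(4))))
  }
}
```

In this model the second call returns an empty map, which corresponds to the zero input rate the pull request description attributes to a leaf-count mismatch.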


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89811/
    Test FAILed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183604136
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -207,62 +209,92 @@ trait ProgressReporter extends Logging {
           return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
         }
     
    -    // We want to associate execution plan leaves to sources that generate them, so that we match
    -    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    -    // Consider the translation from the streaming logical plan to the final executed plan.
    -    //
    -    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    -    //
    -    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
    -    //    - Each logical plan leaf will be associated with a single streaming source.
    -    //    - There can be multiple logical plan leaves associated with a streaming source.
    -    //    - There can be leaves not associated with any streaming source, because they were
    -    //      generated from a batch source (e.g. stream-batch joins)
    -    //
    -    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
    -    //    the trigger logical plan, we associate executed plan leaves with corresponding
    -    //    streaming sources.
    -    //
    -    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    -    //
    -    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    -      logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +    val numInputRows = extractSourceToNumInputRows()
    +
    +    val eventTimeStats = lastExecution.executedPlan.collect {
    +      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
    +        val stats = e.eventTimeStats.value
    +        Map(
    +          "max" -> stats.max,
    +          "min" -> stats.min,
    +          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
    +    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
    +
    +    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
    +  }
    +
    +  /** Extract number of input sources for each streaming source in plan */
    +  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
    +
    +    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
    +      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
         }
    -    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    -    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    -    val numInputRows: Map[BaseStreamingSource, Long] =
    +
    +    val onlyDataSourceV2Sources = {
    +      // Check whether the streaming query's logical plan has only V2 data sources
    +      val allStreamingLeaves =
    +        logicalPlan.collect { case s: StreamingExecutionRelation => s }
    +      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
    --- End diff --
    
    A point fix here won't be sufficient - right now the metrics don't make it to the driver at all in continuous processing.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89810 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89810/testReport)** for PR 21126 at commit [`7d79dce`](https://github.com/apache/spark/commit/7d79dce972c2209fb9cb9388f8a979bb52beff66).


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    @brkyvz @jose-torres 


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183542025
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -733,6 +804,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         }
       }
     
    +  def getLastProgressWithData(q: StreamingQuery): Option[StreamingQueryProgress] = {
    +    q.recentProgress.filter(_.numInputRows > 0).lastOption
    +  }
    +
    +
    --- End diff --
    
    nit: extra line


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89766 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89766/testReport)** for PR 21126 at commit [`855e24d`](https://github.com/apache/spark/commit/855e24d3edc9b3febb9e117cfaad1f16ecc1be5d).


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183542307
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(progress.sources(0).numInputRows === 10)
       }
     
    +
    +  test("input row calculation with trigger having data for one of two V2 sources") {
    +    val streamInput1 = MemoryStream[Int]
    +    val streamInput2 = MemoryStream[Int]
    +
    +    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
    +      AddData(streamInput1, 1, 2, 3),
    +      CheckAnswer(1, 2, 3),
    +      AssertOnQuery { q =>
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 2)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        assert(lastProgress.get.sources(1).numInputRows == 0)
    +        true
    +      }
    +    )
    +  }
    +
    +  test("input row calculation with mixed batch and streaming V2 sources") {
    +
    +    val streamInput = MemoryStream[Int]
    +    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
    +
    +    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
    +      AddData(streamInput, 1, 2, 3),
    +      AssertOnQuery { q =>
    +        q.processAllAvailable()
    +
    +        // The number of leaves in the trigger's logical plan should be same as the executed plan.
    +        require(
    +          q.lastExecution.logical.collectLeaves().length ==
    +            q.lastExecution.executedPlan.collectLeaves().length)
    +
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 1)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        true
    +      }
    +    )
    +
    +    val streamInput2 = MemoryStream[Int]
    +    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
    +
    +    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
    --- End diff --
    
    what if you do a stream-stream join?


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183660623
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(progress.sources(0).numInputRows === 10)
       }
     
    +
    +  test("input row calculation with trigger having data for one of two V2 sources") {
    +    val streamInput1 = MemoryStream[Int]
    +    val streamInput2 = MemoryStream[Int]
    +
    +    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
    +      AddData(streamInput1, 1, 2, 3),
    +      CheckAnswer(1, 2, 3),
    +      AssertOnQuery { q =>
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 2)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        assert(lastProgress.get.sources(1).numInputRows == 0)
    +        true
    +      }
    +    )
    +  }
    +
    +  test("input row calculation with mixed batch and streaming V2 sources") {
    +
    +    val streamInput = MemoryStream[Int]
    +    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
    +
    +    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
    +      AddData(streamInput, 1, 2, 3),
    +      AssertOnQuery { q =>
    +        q.processAllAvailable()
    +
    +        // The number of leaves in the trigger's logical plan should be same as the executed plan.
    +        require(
    +          q.lastExecution.logical.collectLeaves().length ==
    +            q.lastExecution.executedPlan.collectLeaves().length)
    +
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 1)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        true
    +      }
    +    )
    +
    +    val streamInput2 = MemoryStream[Int]
    +    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
    +
    +    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
    --- End diff --
    
    Turns out things were broken for self-joins and self-unions. Updated the logic and added tests for those with v2 sources.
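
For self-joins and self-unions, the same source can appear at more than one leaf of the executed plan, so the per-leaf counts have to be combined per source. The `sumRows` helper quoted in the ProgressReporter diff does this with `groupBy`. Below is a minimal standalone sketch of that helper's behavior on duplicate keys. `Source` is a hypothetical stand-in for `BaseStreamingSource`, and `map` is used instead of `mapValues` only so the sketch returns a strict `Map` on newer Scala versions. It shows what the helper computes, not what the updated logic in the PR ultimately reports for such queries.

```scala
object SumRowsSketch {
  // Hypothetical stand-in for BaseStreamingSource.
  final case class Source(name: String)

  /** Sum the row counts of all (source, rows) tuples that share a source. */
  def sumRows(tuples: Seq[(Source, Long)]): Map[Source, Long] =
    tuples.groupBy(_._1).map { case (source, rows) => source -> rows.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val a = Source("a")
    val b = Source("b")

    // Source `a` is scanned by two leaves (as in a self-union); `b` had no data.
    val perLeaf = Seq(a -> 3L, a -> 3L, b -> 0L)
    println(sumRows(perLeaf))
  }
}
```

Here the two entries for `a` collapse into a single entry whose value is their sum, while `b` keeps its zero count, so every source still appears exactly once in the result.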


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89777 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89777/testReport)** for PR 21126 at commit [`6d80cfd`](https://github.com/apache/spark/commit/6d80cfd9c70f9be83a7c551750f791947bd3387f).


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #4159 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4159/testReport)** for PR 21126 at commit [`6d80cfd`](https://github.com/apache/spark/commit/6d80cfd9c70f9be83a7c551750f791947bd3387f).


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2645/
    Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    jenkins retest this please


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89825 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89825/testReport)** for PR 21126 at commit [`a56707f`](https://github.com/apache/spark/commit/a56707f18989c76582c1e5238f2a80bd8a13b2a0).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2624/
    Test PASSed.


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183605577
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(progress.sources(0).numInputRows === 10)
       }
     
    +
    +  test("input row calculation with trigger having data for one of two V2 sources") {
    +    val streamInput1 = MemoryStream[Int]
    +    val streamInput2 = MemoryStream[Int]
    +
    +    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
    +      AddData(streamInput1, 1, 2, 3),
    +      CheckAnswer(1, 2, 3),
    +      AssertOnQuery { q =>
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 2)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        assert(lastProgress.get.sources(1).numInputRows == 0)
    +        true
    +      }
    --- End diff --
    
    nit: i'd suggest doing an AddData() for the other stream after, to make sure there's not some weird order dependence


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89834 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89834/testReport)** for PR 21126 at commit [`a56707f`](https://github.com/apache/spark/commit/a56707f18989c76582c1e5238f2a80bd8a13b2a0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89825/
    Test FAILed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2573/
    Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89834/
    Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    jenkins retest this please


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89707/
    Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #89811 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89811/testReport)** for PR 21126 at commit [`a56707f`](https://github.com/apache/spark/commit/a56707f18989c76582c1e5238f2a80bd8a13b2a0).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    **[Test build #4159 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4159/testReport)** for PR 21126 at commit [`6d80cfd`](https://github.com/apache/spark/commit/6d80cfd9c70f9be83a7c551750f791947bd3387f).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183541978
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(progress.sources(0).numInputRows === 10)
       }
     
    +
    +  test("input row calculation with trigger having data for one of two V2 sources") {
    +    val streamInput1 = MemoryStream[Int]
    +    val streamInput2 = MemoryStream[Int]
    +
    +    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
    +      AddData(streamInput1, 1, 2, 3),
    +      CheckAnswer(1, 2, 3),
    +      AssertOnQuery { q =>
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 2)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        assert(lastProgress.get.sources(1).numInputRows == 0)
    +        true
    +      }
    +    )
    +  }
    +
    +  test("input row calculation with mixed batch and streaming V2 sources") {
    +
    +    val streamInput = MemoryStream[Int]
    +    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
    +
    +    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
    +      AddData(streamInput, 1, 2, 3),
    +      AssertOnQuery { q =>
    +        q.processAllAvailable()
    +
    +        // The number of leaves in the trigger's logical plan should be same as the executed plan.
    +        require(
    +          q.lastExecution.logical.collectLeaves().length ==
    +            q.lastExecution.executedPlan.collectLeaves().length)
    +
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 1)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        true
    +      }
    +    )
    +
    +    val streamInput2 = MemoryStream[Int]
    +    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
    --- End diff --
    
    nit: unpersist later?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183897545
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -207,62 +209,126 @@ trait ProgressReporter extends Logging {
           return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
         }
     
    -    // We want to associate execution plan leaves to sources that generate them, so that we match
    -    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    -    // Consider the translation from the streaming logical plan to the final executed plan.
    -    //
    -    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    -    //
    -    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
    -    //    - Each logical plan leaf will be associated with a single streaming source.
    -    //    - There can be multiple logical plan leaves associated with a streaming source.
    -    //    - There can be leaves not associated with any streaming source, because they were
    -    //      generated from a batch source (e.g. stream-batch joins)
    -    //
    -    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
    -    //    the trigger logical plan, we associate executed plan leaves with corresponding
    -    //    streaming sources.
    -    //
    -    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    -    //
    -    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    -      logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +    val numInputRows = extractSourceToNumInputRows()
    +
    +    val eventTimeStats = lastExecution.executedPlan.collect {
    +      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
    +        val stats = e.eventTimeStats.value
    +        Map(
    +          "max" -> stats.max,
    +          "min" -> stats.min,
    +          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
    +    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
    +
    --- End diff --
    
    The code above stayed the same; the diff is just rendering it poorly.


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183542367
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(progress.sources(0).numInputRows === 10)
       }
     
    +
    +  test("input row calculation with trigger having data for one of two V2 sources") {
    +    val streamInput1 = MemoryStream[Int]
    +    val streamInput2 = MemoryStream[Int]
    +
    +    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
    +      AddData(streamInput1, 1, 2, 3),
    +      CheckAnswer(1, 2, 3),
    +      AssertOnQuery { q =>
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 2)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        assert(lastProgress.get.sources(1).numInputRows == 0)
    +        true
    +      }
    +    )
    +  }
    +
    +  test("input row calculation with mixed batch and streaming V2 sources") {
    +
    +    val streamInput = MemoryStream[Int]
    +    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
    +
    +    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
    +      AddData(streamInput, 1, 2, 3),
    +      AssertOnQuery { q =>
    +        q.processAllAvailable()
    +
    +        // The number of leaves in the trigger's logical plan should be same as the executed plan.
    +        require(
    +          q.lastExecution.logical.collectLeaves().length ==
    +            q.lastExecution.executedPlan.collectLeaves().length)
    +
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 1)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        true
    +      }
    +    )
    +
    +    val streamInput2 = MemoryStream[Int]
    +    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
    +
    +    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
    --- End diff --
    
    e.g. self-join?
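
The self-join case is where the two halves of the new V2 code path interact: the same scan instance must not be counted twice, while distinct instances that read from the same source must be added together. The following is a minimal sketch of that idea in plain Scala, not the PR's code. `ScanLeaf`, its fields, and `rowsPerSource` are hypothetical stand-ins for `DataSourceV2ScanExec`, its source and its `numOutputRows` metric.

```scala
import java.util.IdentityHashMap
import scala.collection.JavaConverters._

object IdentityDedupSketch {
  // Hypothetical stand-in for a scan leaf. Case-class equality means two distinct
  // instances with the same fields are equal and share a hash code, which is the
  // situation where hash-based dedup would undercount.
  final case class ScanLeaf(source: String, numOutputRows: Long)

  def rowsPerSource(leaves: Seq[ScanLeaf]): Map[String, Long] = {
    // Step 1: keep one entry per object identity, not per equals/hashCode.
    val unique = new IdentityHashMap[ScanLeaf, ScanLeaf]()
    leaves.foreach(leaf => unique.put(leaf, leaf))

    // Steps 2 and 3: read the row count of each unique leaf and sum per source.
    unique.values.asScala.toSeq
      .groupBy(_.source)
      .map { case (source, group) => source -> group.map(_.numOutputRows).sum }
  }

  def main(args: Array[String]): Unit = {
    val left = ScanLeaf("memory", 3L)
    val right = ScanLeaf("memory", 3L) // equal to `left`, but a separate instance
    // `left` appears twice in the traversal and is counted once; `right` is counted too.
    println(rowsPerSource(Seq(left, right, left))) // prints Map(memory -> 6)
  }
}
```

A regular `HashSet` would collapse `left` and `right` into one entry and report 3 rows, while summing without any dedup would report 9.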


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    jenkins retest this please.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183912999
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -207,62 +209,126 @@ trait ProgressReporter extends Logging {
           return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
         }
     
    -    // We want to associate execution plan leaves to sources that generate them, so that we match
    -    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    -    // Consider the translation from the streaming logical plan to the final executed plan.
    -    //
    -    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    -    //
    -    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
    -    //    - Each logical plan leaf will be associated with a single streaming source.
    -    //    - There can be multiple logical plan leaves associated with a streaming source.
    -    //    - There can be leaves not associated with any streaming source, because they were
    -    //      generated from a batch source (e.g. stream-batch joins)
    -    //
    -    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
    -    //    the trigger logical plan, we associate executed plan leaves with corresponding
    -    //    streaming sources.
    -    //
    -    // 3. For each source, we sum the metrics of the associated execution plan leaves.
    -    //
    -    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    -      logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +    val numInputRows = extractSourceToNumInputRows()
    +
    +    val eventTimeStats = lastExecution.executedPlan.collect {
    +      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
    +        val stats = e.eventTimeStats.value
    +        Map(
    +          "max" -> stats.max,
    +          "min" -> stats.min,
    +          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
    +    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
    +
    +    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
    +  }
    +
    +  /** Extract the number of input rows for each streaming source in the plan */
    +  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
    +
    +    import java.util.IdentityHashMap
    +    import scala.collection.JavaConverters._
    +
    +    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
    +      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
         }
    -    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    -    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
    -    val numInputRows: Map[BaseStreamingSource, Long] =
    +
    +    val onlyDataSourceV2Sources = {
    +      // Check whether the streaming query's logical plan has only V2 data sources
    +      val allStreamingLeaves =
    +        logicalPlan.collect { case s: StreamingExecutionRelation => s }
    +      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
    +    }
    +
    +    if (onlyDataSourceV2Sources) {
    +      // DataSourceV2ScanExec is the execution plan leaf that is responsible for reading data
    +      // from a V2 source and has a direct reference to the V2 source that generated it. Each
    +      // DataSourceV2ScanExec records the number of rows it has read using SQLMetrics. However,
    +      // just collecting all DataSourceV2ScanExec nodes and getting the metric is not correct as
    +      // a DataSourceV2ScanExec instance may be referenced from two (or more) points in the
    +      // execution plan, and counting it twice will lead to double counting. We
    +      // can't dedup them using their hashcode either because two different instances of
    +      // DataSourceV2ScanExec can have the same hashcode but account for separate sets of
    +      // records read, and deduping them to consider only one of them would be undercounting the
    +      // records read. Therefore the right way to do this is to consider the unique instances of
    +      // DataSourceV2ScanExec (using their identity hash codes) and get metrics from them.
    +      // Hence we calculate in the following way.
    +      //
    +      // 1. Collect all the unique DataSourceV2ScanExec instances using IdentityHashMap.
    +      //
    +      // 2. Extract the source and the number of rows read from the DataSourceV2ScanExec instances.
    +      //
    +      // 3. Multiple DataSourceV2ScanExec instances may refer to the same source (can happen with
    +      //    self-unions or self-joins). Add up the number of rows for each unique source.
    +      val uniqueStreamingExecLeavesMap =
    +        new IdentityHashMap[DataSourceV2ScanExec, DataSourceV2ScanExec]()
    +
    +      lastExecution.executedPlan.collectLeaves().foreach {
    +        case s: DataSourceV2ScanExec if s.reader.isInstanceOf[BaseStreamingSource] =>
    +          uniqueStreamingExecLeavesMap.put(s, s)
    +        case _ =>
    +      }
    +
    +      val sourceToInputRowsTuples =
    +        uniqueStreamingExecLeavesMap.values.asScala.map { execLeaf =>
    +          val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
    +          val source = execLeaf.reader.asInstanceOf[BaseStreamingSource]
    +          source -> numRows
    +        }.toSeq
    +      logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t"))
    +      sumRows(sourceToInputRowsTuples)
    +    } else {
    +
    +      // Since V1 sources do not generate execution plan leaves that link directly to the source
    +      // that generated them, we can only do a best-effort association between execution plan
    +      // leaves and the sources. This is known to fail in a few cases, see SPARK-24050.
    +      //
    +      // We want to associate execution plan leaves to sources that generate them, so that we match
    +      // their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    +      // Consider the translation from the streaming logical plan to the final executed plan.
    +      //
    +      // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
    +      //
    +      // 1. We keep track of streaming sources associated with each leaf in trigger's logical plan
    +      //  - Each logical plan leaf will be associated with a single streaming source.
    +      //  - There can be multiple logical plan leaves associated with a streaming source.
    +      //  - There can be leaves not associated with any streaming source, because they were
    +      //      generated from a batch source (e.g. stream-batch joins)
    +      //
    +      // 2. Assuming that the executed plan has same number of leaves in the same order as that of
    +      //    the trigger logical plan, we associate executed plan leaves with corresponding
    +      //    streaming sources.
    +      //
    +      // 3. For each source, we sum the metrics of the associated execution plan leaves.
    +      //
    +      val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
    +        logicalPlan.collectLeaves().map { leaf => leaf -> source }
    +      }
    +      val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
    +      val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
           if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
             val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
               case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
             }
    -        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
    +        val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, source) =>
               val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
               source -> numRows
             }
    -        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
    +        sumRows(sourceToInputRowsTuples)
           } else {
             if (!metricWarningLogged) {
               def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
    +
               logWarning(
                 "Could not report metrics as number leaves in trigger logical plan did not match that" +
    -                s" of the execution plan:\n" +
    -                s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
    -                s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
    +              s" of the execution plan:\n" +
    --- End diff --
    
    existing nit: maybe we should've just used 
    ```
    """
       |
       |
    """
    ```
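
For illustration, the triple-quoted form suggested above could look roughly like the sketch below. This is a standalone example under stated assumptions, not the code that was merged: `LeafMismatchMessage`, `describe`, and `message` are hypothetical names, the leaves are plain strings instead of plan nodes, and the message wording is paraphrased from the diff.

```scala
object LeafMismatchMessage {
  // Hypothetical helper mirroring the local `toString` helper in the diff.
  def describe[T](seq: Seq[T]): String =
    s"(size = ${seq.size}), ${seq.mkString(", ")}"

  // One interpolated multi-line string replaces the chain of `+` concatenations.
  // `stripMargin` removes the leading whitespace up to and including each `|`.
  def message(logicalLeaves: Seq[String], execLeaves: Seq[String]): String =
    s"""Could not report metrics as the number of leaves in the trigger logical plan
       |did not match that of the execution plan:
       |logical plan leaves: ${describe(logicalLeaves)}
       |execution plan leaves: ${describe(execLeaves)}
       |""".stripMargin

  def main(args: Array[String]): Unit =
    println(message(Seq("LocalRelation", "StreamingRelation"), Seq("InMemoryTableScan")))
}
```

One caveat with this form: `stripMargin` runs after interpolation, so an interpolated value that itself contains a newline followed by whitespace and `|` would also be trimmed.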


---



[GitHub] spark pull request #21126: [SPARK-24050][SS] Calculate input / processing ra...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21126


---



[GitHub] spark issue #21126: [SPARK-24050][SS] Calculate input / processing rates cor...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21126
  
    jenkins retest this please


---
