Posted to reviews@spark.apache.org by nongli <gi...@git.apache.org> on 2016/03/28 20:57:50 UTC

[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

GitHub user nongli opened a pull request:

    https://github.com/apache/spark/pull/12007

    [SPARK-14210][SQL] Add a metric for time spent in scans.

    ## What changes were proposed in this pull request?
    
    This adds a metric to Parquet scans that measures the time spent in just the scan phase. This is
    only possible when the scan returns ColumnarBatches; otherwise the overhead is too high.
    
    Combined with the pipeline metric, this lets us easily see what percentage of the time was
    spent in the scan.
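
The batch-level timing described above can be sketched in plain Java as follows. This is a minimal illustration, not the code Spark generates: `ScanTimerSketch`, `drain`, and `percentInScan` are hypothetical names, and an `int[]` stands in for a ColumnarBatch. The idea it shows is that the clock is read once per batch fetch rather than once per row, and the accumulated nanoseconds are converted to milliseconds only at the end.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the generated scan loop; not Spark's actual classes.
final class ScanTimerSketch {
    private long scanTimeTotalNs = 0; // time spent fetching batches, in nanoseconds
    private long numOutputRows = 0;

    // Drains the iterator, timing only hasNext()/next(), and returns scan time in ms.
    long drain(Iterator<int[]> input) {
        while (true) {
            long getBatchStart = System.nanoTime();
            if (!input.hasNext()) {
                break; // as in the diff, the final hasNext() call is not added to the total
            }
            int[] batch = input.next();
            scanTimeTotalNs += System.nanoTime() - getBatchStart;

            // Row processing happens outside the timed region.
            numOutputRows += batch.length;
        }
        return scanTimeTotalNs / (1000 * 1000);
    }

    long numOutputRows() {
        return numOutputRows;
    }

    // Share of a pipeline's time that was spent in the scan, as a percentage.
    static double percentInScan(long scanMs, long pipelineMs) {
        return pipelineMs == 0 ? 0.0 : 100.0 * scanMs / pipelineMs;
    }

    public static void main(String[] args) {
        List<int[]> batches = Arrays.asList(new int[]{1, 2, 3}, new int[]{4, 5});
        ScanTimerSketch timer = new ScanTimerSketch();
        long scanMs = timer.drain(batches.iterator());
        System.out.println("rows=" + timer.numOutputRows() + " scanMs=" + scanMs);
        System.out.println("percent=" + percentInScan(250, 1000));
    }
}
```

Because the timer wraps a whole batch fetch, its cost is amortized over every row in the batch. Doing the same around each row would add two clock reads per row.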

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nongli/spark spark-14210

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12007.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #12007
    
----
commit b17bac4dd880bc93455d85f9591468941a88ce97
Author: Nong Li <no...@databricks.com>
Date:   2016-03-25T22:47:19Z

    [SPARK-14210][SQL] Add a metric for time spent in scans.
    
    This adds a metric to Parquet scans that measures the time spent in just the scan phase. This is
    only possible when the scan returns ColumnarBatches; otherwise the overhead is too high.
    
    Combined with the pipeline metric, this lets us easily see what percentage of the time was
    spent in the scan.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202578629
  
    **[Test build #54346 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54346/consoleFull)** for PR 12007 at commit [`b1943cc`](https://github.com/apache/spark/commit/b1943cca4c1e5052a27f8991870e14ff11f1e994).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12007#discussion_r57671055
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -241,73 +256,89 @@ private[sql] case class DataSourceScan(
         // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
         // here which path to use. Fix this.
     
    -    ctx.currentVars = null
    -    val columns1 = (output zip colVars).map { case (attr, colVar) =>
    -      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
    -    val scanBatches = ctx.freshName("processBatches")
    -    ctx.addNewFunction(scanBatches,
    -      s"""
    -      | private void $scanBatches() throws java.io.IOException {
    -      |  while (true) {
    -      |     int numRows = $batch.numRows();
    -      |     if ($idx == 0) {
    -      |       ${columnAssigns.mkString("", "\n", "\n")}
    -      |       $numOutputRows.add(numRows);
    -      |     }
    -      |
    -      |     // this loop is very perf sensitive and changes to it should be measured carefully
    -      |     while ($idx < numRows) {
    -      |       int $rowidx = $idx++;
    -      |       ${consume(ctx, columns1).trim}
    -      |       if (shouldStop()) return;
    -      |     }
    -      |
    -      |     if (!$input.hasNext()) {
    -      |       $batch = null;
    -      |       break;
    -      |     }
    -      |     $batch = ($columnarBatchClz)$input.next();
    -      |     $idx = 0;
    -      |   }
    -      | }""".stripMargin)
    -
         val exprRows =
    -      output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
    +        output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
         ctx.INPUT_ROW = row
         ctx.currentVars = null
    -    val columns2 = exprRows.map(_.gen(ctx))
    +    val columnsRowInput = exprRows.map(_.gen(ctx))
         val inputRow = if (outputUnsafeRows) row else null
         val scanRows = ctx.freshName("processRows")
         ctx.addNewFunction(scanRows,
           s"""
    -       | private void $scanRows(InternalRow $row) throws java.io.IOException {
    -       |   boolean firstRow = true;
    -       |   while (firstRow || $input.hasNext()) {
    -       |     if (firstRow) {
    -       |       firstRow = false;
    -       |     } else {
    -       |       $row = (InternalRow) $input.next();
    -       |     }
    -       |     $numOutputRows.add(1);
    -       |     ${consume(ctx, columns2, inputRow).trim}
    -       |     if (shouldStop()) return;
    -       |   }
    -       | }""".stripMargin)
    -
    -    val value = ctx.freshName("value")
    -    s"""
    -       | if ($batch != null) {
    -       |   $scanBatches();
    -       | } else if ($input.hasNext()) {
    -       |   Object $value = $input.next();
    -       |   if ($value instanceof $columnarBatchClz) {
    -       |     $batch = ($columnarBatchClz)$value;
    -       |     $scanBatches();
    -       |   } else {
    -       |     $scanRows((InternalRow) $value);
    -       |   }
    -       | }
    -     """.stripMargin
    +         | private void $scanRows(InternalRow $row) throws java.io.IOException {
    +         |   boolean firstRow = true;
    +         |   while (!shouldStop() && (firstRow || $input.hasNext())) {
    +         |     if (firstRow) {
    +         |       firstRow = false;
    +         |     } else {
    +         |       $row = (InternalRow) $input.next();
    +         |     }
    +         |     $numOutputRows.add(1);
    +         |     ${consume(ctx, columnsRowInput, inputRow).trim}
    +         |   }
    +         | }""".stripMargin)
    +
    +    // Timers for how long we spent inside the scan. We can only maintain this when using batches,
    +    // otherwise the overhead is too high.
    +    if (canProcessBatches()) {
    +      val scanTimeMetric = metricTerm(ctx, "scanTime")
    +      val getBatchStart = ctx.freshName("scanStart")
    +      val scanTimeTotalNs = ctx.freshName("scanTime")
    +      ctx.currentVars = null
    +      val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
    +        genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
    +      val scanBatches = ctx.freshName("processBatches")
    +      ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
    +
    +      ctx.addNewFunction(scanBatches,
    +        s"""
    +        | private void $scanBatches() throws java.io.IOException {
    +        |  while (true) {
    +        |     int numRows = $batch.numRows();
    +        |     if ($idx == 0) {
    +        |       ${columnAssigns.mkString("", "\n", "\n")}
    +        |       $numOutputRows.add(numRows);
    +        |     }
    +        |
    +        |     while (!shouldStop() && $idx < numRows) {
    +        |       int $rowidx = $idx++;
    +        |       ${consume(ctx, columnsBatchInput).trim}
    +        |     }
    +        |     if (shouldStop()) return;
    +        |
    +        |     long $getBatchStart = System.nanoTime();
    +        |     if (!$input.hasNext()) {
    +        |       $batch = null;
    +        |       $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
    +        |       break;
    +        |     }
    +        |     $batch = ($columnarBatchClz)$input.next();
    +        |     $scanTimeTotalNs += System.nanoTime() - $getBatchStart;
    +        |     $idx = 0;
    +        |   }
    +        | }""".stripMargin)
    +
    +      val value = ctx.freshName("value")
    +      s"""
    +         | if ($batch != null) {
    +         |   $scanBatches();
    +         | } else if ($input.hasNext()) {
    +         |   Object $value = $input.next();
    +         |   if ($value instanceof $columnarBatchClz) {
    --- End diff --
    
    If it's not ready yet, we can do it later.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202590657
  
    **[Test build #54340 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54340/consoleFull)** for PR 12007 at commit [`b17bac4`](https://github.com/apache/spark/commit/b17bac4dd880bc93455d85f9591468941a88ce97).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202578966
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202529228
  
    Here's a screenshot of what this looks like.
    
    ![screen shot 2016-03-28 at 11 56 33 am](https://cloud.githubusercontent.com/assets/242468/14086908/51369016-f4dc-11e5-9cb7-fd06abc2e32d.png)



[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202705564
  
    LGTM, merging this into master.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/12007


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202591236
  
    Build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202578969
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54346/
    Test PASSed.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202591242
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54340/
    Test PASSed.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12007#discussion_r57756896
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -241,73 +256,89 @@ private[sql] case class DataSourceScan(
    +      val value = ctx.freshName("value")
    +      s"""
    +         | if ($batch != null) {
    +         |   $scanBatches();
    +         | } else if ($input.hasNext()) {
    +         |   Object $value = $input.next();
    +         |   if ($value instanceof $columnarBatchClz) {
    --- End diff --
    
    We can't yet. We know when the scan can't return batches, but we don't have logic guaranteeing that it must. We need to add a check somewhere that makes sure the schema doesn't have nested types. We should do this, though.
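
A minimal sketch of the kind of schema check mentioned above, written in plain Java. Everything here is hypothetical: `BatchEligibilitySketch`, `FieldType`, and `hasOnlyFlatTypes` are illustrative names, and the enum is a simplified stand-in for Spark's real type hierarchy. It also assumes that "no nested types" is the only condition that matters, which the thread does not confirm.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified type model; Spark's real DataType hierarchy is richer.
final class BatchEligibilitySketch {
    enum FieldType { INT, LONG, DOUBLE, STRING, ARRAY, MAP, STRUCT }

    // Arrays, maps, and structs are treated as the nested types in this sketch.
    static boolean isNested(FieldType t) {
        return t == FieldType.ARRAY || t == FieldType.MAP || t == FieldType.STRUCT;
    }

    // True only if no field in the schema is a nested type.
    static boolean hasOnlyFlatTypes(List<FieldType> schema) {
        for (FieldType t : schema) {
            if (isNested(t)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(hasOnlyFlatTypes(Arrays.asList(FieldType.INT, FieldType.STRING)));
        System.out.println(hasOnlyFlatTypes(Arrays.asList(FieldType.INT, FieldType.ARRAY)));
    }
}
```

With a check like this evaluated at planning time, the generated code could in principle commit to the batch path up front instead of testing `instanceof` on the first value at runtime.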


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202651433
  
    cc @sameeragarwal 


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202541200
  
    **[Test build #54346 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54346/consoleFull)** for PR 12007 at commit [`b1943cc`](https://github.com/apache/spark/commit/b1943cca4c1e5052a27f8991870e14ff11f1e994).


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12007#discussion_r57670804
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
    @@ -241,73 +256,89 @@ private[sql] case class DataSourceScan(
    +      val value = ctx.freshName("value")
    +      s"""
    +         | if ($batch != null) {
    +         |   $scanBatches();
    +         | } else if ($input.hasNext()) {
    +         |   Object $value = $input.next();
    +         |   if ($value instanceof $columnarBatchClz) {
    --- End diff --
    
    Should we remove this runtime check now?


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202690321
  
    cc @davies: it would be great if this could make it into our nightly perf run.


[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12007#issuecomment-202531559
  
    **[Test build #54340 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54340/consoleFull)** for PR 12007 at commit [`b17bac4`](https://github.com/apache/spark/commit/b17bac4dd880bc93455d85f9591468941a88ce97).

