You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by gatorsmile <gi...@git.apache.org> on 2017/08/31 05:59:05 UTC

[GitHub] spark pull request #19092: [SPARK-21878] [SQL] [TEST] Create SQLMetricsTestU...

Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19092#discussion_r136251106
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala ---
    @@ -17,112 +17,10 @@
     
     package org.apache.spark.sql.hive.execution
     
    -import java.io.File
    -
    -import org.apache.spark.sql.catalyst.TableIdentifier
    -import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
     import org.apache.spark.sql.hive.test.TestHiveSingleton
    -import org.apache.spark.sql.test.SQLTestUtils
    -import org.apache.spark.util.Utils
    -
    -class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton {
    -  import spark.implicits._
    -
    -  /**
    -   * Get execution metrics for the SQL execution and verify metrics values.
    -   *
    -   * @param metricsValues the expected metric values (numFiles, numPartitions, numOutputRows).
    -   * @param func the function can produce execution id after running.
    -   */
    -  private def verifyWriteDataMetrics(metricsValues: Seq[Int])(func: => Unit): Unit = {
    -    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
    -    // Run the given function to trigger query execution.
    -    func
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    val executionIds =
    -      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
    -    assert(executionIds.size == 1)
    -    val executionId = executionIds.head
    -
    -    val executionData = spark.sharedState.listener.getExecution(executionId).get
    -    val executedNode = executionData.physicalPlanGraph.nodes.head
    -
    -    val metricsNames = Seq(
    -      "number of written files",
    -      "number of dynamic part",
    -      "number of output rows")
    -
    -    val metrics = spark.sharedState.listener.getExecutionMetrics(executionId)
    -
    -    metricsNames.zip(metricsValues).foreach { case (metricsName, expected) =>
    -      val sqlMetric = executedNode.metrics.find(_.name == metricsName)
    -      assert(sqlMetric.isDefined)
    -      val accumulatorId = sqlMetric.get.accumulatorId
    -      val metricValue = metrics(accumulatorId).replaceAll(",", "").toInt
    -      assert(metricValue == expected)
    -    }
    -
    -    val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
    -    val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
    -    assert(totalNumBytes > 0)
    -  }
    -
    -  private def testMetricsNonDynamicPartition(
    -      dataFormat: String,
    -      tableName: String): Unit = {
    -    withTable(tableName) {
    -      Seq((1, 2)).toDF("i", "j")
    -        .write.format(dataFormat).mode("overwrite").saveAsTable(tableName)
    -
    -      val tableLocation =
    -        new File(spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location)
     
    -      // 2 files, 100 rows, 0 dynamic partition.
    -      verifyWriteDataMetrics(Seq(2, 0, 100)) {
    -        (0 until 100).map(i => (i, i + 1)).toDF("i", "j").repartition(2)
    -          .write.format(dataFormat).mode("overwrite").insertInto(tableName)
    -      }
    -      assert(Utils.recursiveList(tableLocation).count(_.getName.startsWith("part-")) == 2)
    -    }
    -  }
    -
    -  private def testMetricsDynamicPartition(
    -      provider: String,
    -      dataFormat: String,
    -      tableName: String): Unit = {
    -    withTempPath { dir =>
    -      spark.sql(
    -        s"""
    -           |CREATE TABLE $tableName(a int, b int)
    -           |USING $provider
    -           |PARTITIONED BY(a)
    -           |LOCATION '${dir.toURI}'
    -         """.stripMargin)
    -      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
    -      assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
    -
    -      val df = spark.range(start = 0, end = 40, step = 1, numPartitions = 1)
    -        .selectExpr("id a", "id b")
    -
    -      // 40 files, 80 rows, 40 dynamic partitions.
    -      verifyWriteDataMetrics(Seq(40, 40, 80)) {
    -        df.union(df).repartition(2, $"a")
    -          .write
    -          .format(dataFormat)
    -          .mode("overwrite")
    -          .insertInto(tableName)
    -      }
    -      assert(Utils.recursiveList(dir).count(_.getName.startsWith("part-")) == 40)
    -    }
    -  }
    -
    -  test("writing data out metrics: parquet") {
    -    testMetricsNonDynamicPartition("parquet", "t1")
    -  }
    -
    -  test("writing data out metrics with dynamic partition: parquet") {
    --- End diff --
    
    The above two test cases are moved to `sql/execution/metric/SQLMetricsSuite.scala`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org