You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/12/12 02:04:17 UTC

[spark] branch master updated: [SPARK-26193][SQL][FOLLOW UP] Read metrics rename and display text changes

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd8da37  [SPARK-26193][SQL][FOLLOW UP] Read metrics rename and display text changes
bd8da37 is described below

commit bd8da3799dd160771ebb3ea55b7678b644248425
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Wed Dec 12 10:03:50 2018 +0800

    [SPARK-26193][SQL][FOLLOW UP] Read metrics rename and display text changes
    
    ## What changes were proposed in this pull request?
    Follow up pr for #23207, include following changes:
    
    - Rename `SQLShuffleMetricsReporter` to `SQLShuffleReadMetricsReporter` to make it match with write side naming.
    - Display text changes for read side for naming consistent.
    - Rename function in `ShuffleWriteProcessor`.
    - Delete `private[spark]` in execution package.
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #23286 from xuanyuanking/SPARK-26193-follow.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/scheduler/ShuffleMapTask.scala    |  2 +-
 .../spark/shuffle/ShuffleWriteProcessor.scala      |  2 +-
 .../spark/sql/execution/ShuffledRowRDD.scala       |  6 ++--
 .../execution/exchange/ShuffleExchangeExec.scala   |  4 +--
 .../org/apache/spark/sql/execution/limit.scala     |  6 ++--
 .../metric/SQLShuffleMetricsReporter.scala         | 36 +++++++++++-----------
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  4 +--
 .../sql/execution/metric/SQLMetricsSuite.scala     | 20 ++++++------
 8 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 2a8d1dd..35664ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -92,7 +92,7 @@ private[spark] class ShuffleMapTask(
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
 
-    dep.shuffleWriterProcessor.writeProcess(rdd, dep, partitionId, context, partition)
+    dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
   }
 
   override def preferredLocations: Seq[TaskLocation] = preferredLocs
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
index f521315..5b0c7e9 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
@@ -41,7 +41,7 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
    * get from [[ShuffleManager]] and triggers rdd compute, finally return the [[MapStatus]] for
    * this task.
    */
-  def writeProcess(
+  def write(
       rdd: RDD[_],
       dep: ShuffleDependency[_, _, _],
       partitionId: Int,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 9b05faa..079ff25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -22,7 +22,7 @@ import java.util.Arrays
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
 
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -157,9 +157,9 @@ class ShuffledRowRDD(
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
     val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
-    // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+    // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
     // as well as the `tempMetrics` for basic shuffle metrics.
-    val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
+    val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
     // The range of pre-shuffle partitions that we are fetching at here is
     // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
     val reader =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 0c20205..da7b0c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
@@ -50,7 +50,7 @@ case class ShuffleExchangeExec(
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
   ) ++ readMetrics ++ writeMetrics
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 1f2fdde..bfaf080 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.metric.{SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 
 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -41,7 +41,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
@@ -165,7 +165,7 @@ case class TakeOrderedAndProjectExec(
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
index ff7941e..2c0ea80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
@@ -27,23 +27,23 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
  * @param metrics All metrics in current SparkPlan. This param should not empty and
  *   contains all shuffle metrics defined in createShuffleReadMetrics.
  */
-private[spark] class SQLShuffleMetricsReporter(
+class SQLShuffleReadMetricsReporter(
     tempMetrics: TempShuffleReadMetrics,
     metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
   private[this] val _remoteBlocksFetched =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED)
   private[this] val _localBlocksFetched =
-    metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED)
+    metrics(SQLShuffleReadMetricsReporter.LOCAL_BLOCKS_FETCHED)
   private[this] val _remoteBytesRead =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ)
   private[this] val _remoteBytesReadToDisk =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
   private[this] val _localBytesRead =
-    metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ)
+    metrics(SQLShuffleReadMetricsReporter.LOCAL_BYTES_READ)
   private[this] val _fetchWaitTime =
-    metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME)
+    metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME)
   private[this] val _recordsRead =
-    metrics(SQLShuffleMetricsReporter.RECORDS_READ)
+    metrics(SQLShuffleReadMetricsReporter.RECORDS_READ)
 
   override def incRemoteBlocksFetched(v: Long): Unit = {
     _remoteBlocksFetched.add(v)
@@ -75,7 +75,7 @@ private[spark] class SQLShuffleMetricsReporter(
   }
 }
 
-private[spark] object SQLShuffleMetricsReporter {
+object SQLShuffleReadMetricsReporter {
   val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
   val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
   val REMOTE_BYTES_READ = "remoteBytesRead"
@@ -88,8 +88,8 @@ private[spark] object SQLShuffleMetricsReporter {
    * Create all shuffle read relative metrics and return the Map.
    */
   def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
-    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"),
-    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"),
+    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks read"),
+    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks read"),
     REMOTE_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "remote bytes read"),
     REMOTE_BYTES_READ_TO_DISK -> SQLMetrics.createSizeMetric(sc, "remote bytes read to disk"),
     LOCAL_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "local bytes read"),
@@ -102,7 +102,7 @@ private[spark] object SQLShuffleMetricsReporter {
  * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
  * @param metrics Shuffle write metrics in current SparkPlan.
  */
-private[spark] class SQLShuffleWriteMetricsReporter(
+class SQLShuffleWriteMetricsReporter(
     metricsReporter: ShuffleWriteMetricsReporter,
     metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter {
   private[this] val _bytesWritten =
@@ -112,29 +112,29 @@ private[spark] class SQLShuffleWriteMetricsReporter(
   private[this] val _writeTime =
     metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
 
-  override private[spark] def incBytesWritten(v: Long): Unit = {
+  override def incBytesWritten(v: Long): Unit = {
     metricsReporter.incBytesWritten(v)
     _bytesWritten.add(v)
   }
-  override private[spark] def decRecordsWritten(v: Long): Unit = {
+  override def decRecordsWritten(v: Long): Unit = {
     metricsReporter.decBytesWritten(v)
     _recordsWritten.set(_recordsWritten.value - v)
   }
-  override private[spark] def incRecordsWritten(v: Long): Unit = {
+  override def incRecordsWritten(v: Long): Unit = {
     metricsReporter.incRecordsWritten(v)
     _recordsWritten.add(v)
   }
-  override private[spark] def incWriteTime(v: Long): Unit = {
+  override def incWriteTime(v: Long): Unit = {
     metricsReporter.incWriteTime(v)
     _writeTime.add(v)
   }
-  override private[spark] def decBytesWritten(v: Long): Unit = {
+  override def decBytesWritten(v: Long): Unit = {
     metricsReporter.decBytesWritten(v)
     _bytesWritten.set(_bytesWritten.value - v)
   }
 }
 
-private[spark] object SQLShuffleWriteMetricsReporter {
+object SQLShuffleWriteMetricsReporter {
   val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
   val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
   val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 1ad5713..ca86922 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter
+import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
         new UnsafeRowSerializer(2))
     val shuffled = new ShuffledRowRDD(
       dependency,
-      SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
+      SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
     shuffled.count()
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index f649549..47265df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -96,8 +96,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
-      "local blocks fetched" -> 2L,
-      "remote blocks fetched" -> 0L,
+      "local blocks read" -> 2L,
+      "remote blocks read" -> 0L,
       "shuffle records written" -> 2L)
     testSparkPlanMetrics(df, 1, Map(
       2L -> (("HashAggregate", expected1(0))),
@@ -114,8 +114,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected2 = Map(
       "records read" -> 4L,
-      "local blocks fetched" -> 4L,
-      "remote blocks fetched" -> 0L,
+      "local blocks read" -> 4L,
+      "remote blocks read" -> 0L,
       "shuffle records written" -> 4L)
     testSparkPlanMetrics(df2, 1, Map(
       2L -> (("HashAggregate", expected2(0))),
@@ -175,8 +175,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       1L -> (("Exchange", Map(
         "shuffle records written" -> 2L,
         "records read" -> 2L,
-        "local blocks fetched" -> 2L,
-        "remote blocks fetched" -> 0L))),
+        "local blocks read" -> 2L,
+        "remote blocks read" -> 0L))),
       0L -> (("ObjectHashAggregate", Map("number of output rows" -> 1L))))
     )
 
@@ -187,8 +187,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       1L -> (("Exchange", Map(
         "shuffle records written" -> 4L,
         "records read" -> 4L,
-        "local blocks fetched" -> 4L,
-        "remote blocks fetched" -> 0L))),
+        "local blocks read" -> 4L,
+        "remote blocks read" -> 0L))),
       0L -> (("ObjectHashAggregate", Map("number of output rows" -> 3L))))
     )
   }
@@ -216,8 +216,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
           "number of output rows" -> 4L))),
         2L -> (("Exchange", Map(
           "records read" -> 4L,
-          "local blocks fetched" -> 2L,
-          "remote blocks fetched" -> 0L,
+          "local blocks read" -> 2L,
+          "remote blocks read" -> 0L,
           "shuffle records written" -> 2L))))
       )
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org