Posted to commits@spark.apache.org by we...@apache.org on 2017/07/07 05:12:26 UTC
spark git commit: [SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations
Repository: spark
Updated Branches:
refs/heads/master c09b31eb8 -> 5df99bd36
[SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations
## What changes were proposed in this pull request?
Remove the time metrics, since there seems to be no way to measure writing time accurately without per-row tracking.
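The removed measurement had to wrap every single row write in a pair of clock reads. A minimal sketch of that pattern, using the iter, currentWriter, and timeOnCurrentFile names from the diff below (stand-ins for context, not a self-contained program):

    while (iter.hasNext) {
      val internalRow = iter.next()
      val startTime = System.nanoTime()                      // first clock read, once per row
      currentWriter.write(internalRow)
      timeOnCurrentFile += (System.nanoTime() - startTime)   // second clock read, once per row
    }

Two System.nanoTime() calls per row is exactly the per-row tracking this follow-up removes; without it there is no accurate point at which to observe writing time, so the metric is dropped rather than reported misleadingly.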
## How was this patch tested?
Existing tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #18558 from viirya/SPARK-20703-followup.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5df99bd3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5df99bd3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5df99bd3
Branch: refs/heads/master
Commit: 5df99bd364561c6f4c02308149ba5eb71f89247e
Parents: c09b31e
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Fri Jul 7 13:12:20 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jul 7 13:12:20 2017 +0800
----------------------------------------------------------------------
.../execution/command/DataWritingCommand.scala | 10 ---------
.../datasources/FileFormatWriter.scala | 22 +++-----------------
.../sql/hive/execution/SQLMetricsSuite.scala | 3 ---
3 files changed, 3 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 0c381a2..700f7f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -30,7 +30,6 @@ trait DataWritingCommand extends RunnableCommand {
override lazy val metrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
Map(
- "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
"numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -47,23 +46,14 @@ trait DataWritingCommand extends RunnableCommand {
var numFiles = 0
var totalNumBytes: Long = 0L
var totalNumOutput: Long = 0L
- var totalWritingTime: Long = 0L
writeSummaries.foreach { summary =>
numPartitions += summary.updatedPartitions.size
numFiles += summary.numOutputFile
totalNumBytes += summary.numOutputBytes
totalNumOutput += summary.numOutputRows
- totalWritingTime += summary.totalWritingTime
}
- val avgWritingTime = if (numFiles > 0) {
- (totalWritingTime / numFiles).toLong
- } else {
- 0L
- }
-
- metrics("avgTime").add(avgWritingTime)
metrics("numFiles").add(numFiles)
metrics("numOutputBytes").add(totalNumBytes)
metrics("numOutputRows").add(totalNumOutput)
http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6486663..9eb9eae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -275,8 +275,6 @@ object FileFormatWriter extends Logging {
/**
* The data structures used to measure metrics during writing.
*/
- protected var totalWritingTime: Long = 0L
- protected var timeOnCurrentFile: Long = 0L
protected var numOutputRows: Long = 0L
protected var numOutputBytes: Long = 0L
@@ -343,9 +341,7 @@ object FileFormatWriter extends Logging {
}
val internalRow = iter.next()
- val startTime = System.nanoTime()
currentWriter.write(internalRow)
- timeOnCurrentFile += (System.nanoTime() - startTime)
recordsInFile += 1
}
releaseResources()
@@ -355,17 +351,13 @@ object FileFormatWriter extends Logging {
updatedPartitions = Set.empty,
numOutputFile = fileCounter + 1,
numOutputBytes = numOutputBytes,
- numOutputRows = numOutputRows,
- totalWritingTime = totalWritingTime)
+ numOutputRows = numOutputRows)
}
override def releaseResources(): Unit = {
if (currentWriter != null) {
try {
- val startTime = System.nanoTime()
currentWriter.close()
- totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
- timeOnCurrentFile = 0
numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
} finally {
currentWriter = null
@@ -504,9 +496,7 @@ object FileFormatWriter extends Logging {
releaseResources()
newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions)
}
- val startTime = System.nanoTime()
currentWriter.write(getOutputRow(row))
- timeOnCurrentFile += (System.nanoTime() - startTime)
recordsInFile += 1
}
if (currentPartColsAndBucketId != null) {
@@ -519,17 +509,13 @@ object FileFormatWriter extends Logging {
updatedPartitions = updatedPartitions.toSet,
numOutputFile = totalFileCounter,
numOutputBytes = numOutputBytes,
- numOutputRows = numOutputRows,
- totalWritingTime = totalWritingTime)
+ numOutputRows = numOutputRows)
}
override def releaseResources(): Unit = {
if (currentWriter != null) {
try {
- val startTime = System.nanoTime()
currentWriter.close()
- totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
- timeOnCurrentFile = 0
numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
} finally {
currentWriter = null
@@ -547,11 +533,9 @@ object FileFormatWriter extends Logging {
* @param numOutputFile the total number of files.
* @param numOutputRows the number of output rows.
* @param numOutputBytes the bytes of output data.
- * @param totalWritingTime the total writing time in ms.
*/
case class ExecutedWriteSummary(
updatedPartitions: Set[String],
numOutputFile: Int,
numOutputRows: Long,
- numOutputBytes: Long,
- totalWritingTime: Long)
+ numOutputBytes: Long)
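For reference, the summary type this change leaves behind, with an illustrative instance (the field values here are made up):

    case class ExecutedWriteSummary(
        updatedPartitions: Set[String],
        numOutputFile: Int,
        numOutputRows: Long,
        numOutputBytes: Long)

    // One summary is produced per write task, e.g.:
    val summary = ExecutedWriteSummary(
      updatedPartitions = Set("part=2017-07-07"),
      numOutputFile = 1,
      numOutputRows = 1000L,
      numOutputBytes = 65536L)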
http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
index 1ef1988..24c0385 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -65,9 +65,6 @@ class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton {
val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
assert(totalNumBytes > 0)
- val writingTimeMetric = executedNode.metrics.find(_.name == "average writing time (ms)").get
- val writingTime = metrics(writingTimeMetric.accumulatorId).replaceAll(",", "").toInt
- assert(writingTime >= 0)
}
private def testMetricsNonDynamicPartition(
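The deleted assertions aside, the same lookup-and-assert pattern still applies to the surviving metrics. A hedged sketch for the file-count metric, reusing the suite's executedNode and metrics locals and the metric name registered in DataWritingCommand above:

    val numFilesMetric = executedNode.metrics.find(_.name == "number of written files").get
    val numFiles = metrics(numFilesMetric.accumulatorId).replaceAll(",", "").toInt
    assert(numFiles > 0)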