Posted to commits@spark.apache.org by we...@apache.org on 2018/03/21 17:06:30 UTC
spark git commit: [SPARK-23288][SS] Fix output metrics with parquet sink
Repository: spark
Updated Branches:
refs/heads/master 98d0ea3f6 -> 918c7e99a
[SPARK-23288][SS] Fix output metrics with parquet sink
## What changes were proposed in this pull request?
Output metrics were not filled in when the parquet sink was used.
This PR fixes the problem by having `FileStreamSink` pass a `BasicWriteJobStatsTracker` to the writer.
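The shape of the fix is visible in the `FileStreamSink` hunk below: the sink wraps the Hadoop conf in a `SerializableConfiguration`, builds the tracker from the new shared metrics factory, and passes it to `FileFormatWriter.write` via `statsTrackers` (previously `Nil`). A rough standalone sketch using the names from this diff (these are `org.apache.spark.sql.execution.datasources` internals, not public API, and the metrics factory requires an active `SparkContext`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.util.SerializableConfiguration

// Mirrors the new FileStreamSink.basicWriteJobStatsTracker helper below.
// The Hadoop conf is wrapped in SerializableConfiguration so the tracker
// can be shipped to executor-side write tasks; the driver-side SQLMetrics
// come from the new BasicWriteJobStatsTracker.metrics factory.
def newStatsTracker(spark: SparkSession): BasicWriteJobStatsTracker = {
  val hadoopConf = spark.sparkContext.hadoopConfiguration
  new BasicWriteJobStatsTracker(
    new SerializableConfiguration(hadoopConf),
    BasicWriteJobStatsTracker.metrics)
}
// The sink then passes Seq(newStatsTracker(spark)) as the statsTrackers
// argument of FileFormatWriter.write, so the summaries collected on
// executors are folded into task output metrics and SQL metrics.
```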
## How was this patch tested?
Additional unit test added.
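The new test (full diff at the bottom) verifies the metrics end to end with a `SparkListener`. A condensed, self-contained version of that check; the class name `OutputMetricsListener` is ours, not from the patch:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sums write-side task metrics the same way the new test does. With this
// fix, recordsWritten/bytesWritten are non-zero once a file-sink
// micro-batch (parquet, orc, text or json) has completed.
class OutputMetricsListener extends SparkListener {
  var recordsWritten = 0L
  var bytesWritten = 0L
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val out = taskEnd.taskMetrics.outputMetrics
    recordsWritten += out.recordsWritten
    bytesWritten += out.bytesWritten
  }
}

// Usage: register before starting the query, then assert after
// query.processAllAvailable() and after draining the listener bus:
//   spark.sparkContext.addSparkListener(listener)
//   ...run the streaming query...
//   assert(listener.recordsWritten == 5 && listener.bytesWritten > 0)
```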
Author: Gabor Somogyi <ga...@gmail.com>
Closes #20745 from gaborgsomogyi/SPARK-23288.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/918c7e99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/918c7e99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/918c7e99
Branch: refs/heads/master
Commit: 918c7e99afdcea05c36626e230636c4f8aabf82c
Parents: 98d0ea3
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Wed Mar 21 10:06:26 2018 -0700
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Mar 21 10:06:26 2018 -0700
----------------------------------------------------------------------
.../execution/command/DataWritingCommand.scala | 11 +----
.../datasources/BasicWriteStatsTracker.scala | 25 ++++++++--
.../execution/streaming/FileStreamSink.scala | 10 +++-
.../sql/streaming/FileStreamSinkSuite.scala | 52 ++++++++++++++++++++
4 files changed, 82 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/918c7e99/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index e56f810..e11dbd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -45,15 +44,7 @@ trait DataWritingCommand extends Command {
// Output columns of the analyzed input query plan
def outputColumns: Seq[Attribute]
- lazy val metrics: Map[String, SQLMetric] = {
- val sparkContext = SparkContext.getActive.get
- Map(
- "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
- "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
- "numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
- )
- }
+ lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
http://git-wip-us.apache.org/repos/asf/spark/blob/918c7e99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 9dbbe99..69c03d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -153,12 +153,29 @@ class BasicWriteJobStatsTracker(
totalNumOutput += summary.numRows
}
- metrics("numFiles").add(numFiles)
- metrics("numOutputBytes").add(totalNumBytes)
- metrics("numOutputRows").add(totalNumOutput)
- metrics("numParts").add(numPartitions)
+ metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
+ metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+ metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+ metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
}
}
+
+object BasicWriteJobStatsTracker {
+ private val NUM_FILES_KEY = "numFiles"
+ private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
+ private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
+ private val NUM_PARTS_KEY = "numParts"
+
+ def metrics: Map[String, SQLMetric] = {
+ val sparkContext = SparkContext.getActive.get
+ Map(
+ NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+ NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
+ NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/918c7e99/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 87a17ce..b3d12f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -26,7 +26,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter}
+import org.apache.spark.util.SerializableConfiguration
object FileStreamSink extends Logging {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -97,6 +98,11 @@ class FileStreamSink(
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
+ val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+ new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics)
+ }
+
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
logInfo(s"Skipping already committed batch $batchId")
@@ -131,7 +137,7 @@ class FileStreamSink(
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = None,
- statsTrackers = Nil,
+ statsTrackers = Seq(basicWriteJobStatsTracker),
options = options)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/918c7e99/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 31e5527..cf41d7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -21,6 +21,7 @@ import java.util.Locale
import org.apache.hadoop.fs.Path
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
}
}
}
+
+ test("SPARK-23288 writing and checking output metrics") {
+ Seq("parquet", "orc", "text", "json").foreach { format =>
+ val inputData = MemoryStream[String]
+ val df = inputData.toDF()
+
+ withTempDir { outputDir =>
+ withTempDir { checkpointDir =>
+
+ var query: StreamingQuery = null
+
+ var numTasks = 0
+ var recordsWritten: Long = 0L
+ var bytesWritten: Long = 0L
+ try {
+ spark.sparkContext.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val outputMetrics = taskEnd.taskMetrics.outputMetrics
+ recordsWritten += outputMetrics.recordsWritten
+ bytesWritten += outputMetrics.bytesWritten
+ numTasks += 1
+ }
+ })
+
+ query =
+ df.writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .format(format)
+ .start(outputDir.getCanonicalPath)
+
+ inputData.addData("1", "2", "3")
+ inputData.addData("4", "5")
+
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
+ spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
+
+ assert(numTasks > 0)
+ assert(recordsWritten === 5)
+ // This is heavily file type/version specific but should be filled
+ assert(bytesWritten > 0)
+ } finally {
+ if (query != null) {
+ query.stop()
+ }
+ }
+ }
+ }
+ }
+ }
}