You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/03/21 17:06:30 UTC

spark git commit: [SPARK-23288][SS] Fix output metrics with parquet sink

Repository: spark
Updated Branches:
  refs/heads/master 98d0ea3f6 -> 918c7e99a


[SPARK-23288][SS] Fix output metrics with parquet sink

## What changes were proposed in this pull request?

Output metrics were not filled when parquet sink used.

This PR fixes this problem by passing a `BasicWriteJobStatsTracker` in `FileStreamSink`.

## How was this patch tested?

Additional unit test added.

Author: Gabor Somogyi <ga...@gmail.com>

Closes #20745 from gaborgsomogyi/SPARK-23288.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/918c7e99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/918c7e99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/918c7e99

Branch: refs/heads/master
Commit: 918c7e99afdcea05c36626e230636c4f8aabf82c
Parents: 98d0ea3
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Wed Mar 21 10:06:26 2018 -0700
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Mar 21 10:06:26 2018 -0700

----------------------------------------------------------------------
 .../execution/command/DataWritingCommand.scala  | 11 +----
 .../datasources/BasicWriteStatsTracker.scala    | 25 ++++++++--
 .../execution/streaming/FileStreamSink.scala    | 10 +++-
 .../sql/streaming/FileStreamSinkSuite.scala     | 52 ++++++++++++++++++++
 4 files changed, 82 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/918c7e99/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index e56f810..e11dbd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -45,15 +44,7 @@ trait DataWritingCommand extends Command {
   // Output columns of the analyzed input query plan
   def outputColumns: Seq[Attribute]
 
-  lazy val metrics: Map[String, SQLMetric] = {
-    val sparkContext = SparkContext.getActive.get
-    Map(
-      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
-      "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
-      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
-    )
-  }
+  lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
 
   def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/918c7e99/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 9dbbe99..69c03d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -153,12 +153,29 @@ class BasicWriteJobStatsTracker(
       totalNumOutput += summary.numRows
     }
 
-    metrics("numFiles").add(numFiles)
-    metrics("numOutputBytes").add(totalNumBytes)
-    metrics("numOutputRows").add(totalNumOutput)
-    metrics("numParts").add(numPartitions)
+    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
+    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)
 
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
   }
 }
+
+object BasicWriteJobStatsTracker {
+  private val NUM_FILES_KEY = "numFiles"
+  private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
+  private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
+  private val NUM_PARTS_KEY = "numParts"
+
+  def metrics: Map[String, SQLMetric] = {
+    val sparkContext = SparkContext.getActive.get
+    Map(
+      NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
+      NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/918c7e99/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 87a17ce..b3d12f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -26,7 +26,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter}
+import org.apache.spark.util.SerializableConfiguration
 
 object FileStreamSink extends Logging {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -97,6 +98,11 @@ class FileStreamSink(
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
+  private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
+    val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+    new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics)
+  }
+
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
@@ -131,7 +137,7 @@ class FileStreamSink(
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = None,
-        statsTrackers = Nil,
+        statsTrackers = Seq(basicWriteJobStatsTracker),
         options = options)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/918c7e99/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 31e5527..cf41d7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+
+          var query: StreamingQuery = null
+
+          var numTasks = 0
+          var recordsWritten: Long = 0L
+          var bytesWritten: Long = 0L
+          try {
+            spark.sparkContext.addSparkListener(new SparkListener() {
+              override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+                val outputMetrics = taskEnd.taskMetrics.outputMetrics
+                recordsWritten += outputMetrics.recordsWritten
+                bytesWritten += outputMetrics.bytesWritten
+                numTasks += 1
+              }
+            })
+
+            query =
+              df.writeStream
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .format(format)
+                .start(outputDir.getCanonicalPath)
+
+            inputData.addData("1", "2", "3")
+            inputData.addData("4", "5")
+
+            failAfter(streamingTimeout) {
+              query.processAllAvailable()
+            }
+            spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
+
+            assert(numTasks > 0)
+            assert(recordsWritten === 5)
+            // This is heavily file type/version specific but should be filled
+            assert(bytesWritten > 0)
+          } finally {
+            if (query != null) {
+              query.stop()
+            }
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org