Posted to commits@spark.apache.org by we...@apache.org on 2017/07/06 07:47:14 UTC
spark git commit: [SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter operations
Repository: spark
Updated Branches:
refs/heads/master 5800144a5 -> 6ff05a66f
[SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter operations
## What changes were proposed in this pull request?
Right now in the UI, after SPARK-20213, we can show the operations that write data out. However, there is no way to associate metrics with data writes. We should show the relevant metrics for these operations.
#### Supported commands
This change supports updating metrics for file-based data writing operations, including `InsertIntoHadoopFsRelationCommand` and `InsertIntoHiveTable`. A minimal usage example follows the metrics list below.
Supported metrics:
* number of written files
* number of dynamic partitions
* total bytes of written data
* total number of output rows
* average writing time (ms)
* (TODO) min/med/max number of output rows per file/partition
* (TODO) min/med/max bytes of written data per file/partition
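For illustration, a minimal write like the following is enough to populate these metrics on the write node in the SQL tab of the web UI. This sketch is not part of the patch; the app name and output path are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object WriteMetricsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("write-metrics-demo")
      .master("local[2]")
      .getOrCreate()

    // A dynamic-partition write: "number of dynamic part" counts the
    // distinct partition directories created under the output path.
    spark.range(0, 100)
      .selectExpr("id", "id % 5 AS p")
      .write
      .partitionBy("p")
      .mode("overwrite")
      .parquet("/tmp/write-metrics-demo")

    spark.stop()
  }
}
```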
#### Commands not supported
`InsertIntoDataSourceCommand`, `SaveIntoDataSourceCommand`:
These two commands use DataSource APIs to write data out, i.e., the logic of writing data out is delegated to the DataSource implementations, such as `InsertableRelation.insert` and `CreatableRelationProvider.createRelation`, so we cannot obtain metrics from the delegated methods for now.
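For reference, the delegated hooks look roughly like this (signatures paraphrased from `org.apache.spark.sql.sources`). The write happens entirely inside the implementation, so the metrics callback in `FileFormatWriter` never runs:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.BaseRelation

// Signatures paraphrased from org.apache.spark.sql.sources. The engine hands
// the whole DataFrame to the implementation and gets nothing back about
// files, bytes, or rows written.
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}

trait CreatableRelationProvider {
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}
```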
`CreateHiveTableAsSelectCommand`, `CreateDataSourceTableAsSelectCommand` :
These two commands invoke other commands to write data out. The invoked commands can even write to non-file-based data sources. We leave them as a future TODO.
#### How to update metrics of writing files out
A `RunnableCommand` that wants to update metrics needs to override its `metrics` field and provide the metrics data structure to `ExecutedCommandExec`.
The metrics are prepared during the execution of `FileFormatWriter`. The callback function passed to `FileFormatWriter` accepts the collected write summaries and updates the metrics accordingly.
There is a metrics-updating function in `DataWritingCommand`. At runtime, that function is bound to the Spark context and the `metrics` of `ExecutedCommandExec`, and is passed to `FileFormatWriter` as its refresh callback. A sketch of the aggregation it performs follows.
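As a self-contained sketch of what that callback computes, mirroring `updateWritingMetrics` in the diff below with a stand-in summary type (only the `SQLMetric.add` calls and the driver-side metric posting are omitted):

```scala
// Stand-in for ExecutedWriteSummary from the patch; field names match the diff.
case class WriteSummary(
    updatedPartitions: Set[String],
    numOutputFile: Int,
    numOutputRows: Long,
    numOutputBytes: Long,
    totalWritingTime: Long) // milliseconds

// Aggregates per-task summaries into the five metric values shown in the UI.
def aggregate(summaries: Seq[WriteSummary]): Map[String, Long] = {
  val numParts   = summaries.map(_.updatedPartitions.size).sum.toLong
  val numFiles   = summaries.map(_.numOutputFile).sum
  val totalBytes = summaries.map(_.numOutputBytes).sum
  val totalRows  = summaries.map(_.numOutputRows).sum
  val totalTime  = summaries.map(_.totalWritingTime).sum
  // "average writing time (ms)" is averaged per file; guard against zero files.
  val avgTime    = if (numFiles > 0) totalTime / numFiles else 0L
  Map(
    "avgTime"        -> avgTime,
    "numFiles"       -> numFiles.toLong,
    "numOutputBytes" -> totalBytes,
    "numOutputRows"  -> totalRows,
    "numParts"       -> numParts)
}
```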
## How was this patch tested?
Updated unit tests.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #18159 from viirya/SPARK-20703-2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ff05a66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ff05a66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ff05a66
Branch: refs/heads/master
Commit: 6ff05a66fe83e721063efe5c28d2ffeb850fecc7
Parents: 5800144
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Jul 6 15:47:09 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 6 15:47:09 2017 +0800
----------------------------------------------------------------------
.../scala/org/apache/spark/util/Utils.scala | 9 ++
.../execution/command/DataWritingCommand.scala | 75 ++++++++++
.../spark/sql/execution/command/commands.scala | 12 ++
.../datasources/FileFormatWriter.scala | 121 +++++++++++++---
.../InsertIntoHadoopFsRelationCommand.scala | 18 ++-
.../sql/sources/PartitionedWriteSuite.scala | 21 +--
.../hive/execution/InsertIntoHiveTable.scala | 8 +-
.../sql/hive/execution/SQLMetricsSuite.scala | 139 +++++++++++++++++++
8 files changed, 362 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 26f61e2..b4caf68 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1003,6 +1003,15 @@ private[spark] object Utils extends Logging {
}
/**
+ * Lists files recursively.
+ */
+ def recursiveList(f: File): Array[File] = {
+ require(f.isDirectory)
+ val current = f.listFiles
+ current ++ current.filter(_.isDirectory).flatMap(recursiveList)
+ }
+
+ /**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
* Throws an exception if deletion is unsuccessful.
http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
new file mode 100644
index 0000000..0c381a2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
+
+ override lazy val metrics: Map[String, SQLMetric] = {
+ val sparkContext = SparkContext.getActive.get
+ Map(
+ "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
+ "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+ "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+ )
+ }
+
+ /**
+ * Callback function that updates metrics collected from the writing operation.
+ */
+ protected def updateWritingMetrics(writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+ val sparkContext = SparkContext.getActive.get
+ var numPartitions = 0
+ var numFiles = 0
+ var totalNumBytes: Long = 0L
+ var totalNumOutput: Long = 0L
+ var totalWritingTime: Long = 0L
+
+ writeSummaries.foreach { summary =>
+ numPartitions += summary.updatedPartitions.size
+ numFiles += summary.numOutputFile
+ totalNumBytes += summary.numOutputBytes
+ totalNumOutput += summary.numOutputRows
+ totalWritingTime += summary.totalWritingTime
+ }
+
+ val avgWritingTime = if (numFiles > 0) {
+ (totalWritingTime / numFiles).toLong
+ } else {
+ 0L
+ }
+
+ metrics("avgTime").add(avgWritingTime)
+ metrics("numFiles").add(numFiles)
+ metrics("numOutputBytes").add(totalNumBytes)
+ metrics("numOutputRows").add(totalNumOutput)
+ metrics("numParts").add(numPartitions)
+
+ val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 81bc93e..7cd4bae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
@@ -37,6 +38,11 @@ import org.apache.spark.sql.types._
* wrapped in `ExecutedCommand` during execution.
*/
trait RunnableCommand extends logical.Command {
+
+ // The map used to record the metrics of running the command. This will be passed to
+ // `ExecutedCommand` during query planning.
+ lazy val metrics: Map[String, SQLMetric] = Map.empty
+
def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
throw new NotImplementedError
}
@@ -49,8 +55,14 @@ trait RunnableCommand extends logical.Command {
/**
* A physical operator that executes the run method of a `RunnableCommand` and
* saves the result to prevent multiple executions.
+ *
+ * @param cmd the `RunnableCommand` this operator will run.
+ * @param children the children physical plans run by the `RunnableCommand`.
*/
case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {
+
+ override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 0daffa9..6486663 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -22,7 +22,7 @@ import java.util.{Date, UUID}
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -82,7 +82,7 @@ object FileFormatWriter extends Logging {
}
/** The result of a successful write task. */
- private case class WriteTaskResult(commitMsg: TaskCommitMessage, updatedPartitions: Set[String])
+ private case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary)
/**
* Basic work flow of this command is:
@@ -104,7 +104,7 @@ object FileFormatWriter extends Logging {
hadoopConf: Configuration,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
- refreshFunction: (Seq[TablePartitionSpec]) => Unit,
+ refreshFunction: (Seq[ExecutedWriteSummary]) => Unit,
options: Map[String, String]): Unit = {
val job = Job.getInstance(hadoopConf)
@@ -196,12 +196,10 @@ object FileFormatWriter extends Logging {
})
val commitMsgs = ret.map(_.commitMsg)
- val updatedPartitions = ret.flatMap(_.updatedPartitions)
- .distinct.map(PartitioningUtils.parsePathFragment)
committer.commitJob(job, commitMsgs)
logInfo(s"Job ${job.getJobID} committed.")
- refreshFunction(updatedPartitions)
+ refreshFunction(ret.map(_.summary))
} catch { case cause: Throwable =>
logError(s"Aborting job ${job.getJobID}.", cause)
committer.abortJob(job)
@@ -247,9 +245,9 @@ object FileFormatWriter extends Logging {
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
// Execute the task to write rows out and commit the task.
- val outputPartitions = writeTask.execute(iterator)
+ val summary = writeTask.execute(iterator)
writeTask.releaseResources()
- WriteTaskResult(committer.commitTask(taskAttemptContext), outputPartitions)
+ WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
})(catchBlock = {
// If there is an error, release resource and then abort the task
try {
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
* automatically trigger task aborts.
*/
private trait ExecuteWriteTask {
+
/**
- * Writes data out to files, and then returns the list of partition strings written out.
- * The list of partitions is sent back to the driver and used to update the catalog.
+ * The data structures used to measure metrics during writing.
*/
- def execute(iterator: Iterator[InternalRow]): Set[String]
+ protected var totalWritingTime: Long = 0L
+ protected var timeOnCurrentFile: Long = 0L
+ protected var numOutputRows: Long = 0L
+ protected var numOutputBytes: Long = 0L
+
+ /**
+ * Writes data out to files, and then returns the summary of relative information which
+ * includes the list of partition strings written out. The list of partitions is sent back
+ * to the driver and used to update the catalog. Other information will be sent back to the
+ * driver too and used to update the metrics in UI.
+ */
+ def execute(iterator: Iterator[InternalRow]): ExecutedWriteSummary
def releaseResources(): Unit
+
+ /**
+ * A helper function used to determine the size in bytes of a written file.
+ */
+ protected def getFileSize(conf: Configuration, filePath: String): Long = {
+ if (filePath != null) {
+ val path = new Path(filePath)
+ val fs = path.getFileSystem(conf)
+ fs.getFileStatus(path).getLen()
+ } else {
+ 0L
+ }
+ }
}
/** Writes data to a single directory (used for non-dynamic-partition writes). */
@@ -288,24 +310,26 @@ object FileFormatWriter extends Logging {
committer: FileCommitProtocol) extends ExecuteWriteTask {
private[this] var currentWriter: OutputWriter = _
+ private[this] var currentPath: String = _
private def newOutputWriter(fileCounter: Int): Unit = {
val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
- val tmpFilePath = committer.newTaskTempFile(
+ currentPath = committer.newTaskTempFile(
taskAttemptContext,
None,
f"-c$fileCounter%03d" + ext)
currentWriter = description.outputWriterFactory.newInstance(
- path = tmpFilePath,
+ path = currentPath,
dataSchema = description.dataColumns.toStructType,
context = taskAttemptContext)
}
- override def execute(iter: Iterator[InternalRow]): Set[String] = {
+ override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
var fileCounter = 0
var recordsInFile: Long = 0L
newOutputWriter(fileCounter)
+
while (iter.hasNext) {
if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
fileCounter += 1
@@ -314,21 +338,35 @@ object FileFormatWriter extends Logging {
recordsInFile = 0
releaseResources()
+ numOutputRows += recordsInFile
newOutputWriter(fileCounter)
}
val internalRow = iter.next()
+ val startTime = System.nanoTime()
currentWriter.write(internalRow)
+ timeOnCurrentFile += (System.nanoTime() - startTime)
recordsInFile += 1
}
releaseResources()
- Set.empty
+ numOutputRows += recordsInFile
+
+ ExecutedWriteSummary(
+ updatedPartitions = Set.empty,
+ numOutputFile = fileCounter + 1,
+ numOutputBytes = numOutputBytes,
+ numOutputRows = numOutputRows,
+ totalWritingTime = totalWritingTime)
}
override def releaseResources(): Unit = {
if (currentWriter != null) {
try {
+ val startTime = System.nanoTime()
currentWriter.close()
+ totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
+ timeOnCurrentFile = 0
+ numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
} finally {
currentWriter = null
}
@@ -348,6 +386,8 @@ object FileFormatWriter extends Logging {
// currentWriter is initialized whenever we see a new key
private var currentWriter: OutputWriter = _
+ private var currentPath: String = _
+
/** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
private def partitionPathExpression: Seq[Expression] = {
desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
@@ -403,19 +443,19 @@ object FileFormatWriter extends Logging {
case _ =>
None
}
- val path = if (customPath.isDefined) {
+ currentPath = if (customPath.isDefined) {
committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
} else {
committer.newTaskTempFile(taskAttemptContext, partDir, ext)
}
currentWriter = desc.outputWriterFactory.newInstance(
- path = path,
+ path = currentPath,
dataSchema = desc.dataColumns.toStructType,
context = taskAttemptContext)
}
- override def execute(iter: Iterator[InternalRow]): Set[String] = {
+ override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
val getPartitionColsAndBucketId = UnsafeProjection.create(
desc.partitionColumns ++ desc.bucketIdExpression, desc.allColumns)
@@ -429,15 +469,22 @@ object FileFormatWriter extends Logging {
// If anything below fails, we should abort the task.
var recordsInFile: Long = 0L
var fileCounter = 0
+ var totalFileCounter = 0
var currentPartColsAndBucketId: UnsafeRow = null
val updatedPartitions = mutable.Set[String]()
+
for (row <- iter) {
val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
if (currentPartColsAndBucketId != nextPartColsAndBucketId) {
+ if (currentPartColsAndBucketId != null) {
+ totalFileCounter += (fileCounter + 1)
+ }
+
// See a new partition or bucket - write to a new partition dir (or a new bucket file).
currentPartColsAndBucketId = nextPartColsAndBucketId.copy()
logDebug(s"Writing partition: $currentPartColsAndBucketId")
+ numOutputRows += recordsInFile
recordsInFile = 0
fileCounter = 0
@@ -447,6 +494,8 @@ object FileFormatWriter extends Logging {
recordsInFile >= desc.maxRecordsPerFile) {
// Exceeded the threshold in terms of the number of records per file.
// Create a new file by increasing the file counter.
+
+ numOutputRows += recordsInFile
recordsInFile = 0
fileCounter += 1
assert(fileCounter < MAX_FILE_COUNTER,
@@ -455,18 +504,33 @@ object FileFormatWriter extends Logging {
releaseResources()
newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions)
}
-
+ val startTime = System.nanoTime()
currentWriter.write(getOutputRow(row))
+ timeOnCurrentFile += (System.nanoTime() - startTime)
recordsInFile += 1
}
+ if (currentPartColsAndBucketId != null) {
+ totalFileCounter += (fileCounter + 1)
+ }
releaseResources()
- updatedPartitions.toSet
+ numOutputRows += recordsInFile
+
+ ExecutedWriteSummary(
+ updatedPartitions = updatedPartitions.toSet,
+ numOutputFile = totalFileCounter,
+ numOutputBytes = numOutputBytes,
+ numOutputRows = numOutputRows,
+ totalWritingTime = totalWritingTime)
}
override def releaseResources(): Unit = {
if (currentWriter != null) {
try {
+ val startTime = System.nanoTime()
currentWriter.close()
+ totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
+ timeOnCurrentFile = 0
+ numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
} finally {
currentWriter = null
}
@@ -474,3 +538,20 @@ object FileFormatWriter extends Logging {
}
}
}
+
+/**
+ * Wrapper class for the metrics of writing data out.
+ *
+ * @param updatedPartitions the partitions updated during writing data out. Only valid
+ * for dynamic partition.
+ * @param numOutputFile the total number of files.
+ * @param numOutputRows the number of output rows.
+ * @param numOutputBytes the bytes of output data.
+ * @param totalWritingTime the total writing time in ms.
+ */
+case class ExecutedWriteSummary(
+ updatedPartitions: Set[String],
+ numOutputFile: Int,
+ numOutputRows: Long,
+ numOutputBytes: Long,
+ totalWritingTime: Long)
http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index ab26f2a..0031567 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -21,6 +21,7 @@ import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.SparkContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
@@ -53,7 +55,7 @@ case class InsertIntoHadoopFsRelationCommand(
mode: SaveMode,
catalogTable: Option[CatalogTable],
fileIndex: Option[FileIndex])
- extends RunnableCommand {
+ extends DataWritingCommand {
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
override def children: Seq[LogicalPlan] = query :: Nil
@@ -123,8 +125,16 @@ case class InsertIntoHadoopFsRelationCommand(
if (doInsertion) {
- // Callback for updating metastore partition metadata after the insertion job completes.
- def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
+ // Callback for updating metric and metastore partition metadata
+ // after the insertion job completes.
+ def refreshCallback(summary: Seq[ExecutedWriteSummary]): Unit = {
+ val updatedPartitions = summary.flatMap(_.updatedPartitions)
+ .distinct.map(PartitioningUtils.parsePathFragment)
+
+ // Updating metrics.
+ updateWritingMetrics(summary)
+
+ // Updating metastore partition metadata.
if (partitionsTrackedByCatalog) {
val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
if (newPartitions.nonEmpty) {
@@ -154,7 +164,7 @@ case class InsertIntoHadoopFsRelationCommand(
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- refreshFunction = refreshPartitionsCallback,
+ refreshFunction = refreshCallback,
options = options)
// refresh cached files in FileIndex
http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index a2f3afe..6f998aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -91,15 +91,15 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
withTempDir { f =>
spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
.write.option("maxRecordsPerFile", 1).mode("overwrite").parquet(f.getAbsolutePath)
- assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+ assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
.write.option("maxRecordsPerFile", 2).mode("overwrite").parquet(f.getAbsolutePath)
- assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
+ assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
.write.option("maxRecordsPerFile", -1).mode("overwrite").parquet(f.getAbsolutePath)
- assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
+ assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
}
}
@@ -111,7 +111,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
.option("maxRecordsPerFile", 1)
.mode("overwrite")
.parquet(f.getAbsolutePath)
- assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+ assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
}
}
@@ -138,14 +138,14 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
val df = Seq((1, ts)).toDF("i", "ts")
withTempPath { f =>
df.write.partitionBy("ts").parquet(f.getAbsolutePath)
- val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+ val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
assert(files.length == 1)
checkPartitionValues(files.head, "2016-12-01 00:00:00")
}
withTempPath { f =>
df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
.partitionBy("ts").parquet(f.getAbsolutePath)
- val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+ val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
assert(files.length == 1)
// use timeZone option "GMT" to format partition value.
checkPartitionValues(files.head, "2016-12-01 08:00:00")
@@ -153,18 +153,11 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
withTempPath { f =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") {
df.write.partitionBy("ts").parquet(f.getAbsolutePath)
- val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+ val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
assert(files.length == 1)
// if there isn't timeZone option, then use session local timezone.
checkPartitionValues(files.head, "2016-12-01 08:00:00")
}
}
}
-
- /** Lists files recursively. */
- private def recursiveList(f: File): Array[File] = {
- require(f.isDirectory)
- val current = f.listFiles
- current ++ current.filter(_.isDirectory).flatMap(recursiveList)
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 223d375..cd263e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -31,14 +31,16 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.spark.SparkContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.{CommandUtils, RunnableCommand}
+import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand}
import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
@@ -80,7 +82,7 @@ case class InsertIntoHiveTable(
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
- ifPartitionNotExists: Boolean) extends RunnableCommand {
+ ifPartitionNotExists: Boolean) extends DataWritingCommand {
override def children: Seq[LogicalPlan] = query :: Nil
@@ -354,7 +356,7 @@ case class InsertIntoHiveTable(
hadoopConf = hadoopConf,
partitionColumns = partitionAttributes,
bucketSpec = None,
- refreshFunction = _ => (),
+ refreshFunction = updateWritingMetrics,
options = Map.empty)
if (partition.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
new file mode 100644
index 0000000..1ef1988
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton {
+ import spark.implicits._
+
+ /**
+ * Get execution metrics for the SQL execution and verify metrics values.
+ *
+ * @param metricsValues the expected metric values (numFiles, numPartitions, numOutputRows).
+ * @param func the function can produce execution id after running.
+ */
+ private def verifyWriteDataMetrics(metricsValues: Seq[Int])(func: => Unit): Unit = {
+ val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+ // Run the given function to trigger query execution.
+ func
+ spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+ val executionIds =
+ spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+ assert(executionIds.size == 1)
+ val executionId = executionIds.head
+
+ val executionData = spark.sharedState.listener.getExecution(executionId).get
+ val executedNode = executionData.physicalPlanGraph.nodes.head
+
+ val metricsNames = Seq(
+ "number of written files",
+ "number of dynamic part",
+ "number of output rows")
+
+ val metrics = spark.sharedState.listener.getExecutionMetrics(executionId)
+
+ metricsNames.zip(metricsValues).foreach { case (metricsName, expected) =>
+ val sqlMetric = executedNode.metrics.find(_.name == metricsName)
+ assert(sqlMetric.isDefined)
+ val accumulatorId = sqlMetric.get.accumulatorId
+ val metricValue = metrics(accumulatorId).replaceAll(",", "").toInt
+ assert(metricValue == expected)
+ }
+
+ val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
+ val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
+ assert(totalNumBytes > 0)
+ val writingTimeMetric = executedNode.metrics.find(_.name == "average writing time (ms)").get
+ val writingTime = metrics(writingTimeMetric.accumulatorId).replaceAll(",", "").toInt
+ assert(writingTime >= 0)
+ }
+
+ private def testMetricsNonDynamicPartition(
+ dataFormat: String,
+ tableName: String): Unit = {
+ withTable(tableName) {
+ Seq((1, 2)).toDF("i", "j")
+ .write.format(dataFormat).mode("overwrite").saveAsTable(tableName)
+
+ val tableLocation =
+ new File(spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location)
+
+ // 2 files, 100 rows, 0 dynamic partition.
+ verifyWriteDataMetrics(Seq(2, 0, 100)) {
+ (0 until 100).map(i => (i, i + 1)).toDF("i", "j").repartition(2)
+ .write.format(dataFormat).mode("overwrite").insertInto(tableName)
+ }
+ assert(Utils.recursiveList(tableLocation).count(_.getName.startsWith("part-")) == 2)
+ }
+ }
+
+ private def testMetricsDynamicPartition(
+ provider: String,
+ dataFormat: String,
+ tableName: String): Unit = {
+ withTempPath { dir =>
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName(a int, b int)
+ |USING $provider
+ |PARTITIONED BY(a)
+ |LOCATION '${dir.toURI}'
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ val df = spark.range(start = 0, end = 40, step = 1, numPartitions = 1)
+ .selectExpr("id a", "id b")
+
+ // 40 files, 80 rows, 40 dynamic partitions.
+ verifyWriteDataMetrics(Seq(40, 40, 80)) {
+ df.union(df).repartition(2, $"a")
+ .write
+ .format(dataFormat)
+ .mode("overwrite")
+ .insertInto(tableName)
+ }
+ assert(Utils.recursiveList(dir).count(_.getName.startsWith("part-")) == 40)
+ }
+ }
+
+ test("writing data out metrics: parquet") {
+ testMetricsNonDynamicPartition("parquet", "t1")
+ }
+
+ test("writing data out metrics with dynamic partition: parquet") {
+ testMetricsDynamicPartition("parquet", "parquet", "t1")
+ }
+
+ test("writing data out metrics: hive") {
+ testMetricsNonDynamicPartition("hive", "t1")
+ }
+
+ test("writing data out metrics dynamic partition: hive") {
+ withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+ testMetricsDynamicPartition("hive", "hive", "t1")
+ }
+ }
+}