Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/19 08:48:29 UTC
spark git commit: [SPARK-14595][SQL] add input metrics for FileScanRDD
Repository: spark
Updated Branches:
refs/heads/master 6f8800689 -> d4b94ead9
[SPARK-14595][SQL] add input metrics for FileScanRDD
## What changes were proposed in this pull request?
This is roughly based on the input metrics logic in `SqlNewHadoopRDD`.
## How was this patch tested?
Not sure how to write an automated test for this; I manually verified the new metrics in the Spark UI.
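For reference, here is a minimal sketch (not part of this patch) of how the new metrics could also be checked programmatically with a `SparkListener` instead of the UI. The object name and input path below are made up for illustration, and listener events are delivered asynchronously, so exact assertions would need a proper test harness:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SQLContext

// Hypothetical driver program: sums the input metrics reported by finished tasks.
object FileScanMetricsCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("FileScanRDD input metrics check"))
    val sqlContext = new SQLContext(sc)

    var bytesRead = 0L
    var recordsRead = 0L
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // Task metrics can be absent for failed tasks.
        if (taskEnd.taskMetrics != null) {
          bytesRead += taskEnd.taskMetrics.inputMetrics.bytesRead
          recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
        }
      }
    })

    // Any file-based data source scan goes through FileScanRDD; the path is a placeholder.
    sqlContext.read.parquet("/tmp/some-parquet-dir").count()

    // The listener bus is asynchronous, so these totals are only approximate here.
    println(s"bytesRead=$bytesRead, recordsRead=$recordsRead")
    sc.stop()
  }
}
```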
Author: Wenchen Fan <we...@databricks.com>
Closes #12352 from cloud-fan/metrics.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4b94ead
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4b94ead
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4b94ead
Branch: refs/heads/master
Commit: d4b94ead92177a18d78a9701cfde9979641d2a18
Parents: 6f88006
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Apr 18 23:48:22 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Apr 18 23:48:22 2016 -0700
----------------------------------------------------------------------
.../sql/execution/datasources/FileScanRDD.scala | 60 +++++++++++++++++---
1 file changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d4b94ead/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 468e101..f86911e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,14 +18,16 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
/**
* A single file that should be read, along with partition column values that
* need to be prepended to each row. The reading should start at the first
- * valid record found after `offset`.
+ * valid record found after `start`.
*/
case class PartitionedFile(
partitionValues: InternalRow,
@@ -53,33 +55,77 @@ class FileScanRDD(
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
+ private val inputMetrics = context.taskMetrics().inputMetrics
+ private val existingBytesRead = inputMetrics.bytesRead
+
+ // Find a function that will return the FileSystem bytes read by this thread. Do this before
+ // applying readFunction, because it might read some bytes.
+ private val getBytesReadCallback: Option[() => Long] =
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+ // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+ // task and in the same thread, in which case we need to avoid overriding values written by
+ // previous partitions (SPARK-13071).
+ private def updateBytesRead(): Unit = {
+ getBytesReadCallback.foreach { getBytesRead =>
+ inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
+ }
+ }
+
+ // If we can't get the bytes read from the FS stats, fall back to the file size,
+ // which may be inaccurate.
+ private def updateBytesReadWithFileSize(): Unit = {
+ if (getBytesReadCallback.isEmpty && currentFile != null) {
+ inputMetrics.incBytesRead(currentFile.length)
+ }
+ }
+
private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+ private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null
def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
- def next() = currentIterator.next()
+ def next() = {
+ val nextElement = currentIterator.next()
+ // TODO: we should have a better separation of row based and batch based scan, so that we
+ // don't need to run this `if` for every record.
+ if (nextElement.isInstanceOf[ColumnarBatch]) {
+ inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
+ } else {
+ inputMetrics.incRecordsRead(1)
+ }
+ if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+ updateBytesRead()
+ }
+ nextElement
+ }
/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
+ updateBytesReadWithFileSize()
if (files.hasNext) {
- val nextFile = files.next()
- logInfo(s"Reading File $nextFile")
- InputFileNameHolder.setInputFileName(nextFile.filePath)
- currentIterator = readFunction(nextFile)
+ currentFile = files.next()
+ logInfo(s"Reading File $currentFile")
+ InputFileNameHolder.setInputFileName(currentFile.filePath)
+ currentIterator = readFunction(currentFile)
hasNext
} else {
+ currentFile = null
InputFileNameHolder.unsetInputFileName()
false
}
}
override def close() = {
+ updateBytesRead()
+ updateBytesReadWithFileSize()
InputFileNameHolder.unsetInputFileName()
}
}
// Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener(context => iterator.close())
+ context.addTaskCompletionListener(_ => iterator.close())
iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
}
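A note on the SPARK-13071 comment in the hunk above: the Hadoop FileSystem statistics are thread-local and cumulative, so a task that computes several coalesced partitions in one thread has to rebase on the bytes already recorded (`existingBytesRead`) rather than overwrite them. The following toy model (not Spark code; the names are invented for illustration) shows why `setBytesRead(existingBytesRead + delta)` keeps the running total correct:

```scala
// Toy model of the rebasing pattern used above; plain vars stand in for
// InputMetrics.bytesRead and the thread-local Hadoop byte counter.
object RebaseExample extends App {
  var metricBytesRead = 0L        // what InputMetrics.bytesRead would hold
  var threadLocalFsBytes = 0L     // cumulative bytes read by this thread

  def readPartition(sizeInBytes: Long): Unit = {
    val existingBytesRead = metricBytesRead   // snapshot before this partition starts
    val baseline = threadLocalFsBytes         // the callback measures bytes since its creation
    threadLocalFsBytes += sizeInBytes         // "read" the partition
    val delta = threadLocalFsBytes - baseline
    // Setting just `delta` would drop the bytes of earlier partitions in the same task;
    // rebasing on existingBytesRead preserves the running total (the SPARK-13071 fix).
    metricBytesRead = existingBytesRead + delta
  }

  readPartition(100)
  readPartition(250)
  assert(metricBytesRead == 350)
}
```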