You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/19 08:48:29 UTC

spark git commit: [SPARK-14595][SQL] add input metrics for FileScanRDD

Repository: spark
Updated Branches:
  refs/heads/master 6f8800689 -> d4b94ead9


[SPARK-14595][SQL] add input metrics for FileScanRDD

## What changes were proposed in this pull request?

This change adds input metrics to `FileScanRDD`, roughly based on the input metrics logic in `SqlNewHadoopRDD`.

## How was this patch tested?

I'm not sure how to write a test for this; I manually verified it in the Spark UI.

Author: Wenchen Fan <we...@databricks.com>

Closes #12352 from cloud-fan/metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4b94ead
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4b94ead
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4b94ead

Branch: refs/heads/master
Commit: d4b94ead92177a18d78a9701cfde9979641d2a18
Parents: 6f88006
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Apr 18 23:48:22 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Apr 18 23:48:22 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala | 60 +++++++++++++++++---
 1 file changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d4b94ead/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 468e101..f86911e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,14 +18,16 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileNameHolder, RDD}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 
 /**
  * A single file that should be read, along with partition column values that
  * need to be prepended to each row.  The reading should start at the first
- * valid record found after `offset`.
+ * valid record found after `start`.
  */
 case class PartitionedFile(
     partitionValues: InternalRow,
@@ -53,33 +55,77 @@ class FileScanRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead
+
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // applying readFunction, because it might read some bytes.
+      private val getBytesReadCallback: Option[() => Long] =
+        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid overriding values written by
+      // previous partitions (SPARK-13071).
+      private def updateBytesRead(): Unit = {
+        getBytesReadCallback.foreach { getBytesRead =>
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
+        }
+      }
+
+      // If we can't get the bytes read from the FS stats, fall back to the file size,
+      // which may be inaccurate.
+      private def updateBytesReadWithFileSize(): Unit = {
+        if (getBytesReadCallback.isEmpty && currentFile != null) {
+          inputMetrics.incBytesRead(currentFile.length)
+        }
+      }
+
       private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+      private[this] var currentFile: PartitionedFile = null
       private[this] var currentIterator: Iterator[Object] = null
 
       def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
-      def next() = currentIterator.next()
+      def next() = {
+        val nextElement = currentIterator.next()
+        // TODO: we should have a better separation of row based and batch based scan, so that we
+        // don't need to run this `if` for every record.
+        if (nextElement.isInstanceOf[ColumnarBatch]) {
+          inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
+        } else {
+          inputMetrics.incRecordsRead(1)
+        }
+        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+          updateBytesRead()
+        }
+        nextElement
+      }
 
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
+        updateBytesReadWithFileSize()
         if (files.hasNext) {
-          val nextFile = files.next()
-          logInfo(s"Reading File $nextFile")
-          InputFileNameHolder.setInputFileName(nextFile.filePath)
-          currentIterator = readFunction(nextFile)
+          currentFile = files.next()
+          logInfo(s"Reading File $currentFile")
+          InputFileNameHolder.setInputFileName(currentFile.filePath)
+          currentIterator = readFunction(currentFile)
           hasNext
         } else {
+          currentFile = null
           InputFileNameHolder.unsetInputFileName()
           false
         }
       }
 
       override def close() = {
+        updateBytesRead()
+        updateBytesReadWithFileSize()
         InputFileNameHolder.unsetInputFileName()
       }
     }
 
     // Register an on-task-completion callback to close the input stream.
-    context.addTaskCompletionListener(context => iterator.close())
+    context.addTaskCompletionListener(_ => iterator.close())
 
     iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org