Posted to commits@spark.apache.org by pw...@apache.org on 2015/01/28 00:42:57 UTC

spark git commit: SPARK-5199. FS read metrics should support CombineFileSplits and track bytes from all FSs

Repository: spark
Updated Branches:
  refs/heads/master fdaad4eb0 -> b1b35ca2e


SPARK-5199. FS read metrics should support CombineFileSplits and track bytes from all FSs


Author: Sandy Ryza <sa...@cloudera.com>

Closes #4050 from sryza/sandy-spark-5199 and squashes the following commits:

864514b [Sandy Ryza] Add tests and fix bug
0d504f1 [Sandy Ryza] Prettify
915c7e6 [Sandy Ryza] Get metrics from all filesystems
cdbc3e8 [Sandy Ryza] SPARK-5199. Input metrics should show up for InputFormats that return CombineFileSplits


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1b35ca2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1b35ca2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1b35ca2

Branch: refs/heads/master
Commit: b1b35ca2e440df40b253bf967bb93705d355c1c0
Parents: fdaad4e
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Tue Jan 27 15:42:55 2015 -0800
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Tue Jan 27 15:42:55 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 16 ++--
 .../org/apache/spark/executor/TaskMetrics.scala |  1 -
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 12 ++-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 15 +--
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 11 +--
 .../spark/metrics/InputOutputMetricsSuite.scala | 97 +++++++++++++++++++-
 6 files changed, 120 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b1b35ca2/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 57f9faf..211e3ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -133,10 +133,9 @@ class SparkHadoopUtil extends Logging {
    * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
    * Returns None if the required method can't be found.
    */
-  private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
-    : Option[() => Long] = {
+  private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
     try {
-      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val threadStats = getFileSystemThreadStatistics()
       val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
       val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesRead = f()
@@ -156,10 +155,9 @@ class SparkHadoopUtil extends Logging {
    * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
    * Returns None if the required method can't be found.
    */
-  private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
-    : Option[() => Long] = {
+  private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
     try {
-      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val threadStats = getFileSystemThreadStatistics()
       val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
       val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesWritten = f()
@@ -172,10 +170,8 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-  private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
-    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
-    val scheme = qualifiedPath.toUri().getScheme()
-    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+  private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
+    val stats = FileSystem.getAllStatistics()
     stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
   }
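
The simplified callback above now reports bytes read by the current thread across every FileSystem, not just the one backing a given path. A minimal sketch of the same idea, assuming Hadoop 2.5+ so the per-thread statistics (HADOOP-10688) can be called directly rather than through the reflection used in SparkHadoopUtil:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.FileSystem

    // Sketch only, not Spark's implementation: record a baseline, then report
    // the delta in bytes read by this thread, summed over all FileSystems.
    def bytesReadOnThreadCallback(): () => Long = {
      def totalBytesRead(): Long =
        FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
      val baseline = totalBytesRead()     // bytes already read before the RecordReader exists
      () => totalBytesRead() - baseline   // bytes read since the callback was set up
    }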
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b35ca2/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index ddb5903..97912c6 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -19,7 +19,6 @@ package org.apache.spark.executor
 
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 
 import scala.collection.mutable.ArrayBuffer

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b35ca2/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 056aef0..c3e3931 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.JobID
 import org.apache.hadoop.mapred.TaskAttemptID
 import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -218,13 +219,13 @@ class HadoopRDD[K, V](
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
         split.inputSplit.value match {
-          case split: FileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
           case _ => None
         }
-      )
+      }
       inputMetrics.setBytesReadCallback(bytesReadCallback)
 
       var reader: RecordReader[K, V] = null
@@ -254,7 +255,8 @@ class HadoopRDD[K, V](
           reader.close()
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
-          } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
+          } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
+                     split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
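
The close() handler above makes a two-way decision: use the exact FS-level counter when the callback is available, otherwise fall back to the split's declared size for FileSplit and (now) CombineFileSplit; NewHadoopRDD below gets the identical treatment for the new MapReduce API. A hypothetical standalone helper capturing that decision, using the old-API split types:

    import org.apache.hadoop.mapred.{FileSplit, InputSplit}
    import org.apache.hadoop.mapred.lib.CombineFileSplit

    // Hypothetical helper, not Spark code: prefer the FS statistics callback;
    // otherwise report the split length, which may be inaccurate, as noted above.
    def bytesReadForSplit(bytesReadCallback: Option[() => Long], split: InputSplit): Option[Long] =
      bytesReadCallback.map(cb => cb()).orElse {
        split match {
          case _: FileSplit | _: CombineFileSplit => Some(split.getLength)
          case _ => None   // other split types: leave the input metrics unset
        }
      }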

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b35ca2/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 7b0e3c8..d86f95a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.input.WholeTextFileInputFormat
@@ -34,7 +34,7 @@ import org.apache.spark.Logging
 import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.Utils
@@ -114,13 +114,13 @@ class NewHadoopRDD[K, V](
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
         split.serializableHadoopSplit.value match {
-          case split: FileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
           case _ => None
         }
-      )
+      }
       inputMetrics.setBytesReadCallback(bytesReadCallback)
 
       val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
@@ -163,7 +163,8 @@ class NewHadoopRDD[K, V](
           reader.close()
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                     split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b35ca2/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0f37d83..49b88a9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -990,7 +990,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
 
-      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
 
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
@@ -1061,7 +1061,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
 
-      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
@@ -1086,11 +1086,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     writer.commitJob()
   }
 
-  private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
-    : (OutputMetrics, Option[() => Long]) = {
-    val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
-      .map(new Path(_))
-      .flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
+  private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = {
+    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
     val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
     if (bytesWrittenCallback.isDefined) {
       context.taskMetrics.outputMetrics = Some(outputMetrics)
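
With the output directory lookup gone, bytes written to any FileSystem by the task's thread count toward its OutputMetrics. A hypothetical way to observe this from an application on the Spark 1.x API (assuming the tasks succeed, so taskMetrics is populated):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // outputMetrics is set whenever a bytes-written callback was found,
        // which no longer depends on mapreduce.output.fileoutputformat.outputdir
        taskEnd.taskMetrics.outputMetrics.foreach { om =>
          println(s"task ${taskEnd.taskInfo.taskId} wrote ${om.bytesWritten} bytes")
        }
      }
    })
    sc.parallelize(1 to 1000).saveAsTextFile("hdfs:///tmp/example-output")  // hypothetical path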

http://git-wip-us.apache.org/repos/asf/spark/blob/b1b35ca2/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 10a3999..81db66a 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -26,7 +26,16 @@ import org.scalatest.FunSuite
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf,
+  LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter,
+  TextInputFormat => OldTextInputFormat}
+import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat,
+  CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => OldCombineFileRecordReader}
+import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader,
+  TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat,
+  CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit,
+  FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
 
 import org.apache.spark.SharedSparkContext
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -202,7 +211,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
     val fs = FileSystem.getLocal(new Configuration())
     val outPath = new Path(fs.getWorkingDirectory, "outdir")
 
-    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
+    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
       val taskBytesWritten = new ArrayBuffer[Long]()
       sc.addSparkListener(new SparkListener() {
         override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -225,4 +234,88 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
       }
     }
   }
+
+  test("input metrics with old CombineFileInputFormat") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.hadoopFile(tmpFilePath, classOf[OldCombineTextInputFormat], classOf[LongWritable],
+        classOf[Text], 2).count()
+    }
+    assert(bytesRead >= tmpFile.length())
+  }
+
+  test("input metrics with new CombineFileInputFormat") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.newAPIHadoopFile(tmpFilePath, classOf[NewCombineTextInputFormat], classOf[LongWritable],
+        classOf[Text], new Configuration()).count()
+    }
+    assert(bytesRead >= tmpFile.length())
+  }
+}
+
+/**
+ * Hadoop 2 has a version of this, but we can't use it, for backwards-compatibility reasons
+ */
+class OldCombineTextInputFormat extends OldCombineFileInputFormat[LongWritable, Text] {
+  override def getRecordReader(split: OldInputSplit, conf: JobConf, reporter: Reporter)
+  : OldRecordReader[LongWritable, Text] = {
+    new OldCombineFileRecordReader[LongWritable, Text](conf,
+      split.asInstanceOf[OldCombineFileSplit], reporter, classOf[OldCombineTextRecordReaderWrapper]
+        .asInstanceOf[Class[OldRecordReader[LongWritable, Text]]])
+  }
+}
+
+class OldCombineTextRecordReaderWrapper(
+    split: OldCombineFileSplit,
+    conf: Configuration,
+    reporter: Reporter,
+    idx: Integer) extends OldRecordReader[LongWritable, Text] {
+
+  val fileSplit = new OldFileSplit(split.getPath(idx),
+    split.getOffset(idx),
+    split.getLength(idx),
+    split.getLocations())
+
+  val delegate: OldLineRecordReader = new OldTextInputFormat().getRecordReader(fileSplit,
+    conf.asInstanceOf[JobConf], reporter).asInstanceOf[OldLineRecordReader]
+
+  override def next(key: LongWritable, value: Text): Boolean = delegate.next(key, value)
+  override def createKey(): LongWritable = delegate.createKey()
+  override def createValue(): Text = delegate.createValue()
+  override def getPos(): Long = delegate.getPos
+  override def close(): Unit = delegate.close()
+  override def getProgress(): Float = delegate.getProgress
+}
+
+/**
+ * Hadoop 2 has a version of this, but we can't use it, for backwards-compatibility reasons
+ */
+class NewCombineTextInputFormat extends NewCombineFileInputFormat[LongWritable,Text] {
+  def createRecordReader(split: NewInputSplit, context: TaskAttemptContext)
+  : NewRecordReader[LongWritable, Text] = {
+    new NewCombineFileRecordReader[LongWritable,Text](split.asInstanceOf[NewCombineFileSplit],
+      context, classOf[NewCombineTextRecordReaderWrapper])
+  }
 }
+
+class NewCombineTextRecordReaderWrapper(
+    split: NewCombineFileSplit,
+    context: TaskAttemptContext,
+    idx: Integer) extends NewRecordReader[LongWritable, Text] {
+
+  val fileSplit = new NewFileSplit(split.getPath(idx),
+    split.getOffset(idx),
+    split.getLength(idx),
+    split.getLocations())
+
+  val delegate = new NewTextInputFormat().createRecordReader(fileSplit, context)
+
+  override def initialize(split: NewInputSplit, context: TaskAttemptContext): Unit = {
+    delegate.initialize(fileSplit, context)
+  }
+
+  override def nextKeyValue(): Boolean = delegate.nextKeyValue()
+  override def getCurrentKey(): LongWritable = delegate.getCurrentKey
+  override def getCurrentValue(): Text = delegate.getCurrentValue
+  override def getProgress(): Float = delegate.getProgress
+  override def close(): Unit = delegate.close()
+}
\ No newline at end of file
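
For completeness, a hypothetical end-to-end use of the behaviour the new tests cover, assuming Hadoop 2.x on the classpath (which ships org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat, the stock class the suite avoids for Hadoop 1 compatibility): reads served through CombineFileSplits now show up in each task's input metrics.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    var bytesRead = 0L
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // populated even though the input format hands back CombineFileSplits
        taskEnd.taskMetrics.inputMetrics.foreach(im => bytesRead += im.bytesRead)
      }
    })
    sc.newAPIHadoopFile("hdfs:///tmp/example-input",   // hypothetical input path
      classOf[CombineTextInputFormat], classOf[LongWritable], classOf[Text],
      new Configuration()).count()
    // Listener delivery is asynchronous; a real check waits for the listener
    // bus to drain before reading bytesRead.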

