Posted to commits@spark.apache.org by pw...@apache.org on 2014/10/27 18:04:29 UTC

git commit: SPARK-2621. Update task InputMetrics incrementally

Repository: spark
Updated Branches:
  refs/heads/master c9e05ca27 -> dea302ddb


SPARK-2621. Update task InputMetrics incrementally

The patch takes advantage of an API provided in Hadoop 2.5 that allows getting accurate data on Hadoop FileSystem bytes read. It eliminates the old method, which naively reported the split size as the input bytes. An impact of this change is that input metrics go away when running against Hadoop versions earlier than 2.5. I can add this back in, but my opinion is that no metrics are better than inaccurate metrics.

Writing an automated test for this is difficult because we don't usually build against a version of Hadoop that contains the method we need. I've tested it manually on a pseudo-distributed cluster.
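For context, the core of the approach (mirroring the getFSBytesReadOnThreadCallback helper added below) reduces to the following minimal Scala sketch. It is illustrative only, not part of the commit, and assumes a Hadoop 2.5+ client so that the reflected methods exist; on older versions the reflection throws NoSuchMethodException and callers fall back to no metrics.

    import scala.collection.JavaConversions._
    import org.apache.hadoop.fs.FileSystem

    // Sum the thread-local bytesRead counters across all FileSystem statistics.
    // getThreadStatistics and StatisticsData only exist as of Hadoop 2.5
    // (HADOOP-10688), so both are resolved reflectively.
    def threadBytesRead(): Long = {
      val statsDataClass =
        Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
      val getBytesRead = statsDataClass.getDeclaredMethod("getBytesRead")
      getBytesRead.setAccessible(true)
      FileSystem.getAllStatistics().map { stats =>
        val getThreadStats = stats.getClass.getDeclaredMethod("getThreadStatistics")
        getThreadStats.setAccessible(true)
        getBytesRead.invoke(getThreadStats.invoke(stats)).asInstanceOf[Long]
      }.sum
    }

    // Snapshot at task start; the callback then reports the delta for this thread.
    val baseline = threadBytesRead()
    val bytesReadCallback: () => Long = () => threadBytesRead() - baseline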

Author: Sandy Ryza <sa...@cloudera.com>

Closes #2087 from sryza/sandy-spark-2621 and squashes the following commits:

23010b8 [Sandy Ryza] Missing style fixes
74fc9bb [Sandy Ryza] Make getFSBytesReadOnThreadCallback private
1ab662d [Sandy Ryza] Clear things up a bit
984631f [Sandy Ryza] Switch from pull to push model and add test
7ef7b22 [Sandy Ryza] Add missing curly braces
219abc9 [Sandy Ryza] Fall back to split size
90dbc14 [Sandy Ryza] SPARK-2621. Update task InputMetrics incrementally


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dea302dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dea302dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dea302dd

Branch: refs/heads/master
Commit: dea302ddbd26b1f20fb8a3979bd1d8e1717479f8
Parents: c9e05ca
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Oct 27 10:04:24 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Oct 27 10:04:24 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 30 +++++++++++
 .../org/apache/spark/executor/TaskMetrics.scala |  1 -
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 48 ++++++++++++++----
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 48 ++++++++++++++----
 .../scala/org/apache/spark/util/Utils.scala     | 11 ++++
 .../spark/metrics/InputMetricsSuite.scala       | 53 ++++++++++++++++++++
 6 files changed, 170 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fe0ad9e..e28eaad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,12 +20,15 @@ package org.apache.spark.deploy
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 import scala.collection.JavaConversions._
 
@@ -121,6 +124,33 @@ class SparkHadoopUtil extends Logging {
     UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
   }
 
+  /**
+   * Returns a function that can be called to find Hadoop FileSystem bytes read. If
+   * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
+   * return the bytes read on r since t.  Reflection is required because thread-level FileSystem
+   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
+   * Returns None if the required method can't be found.
+   */
+  private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
+    : Option[() => Long] = {
+    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
+    val scheme = qualifiedPath.toUri().getScheme()
+    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+    try {
+      val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+      val statisticsDataClass =
+        Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
+      val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
+      val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
+      val baselineBytesRead = f()
+      Some(() => f() - baselineBytesRead)
+    } catch {
+      case e: NoSuchMethodException => {
+        logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
+        None
+      }
+    }
+  }
 }
 
 object SparkHadoopUtil {

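The HadoopRDD and NewHadoopRDD changes below both consume this helper in the same way. Stripped of the RDD plumbing, the usage is roughly the following sketch; it is not code from the patch, the input path is hypothetical, and because the method is private[spark] a real caller has to live under the org.apache.spark package.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    import org.apache.spark.deploy.SparkHadoopUtil

    val conf = new Configuration()
    val path = new Path("hdfs:///tmp/input.txt")  // hypothetical input path

    // Returns None on Hadoop versions before 2.5, where thread-level
    // FileSystem statistics are unavailable; metrics are simply omitted then.
    val bytesReadCallback: Option[() => Long] =
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(path, conf)

    // ... read records from the file on this thread ...

    // Each call reports the bytes read on this thread since the callback was created.
    bytesReadCallback.foreach(cb => println(s"bytes read so far: ${cb()}"))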
http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 3e49b62..57bc2b4 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -169,7 +169,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
   var bytesRead: Long = 0L
 }
 
-
 /**
  * :: DeveloperApi ::
  * Metrics pertaining to shuffle data read in a given task.

http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 7751417..946fb56 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -46,7 +46,6 @@ import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.{NextIterator, Utils}
 import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
 
-
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
  */
@@ -224,18 +223,18 @@ class HadoopRDD[K, V](
       val key: K = reader.createKey()
       val value: V = reader.createValue()
 
-      // Set the task input metrics.
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      try {
-        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
-         * always at record boundaries, so tasks may need to read into other splits to complete
-         * a record. */
-        inputMetrics.bytesRead = split.inputSplit.value.getLength()
-      } catch {
-        case e: java.io.IOException =>
-          logWarning("Unable to get input size to set InputMetrics for task", e)
+      // Find a function that will return the FileSystem bytes read by this thread.
+      val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
+        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
+          split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
+      } else {
+        None
+      }
+      if (bytesReadCallback.isDefined) {
+        context.taskMetrics.inputMetrics = Some(inputMetrics)
       }
-      context.taskMetrics.inputMetrics = Some(inputMetrics)
+      var recordsSinceMetricsUpdate = 0
 
       override def getNext() = {
         try {
@@ -244,12 +243,36 @@ class HadoopRDD[K, V](
           case eof: EOFException =>
             finished = true
         }
+
+        // Update bytes read metric every few records
+        if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
+            && bytesReadCallback.isDefined) {
+          recordsSinceMetricsUpdate = 0
+          val bytesReadFn = bytesReadCallback.get
+          inputMetrics.bytesRead = bytesReadFn()
+        } else {
+          recordsSinceMetricsUpdate += 1
+        }
         (key, value)
       }
 
       override def close() {
         try {
           reader.close()
+          if (bytesReadCallback.isDefined) {
+            val bytesReadFn = bytesReadCallback.get
+            inputMetrics.bytesRead = bytesReadFn()
+          } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.bytesRead = split.inputSplit.value.getLength
+              context.taskMetrics.inputMetrics = Some(inputMetrics)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
+            }
+          }
         } catch {
           case e: Exception => {
             if (!Utils.inShutdown()) {
@@ -302,6 +325,9 @@ private[spark] object HadoopRDD extends Logging {
    */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
+  /** Update the input bytes read metric each time this number of records has been read */
+  val RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES = 256
+
   /**
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
    * the local process.

http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 0cccdef..3245632 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -25,6 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.input.WholeTextFileInputFormat
@@ -36,6 +37,7 @@ import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
 
 private[spark] class NewHadoopPartition(
     rddId: Int,
@@ -118,21 +120,22 @@ class NewHadoopRDD[K, V](
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      try {
-        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
-         * always at record boundaries, so tasks may need to read into other splits to complete
-         * a record. */
-        inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
-      } catch {
-        case e: Exception =>
-          logWarning("Unable to get input split size in order to set task input bytes", e)
+      // Find a function that will return the FileSystem bytes read by this thread.
+      val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
+        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
+          split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
+      } else {
+        None
+      }
+      if (bytesReadCallback.isDefined) {
+        context.taskMetrics.inputMetrics = Some(inputMetrics)
       }
-      context.taskMetrics.inputMetrics = Some(inputMetrics)
 
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
       var havePair = false
       var finished = false
+      var recordsSinceMetricsUpdate = 0
 
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
@@ -147,12 +150,39 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
+
+        // Update bytes read metric every few records
+        if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
+            && bytesReadCallback.isDefined) {
+          recordsSinceMetricsUpdate = 0
+          val bytesReadFn = bytesReadCallback.get
+          inputMetrics.bytesRead = bytesReadFn()
+        } else {
+          recordsSinceMetricsUpdate += 1
+        }
+
         (reader.getCurrentKey, reader.getCurrentValue)
       }
 
       private def close() {
         try {
           reader.close()
+
+          // Update metrics with final amount
+          if (bytesReadCallback.isDefined) {
+            val bytesReadFn = bytesReadCallback.get
+            inputMetrics.bytesRead = bytesReadFn()
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
+              context.taskMetrics.inputMetrics = Some(inputMetrics)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
+            }
+          }
         } catch {
           case e: Exception => {
             if (!Utils.inShutdown()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 84ed5db..93ac9f1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1673,6 +1673,17 @@ private[spark] object Utils extends Logging {
     PropertyConfigurator.configure(pro)
   }
 
+  def invoke(
+      clazz: Class[_],
+      obj: AnyRef,
+      methodName: String,
+      args: (Class[_], AnyRef)*): AnyRef = {
+    val (types, values) = args.unzip
+    val method = clazz.getDeclaredMethod(methodName, types: _*)
+    method.setAccessible(true)
+    method.invoke(obj, values.toSeq: _*)
+  }
+
 }
 
 /**

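Utils.invoke is a small generic reflection helper: it looks up a declared method by name and parameter types, makes it accessible, and invokes it on the given object. SparkHadoopUtil above uses it as Utils.invoke(classOf[Statistics], stats, "getThreadStatistics"). As a self-contained illustration only (the String calls below are hypothetical examples, not from the patch, and like the rest of Utils this only compiles inside the org.apache.spark package tree):

    import org.apache.spark.util.Utils

    // Zero-argument call: reflectively invokes "hello".toUpperCase().
    val upper = Utils.invoke(classOf[String], "hello", "toUpperCase").asInstanceOf[String]
    assert(upper == "HELLO")

    // With arguments, each one is passed as a (parameter class, value) pair.
    val tail = Utils.invoke(classOf[String], "hello", "substring",
      (classOf[Int], Int.box(1))).asInstanceOf[String]
    assert(tail == "ello")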
http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
new file mode 100644
index 0000000..33bd1af
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SharedSparkContext
+import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
+
+import scala.collection.mutable.ArrayBuffer
+
+import java.io.{FileWriter, PrintWriter, File}
+
+class InputMetricsSuite extends FunSuite with SharedSparkContext {
+  test("input metrics when reading text file") {
+    val file = new File(getClass.getSimpleName + ".txt")
+    val pw = new PrintWriter(new FileWriter(file))
+    pw.println("some stuff")
+    pw.println("some other stuff")
+    pw.println("yet more stuff")
+    pw.println("too much stuff")
+    pw.close()
+    file.deleteOnExit()
+
+    val taskBytesRead = new ArrayBuffer[Long]()
+    sc.addSparkListener(new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+      }
+    })
+    sc.textFile("file://" + file.getAbsolutePath, 2).count()
+
+    // Wait for task end events to come in
+    sc.listenerBus.waitUntilEmpty(500)
+    assert(taskBytesRead.length == 2)
+    assert(taskBytesRead.sum == file.length())
+  }
+}

