You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/02/04 02:43:16 UTC

spark git commit: [SPARK-13152][CORE] Fix task metrics deprecation warning

Repository: spark
Updated Branches:
  refs/heads/master de0914522 -> a8e2ba776


[SPARK-13152][CORE] Fix task metrics deprecation warning

Make internal non-deprecated versions of incBytesRead and incRecordsRead so we don't have unnecessary deprecation warnings in our build.

Right now incBytesRead and incRecordsRead are marked as deprecated and for internal use only. We should make private[spark] versions which are not deprecated and switch to those internally so as to not clutter up the warning messages when building.

cc andrewor14 who did the initial deprecation

Author: Holden Karau <ho...@us.ibm.com>

Closes #11056 from holdenk/SPARK-13152-fix-task-metrics-deprecation-warnings.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8e2ba77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8e2ba77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8e2ba77

Branch: refs/heads/master
Commit: a8e2ba776b20c8054918af646d8228bba1b87c9b
Parents: de09145
Author: Holden Karau <ho...@us.ibm.com>
Authored: Wed Feb 3 17:43:14 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Feb 3 17:43:14 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/CacheManager.scala         | 4 ++--
 .../src/main/scala/org/apache/spark/executor/InputMetrics.scala | 5 +++++
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala        | 4 ++--
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala     | 4 ++--
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala    | 4 ++--
 .../spark/sql/execution/datasources/SqlNewHadoopRDD.scala       | 4 ++--
 6 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a8e2ba77/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index fa8e2b9..923ff41 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,12 +44,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
         val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
-        existingMetrics.incBytesRead(blockResult.bytes)
+        existingMetrics.incBytesReadInternal(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {
           override def next(): T = {
-            existingMetrics.incRecordsRead(1)
+            existingMetrics.incRecordsReadInternal(1)
             delegate.next()
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/a8e2ba77/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index ed9e157..6d30d3c 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -81,10 +81,15 @@ class InputMetrics private (
    */
   def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue)
 
+  // Once incBytesRead & incRecordsRead are ready to be removed from the public API
+  // we can remove the internal versions and make the previous public API private.
+  // This has been done to suppress warnings when building.
   @deprecated("incrementing input metrics is for internal use only", "2.0.0")
   def incBytesRead(v: Long): Unit = _bytesRead.add(v)
+  private[spark] def incBytesReadInternal(v: Long): Unit = _bytesRead.add(v)
   @deprecated("incrementing input metrics is for internal use only", "2.0.0")
   def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
+  private[spark] def incRecordsReadInternal(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
   private[spark] def setReadMethod(v: DataReadMethod.Value): Unit =
     _readMethod.setValue(v.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/a8e2ba77/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e2ebd7f..805cd9f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -260,7 +260,7 @@ class HadoopRDD[K, V](
             finished = true
         }
         if (!finished) {
-          inputMetrics.incRecordsRead(1)
+          inputMetrics.incRecordsReadInternal(1)
         }
         if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
           updateBytesRead()
@@ -292,7 +292,7 @@ class HadoopRDD[K, V](
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.incBytesRead(split.inputSplit.value.getLength)
+              inputMetrics.incBytesReadInternal(split.inputSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for task", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/a8e2ba77/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e71d340..f23da39 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -188,7 +188,7 @@ class NewHadoopRDD[K, V](
         }
         havePair = false
         if (!finished) {
-          inputMetrics.incRecordsRead(1)
+          inputMetrics.incRecordsReadInternal(1)
         }
         if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
           updateBytesRead()
@@ -219,7 +219,7 @@ class NewHadoopRDD[K, V](
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+              inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for task", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/a8e2ba77/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a2487ee..38e6478 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -811,8 +811,8 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
       val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String])
       val inputMetrics = metrics.registerInputMetrics(readMethod)
-      inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
-      inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+      inputMetrics.incBytesReadInternal((inJson \ "Bytes Read").extract[Long])
+      inputMetrics.incRecordsReadInternal((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
     }
 
     // Updated blocks

http://git-wip-us.apache.org/repos/asf/spark/blob/a8e2ba77/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index 9703b16..3605150 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -214,7 +214,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         }
         havePair = false
         if (!finished) {
-          inputMetrics.incRecordsRead(1)
+          inputMetrics.incRecordsReadInternal(1)
         }
         if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
           updateBytesRead()
@@ -246,7 +246,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+              inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for task", e)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org