You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/10/03 23:53:38 UTC

spark git commit: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration throws NPE

Repository: spark
Updated Branches:
  refs/heads/master e65b6b7ca -> e36ec38d8


[SPARK-20466][CORE] HadoopRDD#addLocalConfiguration throws NPE

## What changes were proposed in this pull request?

Fix for SPARK-20466, full description of the issue in the JIRA. To summarize, `HadoopRDD` uses a metadata cache to cache `JobConf` objects. The cache uses soft-references, which means the JVM can delete entries from the cache whenever there is GC pressure. `HadoopRDD#getJobConf` had a bug where it would check if the cache contained the `JobConf`, if it did it would get the `JobConf` from the cache and return it. This doesn't work when soft-references are used as the JVM can delete the entry between the existence check and the get call.

## How was this patch tested?

Haven't thought of a good way to test this yet given the issue only occurs sometimes, and happens during high GC pressure. Was thinking of using mocks to verify `#getJobConf` is doing the right thing. I deleted the method `HadoopRDD#containsCachedMetadata` so that we don't hit this issue again.

Author: Sahil Takiar <st...@cloudera.com>

Closes #19413 from sahilTakiar/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e36ec38d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e36ec38d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e36ec38d

Branch: refs/heads/master
Commit: e36ec38d89472df0dfe12222b6af54cd6eea8e98
Parents: e65b6b7
Author: Sahil Takiar <st...@cloudera.com>
Authored: Tue Oct 3 16:53:32 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Oct 3 16:53:32 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 33 +++++++++++---------
 1 file changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e36ec38d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 76ea8b8..23b3442 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -157,20 +157,25 @@ class HadoopRDD[K, V](
       if (conf.isInstanceOf[JobConf]) {
         logDebug("Re-using user-broadcasted JobConf")
         conf.asInstanceOf[JobConf]
-      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-        logDebug("Re-using cached JobConf")
-        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
       } else {
-        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
-        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-          logDebug("Creating new JobConf and caching it for later re-use")
-          val newJobConf = new JobConf(conf)
-          initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
-          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-          newJobConf
+        Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
+          .map { conf =>
+            logDebug("Re-using cached JobConf")
+            conf.asInstanceOf[JobConf]
+          }
+          .getOrElse {
+            // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in
+            // the local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+            // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary
+            // objects. Synchronize to prevent ConcurrentModificationException (SPARK-1097,
+            // HADOOP-10456).
+            HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+              logDebug("Creating new JobConf and caching it for later re-use")
+              val newJobConf = new JobConf(conf)
+              initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
+              HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+              newJobConf
+          }
         }
       }
     }
@@ -360,8 +365,6 @@ private[spark] object HadoopRDD extends Logging {
    */
   def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key)
 
-  def containsCachedMetadata(key: String): Boolean = SparkEnv.get.hadoopJobMetadata.containsKey(key)
-
   private def putCachedMetadata(key: String, value: Any): Unit =
     SparkEnv.get.hadoopJobMetadata.put(key, value)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org