Posted to commits@spark.apache.org by va...@apache.org on 2017/10/03 23:53:38 UTC
spark git commit: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration throws NPE
Repository: spark
Updated Branches:
refs/heads/master e65b6b7ca -> e36ec38d8
[SPARK-20466][CORE] HadoopRDD#addLocalConfiguration throws NPE
## What changes were proposed in this pull request?
Fix for SPARK-20466; the full description of the issue is in the JIRA. To summarize: `HadoopRDD` uses a metadata cache to cache `JobConf` objects. The cache uses soft references, so the JVM can clear entries from it whenever there is GC pressure. `HadoopRDD#getJobConf` had a bug: it first checked whether the cache contained the `JobConf` and, if it did, fetched the `JobConf` from the cache and returned it. This check-then-get sequence is unsafe with soft references, because the JVM can clear the entry between the existence check and the `get` call. In that case `get` returns `null`, and the caller later fails with a `NullPointerException`. The fix reads the cache once and treats a `null` result as a cache miss.
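To make the window concrete, here is a minimal Scala sketch. It assumes a hypothetical `SoftValueCache` built directly on `java.lang.ref.SoftReference`; it is not Spark's `SparkEnv.get.hadoopJobMetadata`, whose implementation is not part of this diff. `simulateGc` is a hypothetical test hook that calls `SoftReference.clear()` to stand in for the collector. `RaceDemo` replays the old check-then-get order and the single-read order that the patch switches to.

```scala
import java.lang.ref.SoftReference
import java.util.concurrent.ConcurrentHashMap

// Hypothetical soft-value cache: values are held through SoftReferences, so the JVM
// may clear them under memory pressure even while the key is still in the map.
final class SoftValueCache[K, V <: AnyRef] {
  private val map = new ConcurrentHashMap[K, SoftReference[V]]()

  def put(key: K, value: V): Unit = map.put(key, new SoftReference(value))

  // Checks the key only; says nothing about whether the value survived GC.
  def containsKey(key: K): Boolean = map.containsKey(key)

  // Returns null if the key is absent OR its value has been cleared.
  def get(key: K): V = {
    val ref = map.get(key)
    if (ref == null) null.asInstanceOf[V] else ref.get()
  }

  // Hypothetical test hook standing in for a GC cycle that clears one soft reference.
  def simulateGc(key: K): Unit = {
    val ref = map.get(key)
    if (ref != null) ref.clear()
  }

  // Single-read lookup: one get, with null treated as a cache miss.
  def getOrCreate(key: K)(create: => V): V =
    Option(get(key)).getOrElse {
      val fresh = create
      put(key, fresh)
      fresh
    }
}

object RaceDemo {
  // Old order: check, then (after a "GC" lands in between) get.
  def oldPattern(): String = {
    val cache = new SoftValueCache[String, String]
    cache.put("jobConf", "cached")
    if (cache.containsKey("jobConf")) {
      cache.simulateGc("jobConf") // the window described above
      cache.get("jobConf")        // null here; dereferencing it later throws an NPE
    } else "created"
  }

  // New order: a single read, so a cleared value is simply a miss.
  def newPattern(): String = {
    val cache = new SoftValueCache[String, String]
    cache.put("jobConf", "cached")
    cache.simulateGc("jobConf")
    cache.getOrCreate("jobConf")("created")
  }
}
```

In this sketch `containsKey` stays `true` after the value is cleared, which is one more reason a key-existence check cannot guarantee a non-null `get`.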
## How was this patch tested?
I haven't found a good way to test this yet, since the issue occurs only intermittently and only under high GC pressure. One option is to use mocks to verify that `#getJobConf` does the right thing. I deleted the method `HadoopRDD#containsCachedMetadata` so that the check-then-get pattern cannot be reintroduced.
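One way to make the race deterministic, along the lines of the mock-based approach mentioned above, is a test double that drops an entry immediately after every successful existence check. This is a minimal sketch only: `MetadataCache`, `EvictAfterContains`, and the two `Lookups` functions are hypothetical names that mirror the shape of the old and new code in the diff below; they are not Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical cache interface exposing the reads the old getJobConf relied on.
trait MetadataCache {
  def contains(key: String): Boolean
  def get(key: String): Any // null when absent, like java.util.Map#get
  def put(key: String, value: Any): Unit
}

// Test double: every successful contains() is followed by an eviction, opening on
// every call the window that a real GC only opens occasionally.
final class EvictAfterContains extends MetadataCache {
  private val entries = mutable.Map.empty[String, Any]
  var containsCalls = 0

  def contains(key: String): Boolean = {
    containsCalls += 1
    val present = entries.contains(key)
    if (present) entries.remove(key)
    present
  }
  def get(key: String): Any = entries.getOrElse(key, null)
  def put(key: String, value: Any): Unit = entries.update(key, value)
}

object Lookups {
  // Mirrors the removed containsCachedMetadata-then-getCachedMetadata sequence.
  def checkThenGet(cache: MetadataCache, key: String)(create: => String): Any =
    if (cache.contains(key)) cache.get(key)
    else { val v = create; cache.put(key, v); v }

  // Mirrors the fixed Option(getCachedMetadata(...)).getOrElse { ... } shape.
  def singleGet(cache: MetadataCache, key: String)(create: => String): Any =
    Option(cache.get(key)).getOrElse { val v = create; cache.put(key, v); v }
}
```

Because `singleGet` never calls `contains`, the double has no check-to-read window to exploit; asserting `containsCalls == 0` pins that behavior down.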
Author: Sahil Takiar <st...@cloudera.com>
Closes #19413 from sahilTakiar/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e36ec38d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e36ec38d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e36ec38d
Branch: refs/heads/master
Commit: e36ec38d89472df0dfe12222b6af54cd6eea8e98
Parents: e65b6b7
Author: Sahil Takiar <st...@cloudera.com>
Authored: Tue Oct 3 16:53:32 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Oct 3 16:53:32 2017 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 33 +++++++++++---------
1 file changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e36ec38d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 76ea8b8..23b3442 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -157,20 +157,25 @@ class HadoopRDD[K, V](
if (conf.isInstanceOf[JobConf]) {
logDebug("Re-using user-broadcasted JobConf")
conf.asInstanceOf[JobConf]
- } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
- logDebug("Re-using cached JobConf")
- HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
- // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
- // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
- // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
- // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
- HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
- logDebug("Creating new JobConf and caching it for later re-use")
- val newJobConf = new JobConf(conf)
- initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
- HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
- newJobConf
+ Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
+ .map { conf =>
+ logDebug("Re-using cached JobConf")
+ conf.asInstanceOf[JobConf]
+ }
+ .getOrElse {
+ // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in
+ // the local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary
+ // objects. Synchronize to prevent ConcurrentModificationException (SPARK-1097,
+ // HADOOP-10456).
+ HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+ logDebug("Creating new JobConf and caching it for later re-use")
+ val newJobConf = new JobConf(conf)
+ initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ newJobConf
+ }
}
}
}
@@ -360,8 +365,6 @@ private[spark] object HadoopRDD extends Logging {
*/
def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key)
- def containsCachedMetadata(key: String): Boolean = SparkEnv.get.hadoopJobMetadata.containsKey(key)
-
private def putCachedMetadata(key: String, value: Any): Unit =
SparkEnv.get.hadoopJobMetadata.put(key, value)