You are viewing a plain text version of this content. The canonical (HTML) link for it is available in the original mailing-list archive.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/08/25 08:45:50 UTC
spark git commit: [SPARK-17193][CORE] HadoopRDD NPE at DEBUG log
level when getLocationInfo == null
Repository: spark
Updated Branches:
refs/heads/master 5f02d2e5b -> 2bcd5d5ce
[SPARK-17193][CORE] HadoopRDD NPE at DEBUG log level when getLocationInfo == null
## What changes were proposed in this pull request?
Handle null from Hadoop getLocationInfo directly instead of catching (and logging) exception
## How was this patch tested?
Jenkins tests
Author: Sean Owen <so...@cloudera.com>
Closes #14760 from srowen/SPARK-17193.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bcd5d5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bcd5d5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bcd5d5c
Branch: refs/heads/master
Commit: 2bcd5d5ce3eaf0eb1600a12a2b55ddb40927533b
Parents: 5f02d2e
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Aug 25 09:45:49 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Aug 25 09:45:49 2016 +0100
----------------------------------------------------------------------
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 26 +++++++++-----------
.../org/apache/spark/rdd/NewHadoopRDD.scala | 2 +-
2 files changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2bcd5d5c/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fd3a14b..4640b5d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -22,7 +22,6 @@ import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.immutable.Map
-import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -317,7 +316,7 @@ class HadoopRDD[K, V](
try {
val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
- Some(HadoopRDD.convertSplitLocationInfo(infos))
+ HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e: Exception =>
logDebug("Failed to use InputSplitWithLocations.", e)
@@ -419,21 +418,20 @@ private[spark] object HadoopRDD extends Logging {
None
}
- private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
- val out = ListBuffer[String]()
- infos.foreach { loc =>
- val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
- getLocation.invoke(loc).asInstanceOf[String]
+ private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+ Option(infos).map(_.flatMap { loc =>
+ val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
+ val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
if (locationStr != "localhost") {
- if (HadoopRDD.SPLIT_INFO_REFLECTIONS.get.isInMemory.
- invoke(loc).asInstanceOf[Boolean]) {
- logDebug("Partition " + locationStr + " is cached by Hadoop.")
- out += new HDFSCacheTaskLocation(locationStr).toString
+ if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+ logDebug(s"Partition $locationStr is cached by Hadoop.")
+ Some(HDFSCacheTaskLocation(locationStr).toString)
} else {
- out += new HostTaskLocation(locationStr).toString
+ Some(HostTaskLocation(locationStr).toString)
}
+ } else {
+ None
}
- }
- out.seq
+ })
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2bcd5d5c/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index be919e6..1c7aec9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -255,7 +255,7 @@ class NewHadoopRDD[K, V](
case Some(c) =>
try {
val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
- Some(HadoopRDD.convertSplitLocationInfo(infos))
+ HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e : Exception =>
logDebug("Failed to use InputSplit#getLocationInfo.", e)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org