You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/08/25 08:45:50 UTC

spark git commit: [SPARK-17193][CORE] HadoopRDD NPE at DEBUG log level when getLocationInfo == null

Repository: spark
Updated Branches:
  refs/heads/master 5f02d2e5b -> 2bcd5d5ce


[SPARK-17193][CORE] HadoopRDD NPE at DEBUG log level when getLocationInfo == null

## What changes were proposed in this pull request?

Handle a null return value from Hadoop's getLocationInfo directly, instead of catching (and logging) the resulting NullPointerException.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <so...@cloudera.com>

Closes #14760 from srowen/SPARK-17193.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bcd5d5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bcd5d5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bcd5d5c

Branch: refs/heads/master
Commit: 2bcd5d5ce3eaf0eb1600a12a2b55ddb40927533b
Parents: 5f02d2e
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Aug 25 09:45:49 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Aug 25 09:45:49 2016 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 26 +++++++++-----------
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  2 +-
 2 files changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2bcd5d5c/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fd3a14b..4640b5d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -22,7 +22,6 @@ import java.text.SimpleDateFormat
 import java.util.Date
 
 import scala.collection.immutable.Map
-import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -317,7 +316,7 @@ class HadoopRDD[K, V](
         try {
           val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
           val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
-          Some(HadoopRDD.convertSplitLocationInfo(infos))
+          HadoopRDD.convertSplitLocationInfo(infos)
         } catch {
           case e: Exception =>
             logDebug("Failed to use InputSplitWithLocations.", e)
@@ -419,21 +418,20 @@ private[spark] object HadoopRDD extends Logging {
       None
   }
 
-  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
-    val out = ListBuffer[String]()
-    infos.foreach { loc =>
-      val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
-        getLocation.invoke(loc).asInstanceOf[String]
+  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+    Option(infos).map(_.flatMap { loc =>
+      val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
+      val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
       if (locationStr != "localhost") {
-        if (HadoopRDD.SPLIT_INFO_REFLECTIONS.get.isInMemory.
-                invoke(loc).asInstanceOf[Boolean]) {
-          logDebug("Partition " + locationStr + " is cached by Hadoop.")
-          out += new HDFSCacheTaskLocation(locationStr).toString
+        if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+          logDebug(s"Partition $locationStr is cached by Hadoop.")
+          Some(HDFSCacheTaskLocation(locationStr).toString)
         } else {
-          out += new HostTaskLocation(locationStr).toString
+          Some(HostTaskLocation(locationStr).toString)
         }
+      } else {
+        None
       }
-    }
-    out.seq
+    })
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2bcd5d5c/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index be919e6..1c7aec9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -255,7 +255,7 @@ class NewHadoopRDD[K, V](
       case Some(c) =>
         try {
           val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
-          Some(HadoopRDD.convertSplitLocationInfo(infos))
+          HadoopRDD.convertSplitLocationInfo(infos)
         } catch {
           case e : Exception =>
             logDebug("Failed to use InputSplit#getLocationInfo.", e)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org