Posted to commits@spark.apache.org by tg...@apache.org on 2016/08/19 15:11:32 UTC

spark git commit: [SPARK-11227][CORE] UnknownHostException can be thrown when NameNode HA is enabled.

Repository: spark
Updated Branches:
  refs/heads/master e98eb2146 -> 071eaaf9d


[SPARK-11227][CORE] UnknownHostException can be thrown when NameNode HA is enabled.

## What changes were proposed in this pull request?

If the following conditions are satisfied, executors don't load the properties in `hdfs-site.xml` and an `UnknownHostException` can be thrown.

(1) NameNode HA is enabled
(2) Event logging (`spark.eventLog.enabled`) is disabled or the event log directory is NOT on HDFS
(3) Standalone or Mesos is used as the cluster manager
(4) No code in the driver loads the `HdfsConfiguration` class, either directly or indirectly
(5) The tasks access HDFS

(There might be some more conditions...)

For example, the following code causes an `UnknownHostException` when the conditions above are satisfied.
```
sc.textFile("<path on HDFS>").collect
```

```
java.lang.IllegalArgumentException: java.net.UnknownHostException: hacluster
	at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
	at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
	at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
	at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:656)
	at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:438)
	at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:411)
	at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:986)
	at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:986)
	at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:177)
	at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:177)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:177)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:213)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.UnknownHostException: hacluster
```

With this change applied, the same code no longer causes the exception, because the `textFile` method (via `hadoopFile`) now loads `HdfsConfiguration` indirectly before the configuration is broadcast.

```
sc.textFile("<path on HDFS>").collect
```
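
For Spark builds without this change, a possible driver-side workaround (just a sketch, assuming `hdfs-site.xml` is on the driver's classpath; `hacluster` is only the example nameservice from the stack trace above) is to touch an HDFS `FileSystem` before running the job, so that `HdfsConfiguration` is loaded before the Hadoop configuration is broadcast.

```
import java.net.URI
import org.apache.hadoop.fs.FileSystem

// Class-loading the HDFS FileSystem implementation triggers HdfsConfiguration,
// which registers hdfs-default.xml / hdfs-site.xml as default resources.
FileSystem.get(new URI("hdfs://hacluster/"), sc.hadoopConfiguration)

// The configuration broadcast by the next action now contains the dfs.* properties.
sc.textFile("<path on HDFS>").collect
```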

When a job includes operations that access HDFS, the `org.apache.hadoop.conf.Configuration` object is wrapped in a `SerializableConfiguration`, serialized, and broadcast from the driver to the executors. Each executor deserializes the object with `loadDefaults` set to false, so the HDFS-related properties must already be set in the configuration before it is broadcast.
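
Below is a minimal sketch of that round trip, mirroring what `SerializableConfiguration` does internally (the property lookup and the missing-nameservice outcome are only illustrative of the failing scenario above).

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.hadoop.conf.Configuration

// Driver side: whatever the Configuration contains at this point is all the
// executors will ever see.
val driverConf = new Configuration()
val buffer = new ByteArrayOutputStream()
driverConf.write(new DataOutputStream(buffer))

// Executor side: the object is recreated with loadDefaults = false, so no
// *-site.xml is read on the executor itself.
val executorConf = new Configuration(false)
executorConf.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))

// If HdfsConfiguration was never loaded on the driver, hdfs-site.xml was never
// registered as a default resource, so the HA nameservice (e.g. "hacluster")
// cannot be resolved here.
println(executorConf.get("dfs.nameservices"))
```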

## How was this patch tested?
Tested manually on my standalone cluster.

Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>

Closes #13738 from sarutak/SPARK-11227.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/071eaaf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/071eaaf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/071eaaf9

Branch: refs/heads/master
Commit: 071eaaf9d2b63589f2e66e5279a16a5a484de6f5
Parents: e98eb21
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Fri Aug 19 10:11:25 2016 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Fri Aug 19 10:11:25 2016 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 22 +++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/071eaaf9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 60f042f..2eaeab1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -35,7 +35,7 @@ import scala.util.control.NonFatal
 import com.google.common.collect.MapMaker
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
   FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
@@ -961,6 +961,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to enforce loading hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.getLocal(conf)
+
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
@@ -981,6 +986,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to enforce loading hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.get(new URI(path), hadoopConfiguration)
+
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
@@ -1065,6 +1075,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to enforce loading hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.get(new URI(path), hadoopConfiguration)
+
     // The call to NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = NewHadoopJob.getInstance(conf)
@@ -1099,6 +1114,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kClass: Class[K],
       vClass: Class[V]): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to enforce loading hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.getLocal(conf)
+
     // Add necessary security credentials to the JobConf. Required to access secure HDFS.
     val jconf = new JobConf(conf)
     SparkHadoopUtil.get.addCredentials(jconf)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org