Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/08 12:20:14 UTC

[1/2] spark git commit: [SPARK-19464][CORE][YARN][TEST-HADOOP2.6] Remove support for Hadoop 2.5 and earlier

Repository: spark
Updated Branches:
  refs/heads/master d60dde26f -> e8d3fca45


http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 7deaf0a..dd2180a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -23,8 +23,6 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.reflect.ClassTag
-import scala.util.Try
 
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
@@ -67,19 +65,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   }
 
   test("default Yarn application classpath") {
-    getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+    getDefaultYarnApplicationClasspath should be(Fixtures.knownDefYarnAppCP)
   }
 
   test("default MR application classpath") {
-    getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+    getDefaultMRApplicationClasspath should be(Fixtures.knownDefMRAppCP)
   }
 
   test("resultant classpath for an application that defines a classpath for YARN") {
     withAppConf(Fixtures.mapYARNAppConf) { conf =>
       val env = newEnv
       populateHadoopClasspath(conf, env)
-      classpath(env) should be(
-        flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath))
+      classpath(env) should be(Fixtures.knownYARNAppCP +: getDefaultMRApplicationClasspath)
     }
   }
 
@@ -87,8 +84,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     withAppConf(Fixtures.mapMRAppConf) { conf =>
       val env = newEnv
       populateHadoopClasspath(conf, env)
-      classpath(env) should be(
-        flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+      classpath(env) should be(getDefaultYarnApplicationClasspath :+ Fixtures.knownMRAppCP)
     }
   }
 
@@ -96,7 +92,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     withAppConf(Fixtures.mapAppConf) { conf =>
       val env = newEnv
       populateHadoopClasspath(conf, env)
-      classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
+      classpath(env) should be(Array(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
     }
   }
 
@@ -104,14 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   private val USER = "local:/userJar"
   private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
 
-  private val PWD =
-    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
-      "{{PWD}}"
-    } else if (Utils.isWindows) {
-      "%PWD%"
-    } else {
-      Environment.PWD.$()
-    }
+  private val PWD = "{{PWD}}"
 
   test("Local jar URIs") {
     val conf = new Configuration()
@@ -388,26 +377,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =
-      getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
-                                                "DEFAULT_YARN_APPLICATION_CLASSPATH",
-                                                Seq[String]())(a => a.toSeq)
-
+      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq
 
     val knownDefMRAppCP: Seq[String] =
-      getFieldValue2[String, Array[String], Seq[String]](
-        classOf[MRJobConfig],
-        "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
-        Seq[String]())(a => a.split(","))(a => a.toSeq)
+      MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH.split(",").toSeq
 
-    val knownYARNAppCP = Some(Seq("/known/yarn/path"))
+    val knownYARNAppCP = "/known/yarn/path"
 
-    val knownMRAppCP = Some(Seq("/known/mr/path"))
+    val knownMRAppCP = "/known/mr/path"
 
-    val mapMRAppConf =
-      Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)
+    val mapMRAppConf = Map("mapreduce.application.classpath" -> knownMRAppCP)
 
-    val mapYARNAppConf =
-      Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)
+    val mapYARNAppConf = Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP)
 
     val mapAppConf = mapYARNAppConf ++ mapMRAppConf
   }
@@ -423,28 +404,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   def classpath(env: MutableHashMap[String, String]): Array[String] =
     env(Environment.CLASSPATH.name).split(":|;|<CPS>")
 
-  def flatten(a: Option[Seq[String]], b: Option[Seq[String]]): Array[String] =
-    (a ++ b).flatten.toArray
-
-  def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = {
-    Try(clazz.getField(field))
-      .map(_.get(null).asInstanceOf[A])
-      .toOption
-      .map(mapTo)
-      .getOrElse(defaults)
-  }
-
-  def getFieldValue2[A: ClassTag, A1: ClassTag, B](
-        clazz: Class[_],
-        field: String,
-        defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = {
-    Try(clazz.getField(field)).map(_.get(null)).map {
-      case v: A => mapTo(v)
-      case v1: A1 => mapTo1(v1)
-      case _ => defaults
-    }.toOption.getOrElse(defaults)
-  }
-
   private def createClient(
       sparkConf: SparkConf,
       conf: Configuration = new Configuration(),
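
The reflection helpers removed above (`getFieldValue`, `getFieldValue2`) existed only because `DEFAULT_YARN_APPLICATION_CLASSPATH` and `DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH` were not guaranteed to exist on very old Hadoop releases. With Hadoop 2.6 as the floor they always do, so the fixtures read the constants directly. A minimal before/after sketch, assuming only that Hadoop 2.6+ client jars are on the classpath (`DefaultClasspaths` is an illustrative name, not Spark code):

```scala
import scala.util.Try

import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.conf.YarnConfiguration

object DefaultClasspaths {
  // Old style: reflective field lookup with a fallback, in case the constant was missing.
  def yarnDefaultsViaReflection(): Seq[String] =
    Try(classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH"))
      .map(_.get(null).asInstanceOf[Array[String]].toSeq)
      .getOrElse(Seq.empty)

  // New style: the constants always exist on Hadoop 2.6+, so read them directly.
  val yarnDefaults: Seq[String] =
    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq
  val mrDefaults: Seq[String] =
    MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH.split(",").toSeq
}
```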

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 7fbbe12..a057618 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -22,8 +22,6 @@ import java.nio.charset.StandardCharsets
 
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.io.Text
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.Matchers
@@ -147,28 +145,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
 
   }
 
-  test("test expandEnvironment result") {
-    val target = Environment.PWD
-    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
-      YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}")
-    } else if (Utils.isWindows) {
-      YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%")
-    } else {
-      YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target)
-    }
-
-  }
-
-  test("test getClassPathSeparator result") {
-    if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) {
-      YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>")
-    } else if (Utils.isWindows) {
-      YarnSparkHadoopUtil.getClassPathSeparator() should be (";")
-    } else {
-      YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
-    }
-  }
-
   test("check different hadoop utils based on env variable") {
     try {
       System.setProperty("SPARK_YARN_MODE", "true")
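
The two tests dropped here only probed whether `Environment.$$()` and `ApplicationConstants.CLASS_PATH_SEPARATOR` exist, which was the pre-Hadoop-2.4 question; on every supported version both are present, so the expanded form and the separator are fixed. That is also why `PWD` in `ClientSuite` above becomes the literal `"{{PWD}}"`. A minimal sketch, assuming Hadoop 2.6+ YARN client libraries (`YarnExpansion` is an illustrative name):

```scala
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

object YarnExpansion {
  // YARN substitutes {{PWD}} on the node manager, so no $VAR / %VAR% / reflection dance is needed.
  val pwd: String = Environment.PWD.$$()                       // "{{PWD}}"

  // <CPS> is likewise replaced with the platform classpath separator by YARN at launch time.
  val cps: String = ApplicationConstants.CLASS_PATH_SEPARATOR  // "<CPS>"
}
```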

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index d197daf..14f721d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -74,23 +74,21 @@ class FileScanRDD(
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // apply readFunction, because it might read some bytes.
-      private val getBytesReadCallback: Option[() => Long] =
+      private val getBytesReadCallback =
         SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
 
-      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // We get our input bytes from thread-local Hadoop FileSystem statistics.
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
       private def updateBytesRead(): Unit = {
-        getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
-        }
+        inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
       }
 
       // If we can't get the bytes read from the FS stats, fall back to the file size,
       // which may be inaccurate.
       private def updateBytesReadWithFileSize(): Unit = {
-        if (getBytesReadCallback.isEmpty && currentFile != null) {
+        if (currentFile != null) {
           inputMetrics.incBytesRead(currentFile.length)
         }
       }
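
Since `getFSBytesReadOnThreadCallback()` now always returns a callback, the `Option` wrapper and the guard in the file-size fallback go away, but the SPARK-13071 caveat still holds: when coalesced partitions run in the same thread, each iterator must add the thread-local delta to the bytes already recorded rather than overwrite them. A self-contained sketch of that bookkeeping (names are illustrative, not Spark's `InputMetrics` API):

```scala
// Tracks bytes read for one partition iterator. `alreadyRead` is whatever earlier partitions in
// the same task/thread have reported; the callback returns bytes read on this thread since it
// was created, so the sum never clobbers the earlier partitions' counts (SPARK-13071).
final class PartitionBytesRead(getBytesReadCallback: () => Long, alreadyRead: Long) {
  private var bytesRead: Long = alreadyRead

  def updateBytesRead(): Unit = {
    bytesRead = alreadyRead + getBytesReadCallback()
  }

  def current: Long = bytesRead
}
```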

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
index 938af25..c3dd693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
@@ -59,10 +59,6 @@ class RecordReaderIterator[T](
   override def close(): Unit = {
     if (rowReader != null) {
       try {
-        // Close the reader and release it. Note: it's very important that we don't close the
-        // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
-        // older Hadoop 2.x releases. That bug can lead to non-deterministic corruption issues
-        // when reading compressed input.
         rowReader.close()
       } finally {
         rowReader = null

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 1b41352..bfdc2cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -151,7 +151,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           IOUtils.closeQuietly(output)
         }
       } catch {
-        case e: IOException if isFileAlreadyExistsException(e) =>
+        case e: FileAlreadyExistsException =>
           // Failed to create "tempPath". There are two cases:
           // 1. Someone is creating "tempPath" too.
           // 2. This is a restart. "tempPath" has already been created but not moved to the final
@@ -190,7 +190,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
       if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
     } catch {
-      case e: IOException if isFileAlreadyExistsException(e) =>
+      case e: FileAlreadyExistsException =>
         // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
         // So throw an exception to tell the user this is not a valid behavior.
         throw new ConcurrentModificationException(
@@ -206,13 +206,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-  private def isFileAlreadyExistsException(e: IOException): Boolean = {
-    e.isInstanceOf[FileAlreadyExistsException] ||
-      // Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
-      // HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop versions.
-      (e.getMessage != null && e.getMessage.startsWith("File already exists: "))
-  }
-
   /**
    * @return the deserialized metadata in a batch file, or None if file not exist.
    * @throws IllegalArgumentException when path does not point to a batch file.
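
HADOOP-9361 (fixed in Hadoop 2.5) made `FileSystem` implementations throw `FileAlreadyExistsException` instead of a generic `IOException` carrying a magic message, so the `isFileAlreadyExistsException` helper disappears and the handlers match on the exception type. A minimal sketch of the rename path, with a hypothetical `rename` thunk standing in for the file-manager call:

```scala
import java.util.ConcurrentModificationException

import org.apache.hadoop.fs.FileAlreadyExistsException

object BatchCommitSketch {
  // If the rename fails because the target already exists, another HDFSMetadataLog instance has
  // committed this batch; that is surfaced as a concurrent modification rather than swallowed.
  def commitBatchFile(rename: () => Unit, batchId: Long): Unit = {
    try {
      rename()
    } catch {
      case _: FileAlreadyExistsException =>
        throw new ConcurrentModificationException(
          s"Multiple HDFSMetadataLog instances are using the same output path; batch $batchId exists")
    }
  }
}
```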

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 63fdd6b..d2487a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -52,7 +52,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
     val resolvedVersion = hiveVersion(hiveMetastoreVersion)
     // We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact
-    // with the given version, we will use Hadoop 2.4.0 and then will not share Hadoop classes.
+    // with the given version, we will use Hadoop 2.6 and then will not share Hadoop classes.
     var sharesHadoopClasses = true
     val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
       resolvedVersions((resolvedVersion, hadoopVersion))
@@ -63,17 +63,14 @@ private[hive] object IsolatedClientLoader extends Logging {
         } catch {
           case e: RuntimeException if e.getMessage.contains("hadoop") =>
             // If the error message contains hadoop, it is probably because the hadoop
-            // version cannot be resolved (e.g. it is a vendor specific version like
-            // 2.0.0-cdh4.1.1). If it is the case, we will try just
-            // "org.apache.hadoop:hadoop-client:2.4.0". "org.apache.hadoop:hadoop-client:2.4.0"
-            // is used just because we used to hard code it as the hadoop artifact to download.
-            logWarning(s"Failed to resolve Hadoop artifacts for the version ${hadoopVersion}. " +
-              s"We will change the hadoop version from ${hadoopVersion} to 2.4.0 and try again. " +
+            // version cannot be resolved.
+            logWarning(s"Failed to resolve Hadoop artifacts for the version $hadoopVersion. " +
+              s"We will change the hadoop version from $hadoopVersion to 2.6.0 and try again. " +
               "Hadoop classes will not be shared between Spark and Hive metastore client. " +
               "It is recommended to set jars used by Hive metastore client through " +
               "spark.sql.hive.metastore.jars in the production environment.")
             sharesHadoopClasses = false
-            (downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
+            (downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5")
         }
       resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
       resolvedVersions((resolvedVersion, actualHadoopVersion))
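
The fallback flow itself is unchanged; only the pinned Hadoop version moves from 2.4.0 to 2.6.5. A minimal sketch of that flow, with a hypothetical `downloadVersion` standing in for the real Ivy resolution:

```scala
import java.io.File

object HadoopArtifactFallback {
  // Try the caller's Hadoop version first; if artifact resolution fails (typically a
  // vendor-specific version string), retry with the pinned 2.6.5 client and stop sharing
  // Hadoop classes between Spark and the Hive metastore client.
  def resolve(downloadVersion: String => Seq[File], hadoopVersion: String): (Seq[File], Boolean) = {
    try {
      (downloadVersion(hadoopVersion), true)   // shares Hadoop classes
    } catch {
      case e: RuntimeException if e.getMessage != null && e.getMessage.contains("hadoop") =>
        (downloadVersion("2.6.5"), false)      // pinned fallback, no class sharing
    }
  }
}
```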




[2/2] spark git commit: [SPARK-19464][CORE][YARN][TEST-HADOOP2.6] Remove support for Hadoop 2.5 and earlier

Posted by sr...@apache.org.
[SPARK-19464][CORE][YARN][TEST-HADOOP2.6] Remove support for Hadoop 2.5 and earlier

## What changes were proposed in this pull request?

- Remove support for Hadoop 2.5 and earlier
- Remove reflection and code constructs only needed to support multiple versions at once
- Update docs to reflect newer versions
- Remove older versions' builds and profiles.

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #16810 from srowen/SPARK-19464.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8d3fca4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8d3fca4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8d3fca4

Branch: refs/heads/master
Commit: e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec
Parents: d60dde2
Author: Sean Owen <so...@cloudera.com>
Authored: Wed Feb 8 12:20:07 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Feb 8 12:20:07 2017 +0000

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 appveyor.yml                                    |   2 +-
 assembly/README                                 |   2 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  59 ++-----
 .../io/SparkHadoopMapReduceWriter.scala         |  31 ++--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  66 ++-----
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  23 +--
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  15 +-
 .../spark/metrics/InputOutputMetricsSuite.scala |  74 ++++----
 dev/appveyor-install-dependencies.ps1           |   2 +-
 dev/create-release/release-build.sh             |   5 +-
 dev/deps/spark-deps-hadoop-2.2                  | 166 ------------------
 dev/deps/spark-deps-hadoop-2.3                  | 174 -------------------
 dev/deps/spark-deps-hadoop-2.4                  | 174 -------------------
 dev/make-distribution.sh                        |  14 --
 dev/run-tests.py                                |   3 -
 dev/test-dependencies.sh                        |   3 -
 docs/building-spark.md                          |  61 ++-----
 docs/running-on-yarn.md                         |   2 +-
 pom.xml                                         |  39 +----
 project/SparkBuild.scala                        |  38 +---
 resource-managers/yarn/pom.xml                  |  22 ---
 .../spark/deploy/yarn/ApplicationMaster.scala   |  17 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 169 ++++--------------
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  10 +-
 .../spark/deploy/yarn/YarnAllocator.scala       |  19 +-
 .../apache/spark/deploy/yarn/YarnRMClient.scala |  31 +---
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  53 ++----
 .../apache/spark/deploy/yarn/ClientSuite.scala  |  65 ++-----
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  |  24 ---
 .../sql/execution/datasources/FileScanRDD.scala |  10 +-
 .../datasources/RecordReaderIterator.scala      |   4 -
 .../execution/streaming/HDFSMetadataLog.scala   |  11 +-
 .../sql/hive/client/IsolatedClientLoader.scala  |  13 +-
 34 files changed, 202 insertions(+), 1201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 8739849..d94872d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
 
 # 6. Run lint-java.
 script:

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/appveyor.yml
----------------------------------------------------------------------
diff --git a/appveyor.yml b/appveyor.yml
index 5e75683..6bc66c0 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -43,7 +43,7 @@ install:
   - cmd: R -e "packageVersion('survival')"
 
 build_script:
-  - cmd: mvn -DskipTests -Phadoop-2.6 -Psparkr -Phive -Phive-thriftserver package
+  - cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package
 
 test_script:
   - cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/assembly/README
----------------------------------------------------------------------
diff --git a/assembly/README b/assembly/README
index 14a5ff8..d5dafab 100644
--- a/assembly/README
+++ b/assembly/README
@@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command
 
 If you need to build an assembly for a different version of Hadoop the
 hadoop-version system property needs to be set as in this example:
-  -Dhadoop.version=2.0.6-alpha
+  -Dhadoop.version=2.7.3

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 2315607..941e2d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy
 
 import java.io.IOException
-import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -29,7 +28,6 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -140,54 +138,29 @@ class SparkHadoopUtil extends Logging {
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
-   * return the bytes read on r since t.  Reflection is required because thread-level FileSystem
-   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
-   * Returns None if the required method can't be found.
+   * return the bytes read on r since t.
+   *
+   * @return a callback reporting the bytes read on this thread since this method was called.
    */
-  private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
-    try {
-      val threadStats = getFileSystemThreadStatistics()
-      val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
-      val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
-      val baselineBytesRead = f()
-      Some(() => f() - baselineBytesRead)
-    } catch {
-      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
-        logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
-        None
-    }
+  private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
+    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
+    val f = () => threadStats.map(_.getBytesRead).sum
+    val baselineBytesRead = f()
+    () => f() - baselineBytesRead
   }
 
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes written. If
    * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
-   * return the bytes written on r since t.  Reflection is required because thread-level FileSystem
-   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
-   * Returns None if the required method can't be found.
+   * return the bytes written on r since t.
+   *
+   * @return a callback reporting the bytes written on this thread since this method was called.
    */
-  private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
-    try {
-      val threadStats = getFileSystemThreadStatistics()
-      val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
-      val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
-      val baselineBytesWritten = f()
-      Some(() => f() - baselineBytesWritten)
-    } catch {
-      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
-        logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
-        None
-    }
-  }
-
-  private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
-    FileSystem.getAllStatistics.asScala.map(
-      Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
-  }
-
-  private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
-    val statisticsDataClass =
-      Utils.classForName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
-    statisticsDataClass.getDeclaredMethod(methodName)
+  private[spark] def getFSBytesWrittenOnThreadCallback(): () => Long = {
+    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
+    val f = () => threadStats.map(_.getBytesWritten).sum
+    val baselineBytesWritten = f()
+    () => f() - baselineBytesWritten
   }
 
   /**
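
Thread-level `FileSystem` statistics (HADOOP-10688) exist on every supported release, so the reflective lookup of `StatisticsData#getBytesRead` collapses into direct calls and the result is no longer optional. A standalone sketch of the read-side callback, equivalent in shape to the new method:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.FileSystem

object ThreadBytesRead {
  // Returns a closure reporting FileSystem bytes read on the calling thread since this function
  // was invoked; the baseline is captured up front and subtracted on every call.
  def bytesReadOnThreadCallback(): () => Long = {
    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
    val readTotal = () => threadStats.map(_.getBytesRead).sum
    val baseline = readTotal()
    () => readTotal() - baseline
  }
}
```

Callers hold on to the returned function and invoke it whenever they want a fresh delta, which is how the RDD and writer code paths touched by this commit use it.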

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 63918ef..1e0a1e6 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -125,8 +125,7 @@ object SparkHadoopMapReduceWriter extends Logging {
     val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     committer.setupTask(taskContext)
 
-    val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
-      SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+    val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
     // Initiate the writer.
     val taskFormat = outputFormat.newInstance()
@@ -149,8 +148,7 @@ object SparkHadoopMapReduceWriter extends Logging {
           writer.write(pair._1, pair._2)
 
           // Update bytes written metric every few records
-          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
-            outputMetricsAndBytesWrittenCallback, recordsWritten)
+          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
           recordsWritten += 1
         }
         if (writer != null) {
@@ -171,11 +169,8 @@ object SparkHadoopMapReduceWriter extends Logging {
         }
       })
 
-      outputMetricsAndBytesWrittenCallback.foreach {
-        case (om, callback) =>
-          om.setBytesWritten(callback())
-          om.setRecordsWritten(recordsWritten)
-      }
+      outputMetrics.setBytesWritten(callback())
+      outputMetrics.setRecordsWritten(recordsWritten)
 
       ret
     } catch {
@@ -222,24 +217,18 @@ object SparkHadoopWriterUtils {
   // TODO: these don't seem like the right abstractions.
   // We should abstract the duplicate code in a less awkward way.
 
-  // return type: (output metrics, bytes written callback), defined only if the latter is defined
-  def initHadoopOutputMetrics(
-      context: TaskContext): Option[(OutputMetrics, () => Long)] = {
+  def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
     val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
-    bytesWrittenCallback.map { b =>
-      (context.taskMetrics().outputMetrics, b)
-    }
+    (context.taskMetrics().outputMetrics, bytesWrittenCallback)
   }
 
   def maybeUpdateOutputMetrics(
-      outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
+      outputMetrics: OutputMetrics,
+      callback: () => Long,
       recordsWritten: Long): Unit = {
     if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
-      outputMetricsAndBytesWrittenCallback.foreach {
-        case (om, callback) =>
-          om.setBytesWritten(callback())
-          om.setRecordsWritten(recordsWritten)
-      }
+      outputMetrics.setBytesWritten(callback())
+      outputMetrics.setRecordsWritten(recordsWritten)
     }
   }
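
With the bytes-written callback always present, `initHadoopOutputMetrics` returns a plain `(OutputMetrics, () => Long)` pair and the periodic update loses its `foreach`. A self-contained sketch of the updated helper, using a stand-in `Metrics` class because Spark's `OutputMetrics` setters are internal; the 256-record interval is illustrative (the real code uses `RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES`):

```scala
object OutputMetricsSketch {
  // Stand-in for org.apache.spark.executor.OutputMetrics.
  final class Metrics {
    var bytesWritten = 0L
    var recordsWritten = 0L
  }

  private val recordsBetweenUpdates = 256L

  // Refresh the metrics every few records, reading the current total from the callback.
  def maybeUpdateOutputMetrics(m: Metrics, bytesWritten: () => Long, recordsWritten: Long): Unit = {
    if (recordsWritten % recordsBetweenUpdates == 0) {
      m.bytesWritten = bytesWritten()
      m.recordsWritten = recordsWritten
    }
  }
}
```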
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a83e139..5fa6a7e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,15 +25,7 @@ import scala.collection.immutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapred.FileSplit
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.InputSplit
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.JobID
-import org.apache.hadoop.mapred.RecordReader
-import org.apache.hadoop.mapred.Reporter
-import org.apache.hadoop.mapred.TaskAttemptID
-import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
 import org.apache.hadoop.util.ReflectionUtils
@@ -47,7 +39,7 @@ import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -229,11 +221,11 @@ class HadoopRDD[K, V](
       // creating RecordReader, because RecordReader's constructor might read some bytes
       private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
         case _: FileSplit | _: CombineFileSplit =>
-          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
         case _ => None
       }
 
-      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // We get our input bytes from thread-local Hadoop FileSystem statistics.
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
@@ -280,13 +272,9 @@ class HadoopRDD[K, V](
         (key, value)
       }
 
-      override def close() {
+      override def close(): Unit = {
         if (reader != null) {
           InputFileBlockHolder.unset()
-          // Close the reader and release it. Note: it's very important that we don't close the
-          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
-          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
-          // corruption issues when reading compressed input.
           try {
             reader.close()
           } catch {
@@ -326,18 +314,10 @@ class HadoopRDD[K, V](
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
-    val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
-      case Some(c) =>
-        try {
-          val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
-          val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
-          HadoopRDD.convertSplitLocationInfo(infos)
-        } catch {
-          case e: Exception =>
-            logDebug("Failed to use InputSplitWithLocations.", e)
-            None
-        }
-      case None => None
+    val locs = hsplit match {
+      case lsplit: InputSplitWithLocationInfo =>
+        HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
+      case _ => None
     }
     locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
   }
@@ -413,32 +393,12 @@ private[spark] object HadoopRDD extends Logging {
     }
   }
 
-  private[spark] class SplitInfoReflections {
-    val inputSplitWithLocationInfo =
-      Utils.classForName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
-    val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
-    val newInputSplit = Utils.classForName("org.apache.hadoop.mapreduce.InputSplit")
-    val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
-    val splitLocationInfo = Utils.classForName("org.apache.hadoop.mapred.SplitLocationInfo")
-    val isInMemory = splitLocationInfo.getMethod("isInMemory")
-    val getLocation = splitLocationInfo.getMethod("getLocation")
-  }
-
-  private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
-    Some(new SplitInfoReflections)
-  } catch {
-    case e: Exception =>
-      logDebug("SplitLocationInfo and other new Hadoop classes are " +
-          "unavailable. Using the older Hadoop location info code.", e)
-      None
-  }
-
-  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+  private[spark] def convertSplitLocationInfo(
+       infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
     Option(infos).map(_.flatMap { loc =>
-      val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
-      val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
+      val locationStr = loc.getLocation
       if (locationStr != "localhost") {
-        if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+        if (loc.isInMemory) {
           logDebug(s"Partition $locationStr is cached by Hadoop.")
           Some(HDFSCacheTaskLocation(locationStr).toString)
         } else {
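
`SplitInfoReflections` existed only because `InputSplitWithLocationInfo` and `SplitLocationInfo` first appeared in Hadoop 2.4; with 2.6 as the floor, an ordinary pattern match and direct method calls are enough. A minimal sketch returning plain host strings, flagging HDFS-cached replicas with the same `hdfs_cache_` prefix that `HDFSCacheTaskLocation` uses (the real code wraps them in Spark task-location objects):

```scala
import org.apache.hadoop.mapred.{InputSplit, InputSplitWithLocationInfo}

object SplitLocations {
  // Prefer the richer location info when the split provides it; otherwise fall back to
  // getLocations(). "localhost" entries are dropped because they carry no scheduling signal.
  def preferredHosts(split: InputSplit): Seq[String] = split match {
    case s: InputSplitWithLocationInfo if s.getLocationInfo != null =>
      s.getLocationInfo.toSeq
        .filter(_.getLocation != "localhost")
        .map(loc => if (loc.isInMemory) s"hdfs_cache_${loc.getLocation}" else loc.getLocation)
    case _ =>
      split.getLocations.toSeq.filter(_ != "localhost")
  }
}
```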

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 733e85f..ce3a9a2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -152,11 +152,11 @@ class NewHadoopRDD[K, V](
       private val getBytesReadCallback: Option[() => Long] =
         split.serializableHadoopSplit.value match {
           case _: FileSplit | _: CombineFileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+            Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
           case _ => None
         }
 
-      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // We get our input bytes from thread-local Hadoop FileSystem statistics.
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
@@ -231,13 +231,9 @@ class NewHadoopRDD[K, V](
         (reader.getCurrentKey, reader.getCurrentValue)
       }
 
-      private def close() {
+      private def close(): Unit = {
         if (reader != null) {
           InputFileBlockHolder.unset()
-          // Close the reader and release it. Note: it's very important that we don't close the
-          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
-          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
-          // corruption issues when reading compressed input.
           try {
             reader.close()
           } catch {
@@ -277,18 +273,7 @@ class NewHadoopRDD[K, V](
 
   override def getPreferredLocations(hsplit: Partition): Seq[String] = {
     val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
-    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
-      case Some(c) =>
-        try {
-          val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
-          HadoopRDD.convertSplitLocationInfo(infos)
-        } catch {
-          case e : Exception =>
-            logDebug("Failed to use InputSplit#getLocationInfo.", e)
-            None
-        }
-      case None => None
-    }
+    val locs = HadoopRDD.convertSplitLocationInfo(split.getLocationInfo)
     locs.getOrElse(split.getLocations.filter(_ != "localhost"))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 41093bd..567a318 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -37,8 +37,7 @@ import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.OutputMetrics
-import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
@@ -1126,8 +1125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
 
-      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
-        SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+      val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
@@ -1139,16 +1137,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
 
           // Update bytes written metric every few records
-          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
-            outputMetricsAndBytesWrittenCallback, recordsWritten)
+          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
           recordsWritten += 1
         }
       }(finallyBlock = writer.close())
       writer.commit()
-      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
-        om.setBytesWritten(callback())
-        om.setRecordsWritten(recordsWritten)
-      }
+      outputMetrics.setBytesWritten(callback())
+      outputMetrics.setRecordsWritten(recordsWritten)
     }
 
     self.context.runJob(self, writeToFile)

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index a73b300..becf382 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.util.Utils
 
@@ -186,10 +185,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     sc.listenerBus.waitUntilEmpty(500)
     assert(inputRead == numRecords)
 
-    // Only supported on newer Hadoop
-    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
-      assert(outputWritten == numBuckets)
-    }
+    assert(outputWritten == numBuckets)
     assert(shuffleRead == shuffleWritten)
   }
 
@@ -262,57 +258,49 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
   }
 
   test("output metrics on records written") {
-    // Only supported on newer Hadoop
-    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
-      val file = new File(tmpDir, getClass.getSimpleName)
-      val filePath = "file://" + file.getAbsolutePath
+    val file = new File(tmpDir, getClass.getSimpleName)
+    val filePath = "file://" + file.getAbsolutePath
 
-      val records = runAndReturnRecordsWritten {
-        sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
-      }
-      assert(records == numRecords)
+    val records = runAndReturnRecordsWritten {
+      sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
     }
+    assert(records == numRecords)
   }
 
   test("output metrics on records written - new Hadoop API") {
-    // Only supported on newer Hadoop
-    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
-      val file = new File(tmpDir, getClass.getSimpleName)
-      val filePath = "file://" + file.getAbsolutePath
-
-      val records = runAndReturnRecordsWritten {
-        sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
-          .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
-      }
-      assert(records == numRecords)
+    val file = new File(tmpDir, getClass.getSimpleName)
+    val filePath = "file://" + file.getAbsolutePath
+
+    val records = runAndReturnRecordsWritten {
+      sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
+        .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
     }
+    assert(records == numRecords)
   }
 
   test("output metrics when writing text file") {
     val fs = FileSystem.getLocal(new Configuration())
     val outPath = new Path(fs.getWorkingDirectory, "outdir")
 
-    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
-      val taskBytesWritten = new ArrayBuffer[Long]()
-      sc.addSparkListener(new SparkListener() {
-        override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-          taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
-        }
-      })
-
-      val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
-
-      try {
-        rdd.saveAsTextFile(outPath.toString)
-        sc.listenerBus.waitUntilEmpty(500)
-        assert(taskBytesWritten.length == 2)
-        val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
-        taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
-          assert(bytes >= fileStatus.getLen)
-        }
-      } finally {
-        fs.delete(outPath, true)
+    val taskBytesWritten = new ArrayBuffer[Long]()
+    sc.addSparkListener(new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+        taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
+      }
+    })
+
+    val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+
+    try {
+      rdd.saveAsTextFile(outPath.toString)
+      sc.listenerBus.waitUntilEmpty(500)
+      assert(taskBytesWritten.length == 2)
+      val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
+      taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
+        assert(bytes >= fileStatus.getLen)
       }
+    } finally {
+      fs.delete(outPath, true)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/dev/appveyor-install-dependencies.ps1
----------------------------------------------------------------------
diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1
index b72d6b5..5a72967 100644
--- a/dev/appveyor-install-dependencies.ps1
+++ b/dev/appveyor-install-dependencies.ps1
@@ -95,7 +95,7 @@ $env:MAVEN_OPTS = "-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
 Pop-Location
 
 # ========================== Hadoop bin package
-$hadoopVer = "2.6.0"
+$hadoopVer = "2.6.5"
 $hadoopPath = "$tools\hadoop"
 if (!(Test-Path $hadoopPath)) {
     New-Item -ItemType Directory -Force -Path $hadoopPath | Out-Null

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index b08577c..d616f80 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -80,7 +80,7 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
 BASE_DIR=$(pwd)
 
 MVN="build/mvn --force"
-PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver"
 PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
 
 rm -rf spark
@@ -236,11 +236,8 @@ if [[ "$1" == "package" ]]; then
   # We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
   # share the same Zinc server.
   FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos"
-  make_binary_release "hadoop2.3" "-Phadoop-2.3 $FLAGS" "3033" &
-  make_binary_release "hadoop2.4" "-Phadoop-2.4 $FLAGS" "3034" &
   make_binary_release "hadoop2.6" "-Phadoop-2.6 $FLAGS" "3035" "withr" &
   make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" "withpip" &
-  make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn -Pmesos" "3037" &
   make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" &
   wait
   rm -rf spark-$SPARK_VERSION-bin-*/

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/dev/deps/spark-deps-hadoop-2.2
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
deleted file mode 100644
index 1254188..0000000
--- a/dev/deps/spark-deps-hadoop-2.2
+++ /dev/null
@@ -1,166 +0,0 @@
-JavaEWAH-0.3.2.jar
-RoaringBitmap-0.5.11.jar
-ST4-4.0.4.jar
-antlr-2.7.7.jar
-antlr-runtime-3.4.jar
-antlr4-runtime-4.5.3.jar
-aopalliance-1.0.jar
-aopalliance-repackaged-2.4.0-b34.jar
-apache-log4j-extras-1.2.17.jar
-arpack_combined_all-0.1.jar
-avro-1.7.7.jar
-avro-ipc-1.7.7.jar
-avro-mapred-1.7.7-hadoop2.jar
-bonecp-0.8.0.RELEASE.jar
-breeze-macros_2.11-0.12.jar
-breeze_2.11-0.12.jar
-calcite-avatica-1.2.0-incubating.jar
-calcite-core-1.2.0-incubating.jar
-calcite-linq4j-1.2.0-incubating.jar
-chill-java-0.8.0.jar
-chill_2.11-0.8.0.jar
-commons-beanutils-1.7.0.jar
-commons-beanutils-core-1.8.0.jar
-commons-cli-1.2.jar
-commons-codec-1.10.jar
-commons-collections-3.2.2.jar
-commons-compiler-3.0.0.jar
-commons-compress-1.4.1.jar
-commons-configuration-1.6.jar
-commons-crypto-1.0.0.jar
-commons-dbcp-1.4.jar
-commons-digester-1.8.jar
-commons-httpclient-3.1.jar
-commons-io-2.4.jar
-commons-lang-2.6.jar
-commons-lang3-3.5.jar
-commons-logging-1.1.3.jar
-commons-math-2.1.jar
-commons-math3-3.4.1.jar
-commons-net-2.2.jar
-commons-pool-1.5.4.jar
-compress-lzf-1.0.3.jar
-core-1.1.2.jar
-curator-client-2.4.0.jar
-curator-framework-2.4.0.jar
-curator-recipes-2.4.0.jar
-datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
-datanucleus-rdbms-3.2.9.jar
-derby-10.12.1.1.jar
-eigenbase-properties-1.1.5.jar
-guava-14.0.1.jar
-guice-3.0.jar
-guice-servlet-3.0.jar
-hadoop-annotations-2.2.0.jar
-hadoop-auth-2.2.0.jar
-hadoop-client-2.2.0.jar
-hadoop-common-2.2.0.jar
-hadoop-hdfs-2.2.0.jar
-hadoop-mapreduce-client-app-2.2.0.jar
-hadoop-mapreduce-client-common-2.2.0.jar
-hadoop-mapreduce-client-core-2.2.0.jar
-hadoop-mapreduce-client-jobclient-2.2.0.jar
-hadoop-mapreduce-client-shuffle-2.2.0.jar
-hadoop-yarn-api-2.2.0.jar
-hadoop-yarn-client-2.2.0.jar
-hadoop-yarn-common-2.2.0.jar
-hadoop-yarn-server-common-2.2.0.jar
-hadoop-yarn-server-web-proxy-2.2.0.jar
-hk2-api-2.4.0-b34.jar
-hk2-locator-2.4.0-b34.jar
-hk2-utils-2.4.0-b34.jar
-httpclient-4.5.2.jar
-httpcore-4.4.4.jar
-ivy-2.4.0.jar
-jackson-annotations-2.6.5.jar
-jackson-core-2.6.5.jar
-jackson-core-asl-1.9.13.jar
-jackson-databind-2.6.5.jar
-jackson-mapper-asl-1.9.13.jar
-jackson-module-paranamer-2.6.5.jar
-jackson-module-scala_2.11-2.6.5.jar
-janino-3.0.0.jar
-javassist-3.18.1-GA.jar
-javax.annotation-api-1.2.jar
-javax.inject-1.jar
-javax.inject-2.4.0-b34.jar
-javax.servlet-api-3.1.0.jar
-javax.ws.rs-api-2.0.1.jar
-javolution-5.5.1.jar
-jcl-over-slf4j-1.7.16.jar
-jdo-api-3.0.1.jar
-jersey-client-2.22.2.jar
-jersey-common-2.22.2.jar
-jersey-container-servlet-2.22.2.jar
-jersey-container-servlet-core-2.22.2.jar
-jersey-guava-2.22.2.jar
-jersey-media-jaxb-2.22.2.jar
-jersey-server-2.22.2.jar
-jets3t-0.7.1.jar
-jetty-util-6.1.26.jar
-jline-2.12.1.jar
-joda-time-2.9.3.jar
-jodd-core-3.5.2.jar
-jpam-1.1.jar
-json4s-ast_2.11-3.2.11.jar
-json4s-core_2.11-3.2.11.jar
-json4s-jackson_2.11-3.2.11.jar
-jsr305-1.3.9.jar
-jta-1.1.jar
-jtransforms-2.4.0.jar
-jul-to-slf4j-1.7.16.jar
-kryo-shaded-3.0.3.jar
-leveldbjni-all-1.8.jar
-libfb303-0.9.3.jar
-libthrift-0.9.3.jar
-log4j-1.2.17.jar
-lz4-1.3.0.jar
-mesos-1.0.0-shaded-protobuf.jar
-metrics-core-3.1.2.jar
-metrics-graphite-3.1.2.jar
-metrics-json-3.1.2.jar
-metrics-jvm-3.1.2.jar
-minlog-1.3.0.jar
-netty-3.9.9.Final.jar
-netty-all-4.0.43.Final.jar
-objenesis-2.1.jar
-opencsv-2.3.jar
-oro-2.0.8.jar
-osgi-resource-locator-1.0.1.jar
-paranamer-2.6.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
-parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
-pmml-model-1.2.15.jar
-pmml-schema-1.2.15.jar
-protobuf-java-2.5.0.jar
-py4j-0.10.4.jar
-pyrolite-4.13.jar
-scala-compiler-2.11.8.jar
-scala-library-2.11.8.jar
-scala-parser-combinators_2.11-1.0.4.jar
-scala-reflect-2.11.8.jar
-scala-xml_2.11-1.0.2.jar
-scalap-2.11.8.jar
-shapeless_2.11-2.0.0.jar
-slf4j-api-1.7.16.jar
-slf4j-log4j12-1.7.16.jar
-snappy-0.2.jar
-snappy-java-1.1.2.6.jar
-spire-macros_2.11-0.7.4.jar
-spire_2.11-0.7.4.jar
-stax-api-1.0.1.jar
-stream-2.7.0.jar
-stringtemplate-3.2.1.jar
-super-csv-2.2.0.jar
-univocity-parsers-2.2.1.jar
-validation-api-1.1.0.Final.jar
-xbean-asm5-shaded-4.4.jar
-xmlenc-0.52.jar
-xz-1.0.jar
-zookeeper-3.4.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/dev/deps/spark-deps-hadoop-2.3
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
deleted file mode 100644
index 39ba2ae..0000000
--- a/dev/deps/spark-deps-hadoop-2.3
+++ /dev/null
@@ -1,174 +0,0 @@
-JavaEWAH-0.3.2.jar
-RoaringBitmap-0.5.11.jar
-ST4-4.0.4.jar
-activation-1.1.1.jar
-antlr-2.7.7.jar
-antlr-runtime-3.4.jar
-antlr4-runtime-4.5.3.jar
-aopalliance-1.0.jar
-aopalliance-repackaged-2.4.0-b34.jar
-apache-log4j-extras-1.2.17.jar
-arpack_combined_all-0.1.jar
-avro-1.7.7.jar
-avro-ipc-1.7.7.jar
-avro-mapred-1.7.7-hadoop2.jar
-base64-2.3.8.jar
-bcprov-jdk15on-1.51.jar
-bonecp-0.8.0.RELEASE.jar
-breeze-macros_2.11-0.12.jar
-breeze_2.11-0.12.jar
-calcite-avatica-1.2.0-incubating.jar
-calcite-core-1.2.0-incubating.jar
-calcite-linq4j-1.2.0-incubating.jar
-chill-java-0.8.0.jar
-chill_2.11-0.8.0.jar
-commons-beanutils-1.7.0.jar
-commons-beanutils-core-1.8.0.jar
-commons-cli-1.2.jar
-commons-codec-1.10.jar
-commons-collections-3.2.2.jar
-commons-compiler-3.0.0.jar
-commons-compress-1.4.1.jar
-commons-configuration-1.6.jar
-commons-crypto-1.0.0.jar
-commons-dbcp-1.4.jar
-commons-digester-1.8.jar
-commons-httpclient-3.1.jar
-commons-io-2.4.jar
-commons-lang-2.6.jar
-commons-lang3-3.5.jar
-commons-logging-1.1.3.jar
-commons-math3-3.4.1.jar
-commons-net-2.2.jar
-commons-pool-1.5.4.jar
-compress-lzf-1.0.3.jar
-core-1.1.2.jar
-curator-client-2.4.0.jar
-curator-framework-2.4.0.jar
-curator-recipes-2.4.0.jar
-datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
-datanucleus-rdbms-3.2.9.jar
-derby-10.12.1.1.jar
-eigenbase-properties-1.1.5.jar
-guava-14.0.1.jar
-guice-3.0.jar
-guice-servlet-3.0.jar
-hadoop-annotations-2.3.0.jar
-hadoop-auth-2.3.0.jar
-hadoop-client-2.3.0.jar
-hadoop-common-2.3.0.jar
-hadoop-hdfs-2.3.0.jar
-hadoop-mapreduce-client-app-2.3.0.jar
-hadoop-mapreduce-client-common-2.3.0.jar
-hadoop-mapreduce-client-core-2.3.0.jar
-hadoop-mapreduce-client-jobclient-2.3.0.jar
-hadoop-mapreduce-client-shuffle-2.3.0.jar
-hadoop-yarn-api-2.3.0.jar
-hadoop-yarn-client-2.3.0.jar
-hadoop-yarn-common-2.3.0.jar
-hadoop-yarn-server-common-2.3.0.jar
-hadoop-yarn-server-web-proxy-2.3.0.jar
-hk2-api-2.4.0-b34.jar
-hk2-locator-2.4.0-b34.jar
-hk2-utils-2.4.0-b34.jar
-httpclient-4.5.2.jar
-httpcore-4.4.4.jar
-ivy-2.4.0.jar
-jackson-annotations-2.6.5.jar
-jackson-core-2.6.5.jar
-jackson-core-asl-1.9.13.jar
-jackson-databind-2.6.5.jar
-jackson-mapper-asl-1.9.13.jar
-jackson-module-paranamer-2.6.5.jar
-jackson-module-scala_2.11-2.6.5.jar
-janino-3.0.0.jar
-java-xmlbuilder-1.0.jar
-javassist-3.18.1-GA.jar
-javax.annotation-api-1.2.jar
-javax.inject-1.jar
-javax.inject-2.4.0-b34.jar
-javax.servlet-api-3.1.0.jar
-javax.ws.rs-api-2.0.1.jar
-javolution-5.5.1.jar
-jaxb-api-2.2.2.jar
-jcl-over-slf4j-1.7.16.jar
-jdo-api-3.0.1.jar
-jersey-client-2.22.2.jar
-jersey-common-2.22.2.jar
-jersey-container-servlet-2.22.2.jar
-jersey-container-servlet-core-2.22.2.jar
-jersey-guava-2.22.2.jar
-jersey-media-jaxb-2.22.2.jar
-jersey-server-2.22.2.jar
-jets3t-0.9.3.jar
-jetty-6.1.26.jar
-jetty-util-6.1.26.jar
-jline-2.12.1.jar
-joda-time-2.9.3.jar
-jodd-core-3.5.2.jar
-jpam-1.1.jar
-json4s-ast_2.11-3.2.11.jar
-json4s-core_2.11-3.2.11.jar
-json4s-jackson_2.11-3.2.11.jar
-jsr305-1.3.9.jar
-jta-1.1.jar
-jtransforms-2.4.0.jar
-jul-to-slf4j-1.7.16.jar
-kryo-shaded-3.0.3.jar
-leveldbjni-all-1.8.jar
-libfb303-0.9.3.jar
-libthrift-0.9.3.jar
-log4j-1.2.17.jar
-lz4-1.3.0.jar
-mail-1.4.7.jar
-mesos-1.0.0-shaded-protobuf.jar
-metrics-core-3.1.2.jar
-metrics-graphite-3.1.2.jar
-metrics-json-3.1.2.jar
-metrics-jvm-3.1.2.jar
-minlog-1.3.0.jar
-mx4j-3.0.2.jar
-netty-3.9.9.Final.jar
-netty-all-4.0.43.Final.jar
-objenesis-2.1.jar
-opencsv-2.3.jar
-oro-2.0.8.jar
-osgi-resource-locator-1.0.1.jar
-paranamer-2.6.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
-parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
-pmml-model-1.2.15.jar
-pmml-schema-1.2.15.jar
-protobuf-java-2.5.0.jar
-py4j-0.10.4.jar
-pyrolite-4.13.jar
-scala-compiler-2.11.8.jar
-scala-library-2.11.8.jar
-scala-parser-combinators_2.11-1.0.4.jar
-scala-reflect-2.11.8.jar
-scala-xml_2.11-1.0.2.jar
-scalap-2.11.8.jar
-shapeless_2.11-2.0.0.jar
-slf4j-api-1.7.16.jar
-slf4j-log4j12-1.7.16.jar
-snappy-0.2.jar
-snappy-java-1.1.2.6.jar
-spire-macros_2.11-0.7.4.jar
-spire_2.11-0.7.4.jar
-stax-api-1.0-2.jar
-stax-api-1.0.1.jar
-stream-2.7.0.jar
-stringtemplate-3.2.1.jar
-super-csv-2.2.0.jar
-univocity-parsers-2.2.1.jar
-validation-api-1.1.0.Final.jar
-xbean-asm5-shaded-4.4.jar
-xmlenc-0.52.jar
-xz-1.0.jar
-zookeeper-3.4.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/dev/deps/spark-deps-hadoop-2.4
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
deleted file mode 100644
index d151d12..0000000
--- a/dev/deps/spark-deps-hadoop-2.4
+++ /dev/null
@@ -1,174 +0,0 @@
-JavaEWAH-0.3.2.jar
-RoaringBitmap-0.5.11.jar
-ST4-4.0.4.jar
-activation-1.1.1.jar
-antlr-2.7.7.jar
-antlr-runtime-3.4.jar
-antlr4-runtime-4.5.3.jar
-aopalliance-1.0.jar
-aopalliance-repackaged-2.4.0-b34.jar
-apache-log4j-extras-1.2.17.jar
-arpack_combined_all-0.1.jar
-avro-1.7.7.jar
-avro-ipc-1.7.7.jar
-avro-mapred-1.7.7-hadoop2.jar
-base64-2.3.8.jar
-bcprov-jdk15on-1.51.jar
-bonecp-0.8.0.RELEASE.jar
-breeze-macros_2.11-0.12.jar
-breeze_2.11-0.12.jar
-calcite-avatica-1.2.0-incubating.jar
-calcite-core-1.2.0-incubating.jar
-calcite-linq4j-1.2.0-incubating.jar
-chill-java-0.8.0.jar
-chill_2.11-0.8.0.jar
-commons-beanutils-1.7.0.jar
-commons-beanutils-core-1.8.0.jar
-commons-cli-1.2.jar
-commons-codec-1.10.jar
-commons-collections-3.2.2.jar
-commons-compiler-3.0.0.jar
-commons-compress-1.4.1.jar
-commons-configuration-1.6.jar
-commons-crypto-1.0.0.jar
-commons-dbcp-1.4.jar
-commons-digester-1.8.jar
-commons-httpclient-3.1.jar
-commons-io-2.4.jar
-commons-lang-2.6.jar
-commons-lang3-3.5.jar
-commons-logging-1.1.3.jar
-commons-math3-3.4.1.jar
-commons-net-2.2.jar
-commons-pool-1.5.4.jar
-compress-lzf-1.0.3.jar
-core-1.1.2.jar
-curator-client-2.4.0.jar
-curator-framework-2.4.0.jar
-curator-recipes-2.4.0.jar
-datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
-datanucleus-rdbms-3.2.9.jar
-derby-10.12.1.1.jar
-eigenbase-properties-1.1.5.jar
-guava-14.0.1.jar
-guice-3.0.jar
-guice-servlet-3.0.jar
-hadoop-annotations-2.4.1.jar
-hadoop-auth-2.4.1.jar
-hadoop-client-2.4.1.jar
-hadoop-common-2.4.1.jar
-hadoop-hdfs-2.4.1.jar
-hadoop-mapreduce-client-app-2.4.1.jar
-hadoop-mapreduce-client-common-2.4.1.jar
-hadoop-mapreduce-client-core-2.4.1.jar
-hadoop-mapreduce-client-jobclient-2.4.1.jar
-hadoop-mapreduce-client-shuffle-2.4.1.jar
-hadoop-yarn-api-2.4.1.jar
-hadoop-yarn-client-2.4.1.jar
-hadoop-yarn-common-2.4.1.jar
-hadoop-yarn-server-common-2.4.1.jar
-hadoop-yarn-server-web-proxy-2.4.1.jar
-hk2-api-2.4.0-b34.jar
-hk2-locator-2.4.0-b34.jar
-hk2-utils-2.4.0-b34.jar
-httpclient-4.5.2.jar
-httpcore-4.4.4.jar
-ivy-2.4.0.jar
-jackson-annotations-2.6.5.jar
-jackson-core-2.6.5.jar
-jackson-core-asl-1.9.13.jar
-jackson-databind-2.6.5.jar
-jackson-mapper-asl-1.9.13.jar
-jackson-module-paranamer-2.6.5.jar
-jackson-module-scala_2.11-2.6.5.jar
-janino-3.0.0.jar
-java-xmlbuilder-1.0.jar
-javassist-3.18.1-GA.jar
-javax.annotation-api-1.2.jar
-javax.inject-1.jar
-javax.inject-2.4.0-b34.jar
-javax.servlet-api-3.1.0.jar
-javax.ws.rs-api-2.0.1.jar
-javolution-5.5.1.jar
-jaxb-api-2.2.2.jar
-jcl-over-slf4j-1.7.16.jar
-jdo-api-3.0.1.jar
-jersey-client-2.22.2.jar
-jersey-common-2.22.2.jar
-jersey-container-servlet-2.22.2.jar
-jersey-container-servlet-core-2.22.2.jar
-jersey-guava-2.22.2.jar
-jersey-media-jaxb-2.22.2.jar
-jersey-server-2.22.2.jar
-jets3t-0.9.3.jar
-jetty-6.1.26.jar
-jetty-util-6.1.26.jar
-jline-2.12.1.jar
-joda-time-2.9.3.jar
-jodd-core-3.5.2.jar
-jpam-1.1.jar
-json4s-ast_2.11-3.2.11.jar
-json4s-core_2.11-3.2.11.jar
-json4s-jackson_2.11-3.2.11.jar
-jsr305-1.3.9.jar
-jta-1.1.jar
-jtransforms-2.4.0.jar
-jul-to-slf4j-1.7.16.jar
-kryo-shaded-3.0.3.jar
-leveldbjni-all-1.8.jar
-libfb303-0.9.3.jar
-libthrift-0.9.3.jar
-log4j-1.2.17.jar
-lz4-1.3.0.jar
-mail-1.4.7.jar
-mesos-1.0.0-shaded-protobuf.jar
-metrics-core-3.1.2.jar
-metrics-graphite-3.1.2.jar
-metrics-json-3.1.2.jar
-metrics-jvm-3.1.2.jar
-minlog-1.3.0.jar
-mx4j-3.0.2.jar
-netty-3.9.9.Final.jar
-netty-all-4.0.43.Final.jar
-objenesis-2.1.jar
-opencsv-2.3.jar
-oro-2.0.8.jar
-osgi-resource-locator-1.0.1.jar
-paranamer-2.6.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
-parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
-pmml-model-1.2.15.jar
-pmml-schema-1.2.15.jar
-protobuf-java-2.5.0.jar
-py4j-0.10.4.jar
-pyrolite-4.13.jar
-scala-compiler-2.11.8.jar
-scala-library-2.11.8.jar
-scala-parser-combinators_2.11-1.0.4.jar
-scala-reflect-2.11.8.jar
-scala-xml_2.11-1.0.2.jar
-scalap-2.11.8.jar
-shapeless_2.11-2.0.0.jar
-slf4j-api-1.7.16.jar
-slf4j-log4j12-1.7.16.jar
-snappy-0.2.jar
-snappy-java-1.1.2.6.jar
-spire-macros_2.11-0.7.4.jar
-spire_2.11-0.7.4.jar
-stax-api-1.0-2.jar
-stax-api-1.0.1.jar
-stream-2.7.0.jar
-stringtemplate-3.2.1.jar
-super-csv-2.2.0.jar
-univocity-parsers-2.2.1.jar
-validation-api-1.1.0.Final.jar
-xbean-asm5-shaded-4.4.jar
-xmlenc-0.52.jar
-xz-1.0.jar
-zookeeper-3.4.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/dev/make-distribution.sh
----------------------------------------------------------------------
diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh
index 6fb25f3..dc8dfb9 100755
--- a/dev/make-distribution.sh
+++ b/dev/make-distribution.sh
@@ -52,20 +52,6 @@ function exit_with_usage {
 # Parse arguments
 while (( "$#" )); do
   case $1 in
-    --hadoop)
-      echo "Error: '--hadoop' is no longer supported:"
-      echo "Error: use Maven profiles and options -Dhadoop.version and -Dyarn.version instead."
-      echo "Error: Related profiles include hadoop-2.2, hadoop-2.3, hadoop-2.4, hadoop-2.6 and hadoop-2.7."
-      exit_with_usage
-      ;;
-    --with-yarn)
-      echo "Error: '--with-yarn' is no longer supported, use Maven option -Pyarn"
-      exit_with_usage
-      ;;
-    --with-hive)
-      echo "Error: '--with-hive' is no longer supported, use Maven options -Phive and -Phive-thriftserver"
-      exit_with_usage
-      ;;
     --tgz)
       MAKE_TGZ=true
       ;;

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index ab285ac..ef9ab1a 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -305,9 +305,6 @@ def get_hadoop_profiles(hadoop_version):
     """
 
     sbt_maven_hadoop_profiles = {
-        "hadoop2.2": ["-Phadoop-2.2"],
-        "hadoop2.3": ["-Phadoop-2.3"],
-        "hadoop2.4": ["-Phadoop-2.4"],
         "hadoop2.6": ["-Phadoop-2.6"],
         "hadoop2.7": ["-Phadoop-2.7"],
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/dev/test-dependencies.sh
----------------------------------------------------------------------
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index 4014f42..eb43f22 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -32,9 +32,6 @@ export LC_ALL=C
 HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive"
 MVN="build/mvn"
 HADOOP_PROFILES=(
-    hadoop-2.2
-    hadoop-2.3
-    hadoop-2.4
     hadoop-2.6
     hadoop-2.7
 )

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/docs/building-spark.md
----------------------------------------------------------------------
diff --git a/docs/building-spark.md b/docs/building-spark.md
index ffe356f..690c656 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -48,7 +48,7 @@ You can fix these problems by setting the `MAVEN_OPTS` variable as discussed bef
 
 Spark now comes packaged with a self-contained Maven installation to ease building and deployment of Spark from source located under the `build/` directory. This script will automatically download and setup all necessary build requirements ([Maven](https://maven.apache.org/), [Scala](http://www.scala-lang.org/), and [Zinc](https://github.com/typesafehub/zinc)) locally within the `build/` directory itself. It honors any `mvn` binary if present already, however, will pull down its own copy of Scala and Zinc regardless to ensure proper version requirements are met. `build/mvn` execution acts as a pass through to the `mvn` call allowing easy transition from previous build methods. As an example, one can build a version of Spark as follows:
 
-    ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package
+    ./build/mvn -DskipTests clean package
 
 Other build examples can be found below.
 
@@ -63,48 +63,21 @@ with Maven profile settings and so on like the direct Maven build. Example:
 
 This will build Spark distribution along with Python pip and R packages. For more information on usage, run `./dev/make-distribution.sh --help`
 
-## Specifying the Hadoop Version
+## Specifying the Hadoop Version and Enabling YARN
 
-Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the `hadoop.version` property. If unset, Spark will build against Hadoop 2.2.0 by default. Note that certain build profiles are required for particular Hadoop versions:
+You can specify the exact version of Hadoop to compile against through the `hadoop.version` property. 
+If unset, Spark will build against Hadoop 2.6.X by default.
 
-<table class="table">
-  <thead>
-    <tr><th>Hadoop version</th><th>Profile required</th></tr>
-  </thead>
-  <tbody>
-    <tr><td>2.2.x</td><td>hadoop-2.2</td></tr>
-    <tr><td>2.3.x</td><td>hadoop-2.3</td></tr>
-    <tr><td>2.4.x</td><td>hadoop-2.4</td></tr>
-    <tr><td>2.6.x</td><td>hadoop-2.6</td></tr>
-    <tr><td>2.7.x and later 2.x</td><td>hadoop-2.7</td></tr>
-  </tbody>
-</table>
-
-Note that support for versions of Hadoop before 2.6 are deprecated as of Spark 2.1.0 and may be
-removed in Spark 2.2.0.
-
-
-You can enable the `yarn` profile and optionally set the `yarn.version` property if it is different from `hadoop.version`. Spark only supports YARN versions 2.2.0 and later.
+You can enable the `yarn` profile and optionally set the `yarn.version` property if it is different 
+from `hadoop.version`.
 
 Examples:
 
-    # Apache Hadoop 2.2.X
-    ./build/mvn -Pyarn -Phadoop-2.2 -DskipTests clean package
-
-    # Apache Hadoop 2.3.X
-    ./build/mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package
-
-    # Apache Hadoop 2.4.X or 2.5.X
-    ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
-
     # Apache Hadoop 2.6.X
-    ./build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package
+    ./build/mvn -Pyarn -DskipTests clean package
 
     # Apache Hadoop 2.7.X and later
-    ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package
-
-    # Different versions of HDFS and YARN.
-    ./build/mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=2.2.0 -DskipTests clean package
+    ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package
 
 ## Building With Hive and JDBC Support
 
@@ -112,8 +85,8 @@ To enable Hive integration for Spark SQL along with its JDBC server and CLI,
 add the `-Phive` and `-Phive-thriftserver` profiles to your existing build options.
 By default Spark will build with Hive 1.2.1 bindings.
 
-    # Apache Hadoop 2.4.X with Hive 1.2.1 support
-    ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
+    # With Hive 1.2.1 support
+    ./build/mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean package
 
 ## Packaging without Hadoop Dependencies for YARN
 
@@ -132,7 +105,7 @@ like ZooKeeper and Hadoop itself.
 To produce a Spark package compiled with Scala 2.10, use the `-Dscala-2.10` property:
 
     ./dev/change-scala-version.sh 2.10
-    ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
+    ./build/mvn -Pyarn -Dscala-2.10 -DskipTests clean package
 
 Note that support for Scala 2.10 is deprecated as of Spark 2.1.0 and may be removed in Spark 2.2.0.
 
@@ -192,7 +165,7 @@ compilation. More advanced developers may wish to use SBT.
 The SBT build is derived from the Maven POM files, and so the same Maven profiles and variables
 can be set to control the SBT build. For example:
 
-    ./build/sbt -Pyarn -Phadoop-2.3 package
+    ./build/sbt package
 
 To avoid the overhead of launching sbt each time you need to re-compile, you can launch sbt
 in interactive mode by running `build/sbt`, and then run all build commands at the command
@@ -225,7 +198,7 @@ Note that tests should not be run as root or an admin user.
 
 The following is an example of a command to run the tests:
 
-    ./build/mvn -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test
+    ./build/mvn test
 
 The ScalaTest plugin also supports running only a specific Scala test suite as follows:
 
@@ -240,16 +213,16 @@ or a Java test:
 
 The following is an example of a command to run the tests:
 
-    ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test
+    ./build/sbt test
 
 To run only a specific test suite:
 
-    ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver "test-only org.apache.spark.repl.ReplSuite"
-    ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver "test-only org.apache.spark.repl.*"
+    ./build/sbt "test-only org.apache.spark.repl.ReplSuite"
+    ./build/sbt "test-only org.apache.spark.repl.*"
 
 To run the test suites of a specific sub-project:
 
-    ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver core/test
+    ./build/sbt core/test
 
 ## Running Java 8 Test Suites
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 051f64e..c95f627 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -337,7 +337,7 @@ To use a custom metrics.properties for the application master and executors, upd
   <td>
   Defines the validity interval for AM failure tracking.
   If the AM has been running for at least the defined interval, the AM failure count will be reset.
-  This feature is not enabled if not configured, and only supported in Hadoop 2.6+.
+  This feature is not enabled if not configured.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3679b4e..ac61a57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,12 +122,12 @@
     <sbt.project.name>spark</sbt.project.name>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
-    <hadoop.version>2.2.0</hadoop.version>
+    <hadoop.version>2.6.5</hadoop.version>
     <protobuf.version>2.5.0</protobuf.version>
     <yarn.version>${hadoop.version}</yarn.version>
     <flume.version>1.6.0</flume.version>
-    <zookeeper.version>3.4.5</zookeeper.version>
-    <curator.version>2.4.0</curator.version>
+    <zookeeper.version>3.4.6</zookeeper.version>
+    <curator.version>2.6.0</curator.version>
     <hive.group>org.spark-project.hive</hive.group>
     <!-- Version used in Maven Hive dependency -->
     <hive.version>1.2.1.spark2</hive.version>
@@ -144,7 +144,7 @@
     <codahale.metrics.version>3.1.2</codahale.metrics.version>
     <avro.version>1.7.7</avro.version>
     <avro.mapred.classifier>hadoop2</avro.mapred.classifier>
-    <jets3t.version>0.7.1</jets3t.version>
+    <jets3t.version>0.9.3</jets3t.version>
     <aws.kinesis.client.version>1.6.2</aws.kinesis.client.version>
     <!-- the producer is used in tests -->
     <aws.kinesis.producer.version>0.10.2</aws.kinesis.producer.version>
@@ -2548,43 +2548,14 @@
     -->
 
     <profile>
-      <id>hadoop-2.2</id>
-    <!-- SPARK-7249: Default hadoop profile. Uses global properties. -->
-    </profile>
-
-    <profile>
-      <id>hadoop-2.3</id>
-      <properties>
-        <hadoop.version>2.3.0</hadoop.version>
-        <jets3t.version>0.9.3</jets3t.version>
-      </properties>
-    </profile>
-
-    <profile>
-      <id>hadoop-2.4</id>
-      <properties>
-        <hadoop.version>2.4.1</hadoop.version>
-        <jets3t.version>0.9.3</jets3t.version>
-      </properties>
-    </profile>
-
-    <profile>
       <id>hadoop-2.6</id>
-      <properties>
-        <hadoop.version>2.6.5</hadoop.version>
-        <jets3t.version>0.9.3</jets3t.version>
-        <zookeeper.version>3.4.6</zookeeper.version>
-        <curator.version>2.6.0</curator.version>
-      </properties>
+      <!-- Default hadoop profile. Uses global properties. -->
     </profile>
 
     <profile>
       <id>hadoop-2.7</id>
       <properties>
         <hadoop.version>2.7.3</hadoop.version>
-        <jets3t.version>0.9.3</jets3t.version>
-        <zookeeper.version>3.4.6</zookeeper.version>
-        <curator.version>2.6.0</curator.version>
       </properties>
     </profile>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 74edd53..bcc00fa 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -86,43 +86,11 @@ object SparkBuild extends PomBuild {
 
   val projectsMap: Map[String, Seq[Setting[_]]] = Map.empty
 
-  // Provides compatibility for older versions of the Spark build
-  def backwardCompatibility = {
-    import scala.collection.mutable
-    var profiles: mutable.Seq[String] = mutable.Seq("sbt")
-    // scalastyle:off println
-    if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
-      println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.")
-      profiles ++= Seq("spark-ganglia-lgpl")
-    }
-    if (Properties.envOrNone("SPARK_HIVE").isDefined) {
-      println("NOTE: SPARK_HIVE is deprecated, please use -Phive and -Phive-thriftserver flags.")
-      profiles ++= Seq("hive", "hive-thriftserver")
-    }
-    Properties.envOrNone("SPARK_HADOOP_VERSION") match {
-      case Some(v) =>
-        println("NOTE: SPARK_HADOOP_VERSION is deprecated, please use -Dhadoop.version=" + v)
-        System.setProperty("hadoop.version", v)
-      case None =>
-    }
-    if (Properties.envOrNone("SPARK_YARN").isDefined) {
-      println("NOTE: SPARK_YARN is deprecated, please use -Pyarn flag.")
-      profiles ++= Seq("yarn")
-    }
-    // scalastyle:on println
-    profiles
-  }
-
   override val profiles = {
     val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
-    case None => backwardCompatibility
-    case Some(v) =>
-      if (backwardCompatibility.nonEmpty)
-        // scalastyle:off println
-        println("Note: We ignore environment variables, when use of profile is detected in " +
-          "conjunction with environment variable.")
-        // scalastyle:on println
-      v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
+      case None => Seq("sbt")
+      case Some(v) =>
+        v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
     }
 
     if (System.getProperty("scala-2.10") == "") {

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
index f090d24..92a33de 100644
--- a/resource-managers/yarn/pom.xml
+++ b/resource-managers/yarn/pom.xml
@@ -125,34 +125,12 @@
       <scope>test</scope>
     </dependency>
 
-     <!--
-    See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed
-    dependencies, so they need to be added manually for the tests to work.
-    -->
-
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-tests</artifactId>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mortbay.jetty</groupId>
-      <artifactId>jetty</artifactId>
-      <version>6.1.26</version>
-      <exclusions>
-       <exclusion>
-        <groupId>org.mortbay.jetty</groupId>
-         <artifactId>servlet-api</artifactId>
-       </exclusion>
-      </exclusions>
-      <scope>test</scope>
-     </dependency>
 
      <!--
        Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f79c66b..9df43ae 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark._
@@ -460,17 +461,15 @@ private[spark] class ApplicationMaster(
             }
             failureCount = 0
           } catch {
-            case i: InterruptedException =>
+            case i: InterruptedException => // do nothing
+            case e: ApplicationAttemptNotFoundException =>
+              failureCount += 1
+              logError("Exception from Reporter thread.", e)
+              finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+                e.getMessage)
             case e: Throwable =>
               failureCount += 1
-              // this exception was introduced in hadoop 2.4 and this code would not compile
-              // with earlier versions if we refer it directly.
-              if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" ==
-                e.getClass().getName()) {
-                logError("Exception from Reporter thread.", e)
-                finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
-                  e.getMessage)
-              } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+              if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
                 finish(FinalApplicationStatus.FAILED,
                   ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
                     s"$failureCount time(s) from Reporter thread.")
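
Since Hadoop 2.6 is now the minimum supported version, the Reporter thread above can catch org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException directly instead of comparing class names obtained reflectively. A minimal stand-alone sketch of that pattern follows; ReporterSketch, reportOnce and failFast are illustrative names, not part of Spark's ApplicationMaster:

    import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException

    object ReporterSketch {
      // One reporting step: a missing application attempt is fatal, interruption is ignored.
      def reportOnce(doReport: () => Unit, failFast: String => Unit): Unit = {
        try {
          doReport()
        } catch {
          case _: InterruptedException => // shutting down, do nothing
          case e: ApplicationAttemptNotFoundException =>
            // The type can be referenced directly now that Hadoop 2.6+ is guaranteed.
            failFast(e.getMessage)
        }
      }
    }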

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b212b0e..635c1ac 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -26,7 +26,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
-import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import com.google.common.base.Objects
@@ -47,7 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.Records
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
@@ -216,18 +215,7 @@ private[spark] class Client(
     appContext.setApplicationType("SPARK")
 
     sparkConf.get(APPLICATION_TAGS).foreach { tags =>
-      try {
-        // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
-        // reflection to set it, printing a warning if a tag was specified but the YARN version
-        // doesn't support it.
-        val method = appContext.getClass().getMethod(
-          "setApplicationTags", classOf[java.util.Set[String]])
-        method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
-      } catch {
-        case e: NoSuchMethodException =>
-          logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " +
-            "YARN does not support it")
-      }
+      appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
     }
     sparkConf.get(MAX_APP_ATTEMPTS) match {
       case Some(v) => appContext.setMaxAppAttempts(v)
@@ -236,15 +224,7 @@ private[spark] class Client(
     }
 
     sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
-      try {
-        val method = appContext.getClass().getMethod(
-          "setAttemptFailuresValidityInterval", classOf[Long])
-        method.invoke(appContext, interval: java.lang.Long)
-      } catch {
-        case e: NoSuchMethodException =>
-          logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
-            "the version of YARN does not support it")
-      }
+      appContext.setAttemptFailuresValidityInterval(interval)
     }
 
     val capability = Records.newRecord(classOf[Resource])
@@ -253,53 +233,24 @@ private[spark] class Client(
 
     sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
       case Some(expr) =>
-        try {
-          val amRequest = Records.newRecord(classOf[ResourceRequest])
-          amRequest.setResourceName(ResourceRequest.ANY)
-          amRequest.setPriority(Priority.newInstance(0))
-          amRequest.setCapability(capability)
-          amRequest.setNumContainers(1)
-          val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
-          method.invoke(amRequest, expr)
-
-          val setResourceRequestMethod =
-            appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
-          setResourceRequestMethod.invoke(appContext, amRequest)
-        } catch {
-          case e: NoSuchMethodException =>
-            logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " +
-              "of YARN does not support it")
-            appContext.setResource(capability)
-        }
+        val amRequest = Records.newRecord(classOf[ResourceRequest])
+        amRequest.setResourceName(ResourceRequest.ANY)
+        amRequest.setPriority(Priority.newInstance(0))
+        amRequest.setCapability(capability)
+        amRequest.setNumContainers(1)
+        amRequest.setNodeLabelExpression(expr)
+        appContext.setAMContainerResourceRequest(amRequest)
       case None =>
         appContext.setResource(capability)
     }
 
     sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
-      try {
-        val logAggregationContext = Records.newRecord(
-          Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
-          .asInstanceOf[Object]
-
-        val setRolledLogsIncludePatternMethod =
-          logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
-        setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
-
-        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
-          val setRolledLogsExcludePatternMethod =
-            logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
-          setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
-        }
-
-        val setLogAggregationContextMethod =
-          appContext.getClass.getMethod("setLogAggregationContext",
-            Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
-        setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
-      } catch {
-        case NonFatal(e) =>
-          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
-            s"does not support it", e)
+      val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
+      logAggregationContext.setRolledLogsIncludePattern(includePattern)
+      sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
+        logAggregationContext.setRolledLogsExcludePattern(excludePattern)
       }
+      appContext.setLogAggregationContext(logAggregationContext)
     }
 
     appContext
@@ -786,14 +737,12 @@ private[spark] class Client(
     val pythonPath = new ListBuffer[String]()
     val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
     if (pyFiles.nonEmpty) {
-      pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-        LOCALIZED_PYTHON_DIR)
+      pythonPath += buildPath(Environment.PWD.$$(), LOCALIZED_PYTHON_DIR)
     }
     (pySparkArchives ++ pyArchives).foreach { path =>
       val uri = Utils.resolveURI(path)
       if (uri.getScheme != LOCAL_SCHEME) {
-        pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-          new Path(uri).getName())
+        pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName())
       } else {
         pythonPath += uri.getPath()
       }
@@ -802,7 +751,7 @@ private[spark] class Client(
     // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
     if (pythonPath.nonEmpty) {
       val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
-        .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+        .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)
       env("PYTHONPATH") = pythonPathStr
       sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
     }
@@ -882,10 +831,7 @@ private[spark] class Client(
     // Add Xmx for AM memory
     javaOpts += "-Xmx" + amMemory + "m"
 
-    val tmpDir = new Path(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
-    )
+    val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
     javaOpts += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
@@ -982,15 +928,12 @@ private[spark] class Client(
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
     }
     val amArgs =
-      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
-        userArgs ++ Seq(
-          "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-            LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
+      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
+      Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
 
     // Command for the ApplicationMaster
-    val commands = prefixEnv ++ Seq(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
-      ) ++
+    val commands = prefixEnv ++
+      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
       javaOpts ++ amArgs ++
       Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -1265,59 +1208,28 @@ private object Client extends Logging {
   private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String])
     : Unit = {
     val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
-    for (c <- classPathElementsToAdd.flatten) {
+    classPathElementsToAdd.foreach { c =>
       YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
     }
   }
 
-  private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
+  private def getYarnAppClasspath(conf: Configuration): Seq[String] =
     Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
-      case Some(s) => Some(s.toSeq)
+      case Some(s) => s.toSeq
       case None => getDefaultYarnApplicationClasspath
     }
 
-  private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
+  private def getMRAppClasspath(conf: Configuration): Seq[String] =
     Option(conf.getStrings("mapreduce.application.classpath")) match {
-      case Some(s) => Some(s.toSeq)
+      case Some(s) => s.toSeq
       case None => getDefaultMRApplicationClasspath
     }
 
-  private[yarn] def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
-    val triedDefault = Try[Seq[String]] {
-      val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
-      val value = field.get(null).asInstanceOf[Array[String]]
-      value.toSeq
-    } recoverWith {
-      case e: NoSuchFieldException => Success(Seq.empty[String])
-    }
-
-    triedDefault match {
-      case f: Failure[_] =>
-        logError("Unable to obtain the default YARN Application classpath.", f.exception)
-      case s: Success[Seq[String]] =>
-        logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
-    }
-
-    triedDefault.toOption
-  }
-
-  private[yarn] def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
-    val triedDefault = Try[Seq[String]] {
-      val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
-      StringUtils.getStrings(field.get(null).asInstanceOf[String]).toSeq
-    } recoverWith {
-      case e: NoSuchFieldException => Success(Seq.empty[String])
-    }
+  private[yarn] def getDefaultYarnApplicationClasspath: Seq[String] =
+    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq
 
-    triedDefault match {
-      case f: Failure[_] =>
-        logError("Unable to obtain the default MR Application classpath.", f.exception)
-      case s: Success[Seq[String]] =>
-        logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
-    }
-
-    triedDefault.toOption
-  }
+  private[yarn] def getDefaultMRApplicationClasspath: Seq[String] =
+    StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq
 
   /**
    * Populate the classpath entry in the given environment map.
@@ -1339,11 +1251,9 @@ private object Client extends Logging {
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
     }
 
-    addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
+    addClasspathEntry(Environment.PWD.$$(), env)
 
-    addClasspathEntry(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
-        LOCALIZED_CONF_DIR, env)
+    addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env)
 
     if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
       // in order to properly add the app jar when user classpath is first
@@ -1369,9 +1279,8 @@ private object Client extends Logging {
     }
 
     // Add the Spark jars to the classpath, depending on how they were distributed.
-    addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-      LOCALIZED_LIB_DIR, "*"), env)
-    if (!sparkConf.get(SPARK_ARCHIVE).isDefined) {
+    addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env)
+    if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
       sparkConf.get(SPARK_JARS).foreach { jars =>
         jars.filter(isLocalUri).foreach { jar =>
           addClasspathEntry(getClusterPath(sparkConf, jar), env)
@@ -1430,13 +1339,11 @@ private object Client extends Logging {
     if (uri != null && uri.getScheme == LOCAL_SCHEME) {
       addClasspathEntry(getClusterPath(conf, uri.getPath), env)
     } else if (fileName != null) {
-      addClasspathEntry(buildPath(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+      addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env)
     } else if (uri != null) {
       val localPath = getQualifiedLocalPath(uri, hadoopConf)
       val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
-      addClasspathEntry(buildPath(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
+      addClasspathEntry(buildPath(Environment.PWD.$$(), linkName), env)
     }
   }
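
Several of the Client.scala hunks above drop getMethod/invoke plumbing because setApplicationTags, setAttemptFailuresValidityInterval, setNodeLabelExpression and the LogAggregationContext record exist in every supported YARN version. A hedged sketch of the resulting direct style; SubmissionContextSketch and all of the tag, interval and pattern values are invented for illustration:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.api.records.{ApplicationSubmissionContext, LogAggregationContext}
    import org.apache.hadoop.yarn.util.Records

    object SubmissionContextSketch {
      // Populate a submission context with plain API calls instead of getMethod/invoke.
      def build(): ApplicationSubmissionContext = {
        val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
        appContext.setApplicationTags(new java.util.HashSet[String](Seq("spark", "example").asJava))
        appContext.setAttemptFailuresValidityInterval(3600000L)   // 1 hour, arbitrary value
        val logCtx = Records.newRecord(classOf[LogAggregationContext])
        logCtx.setRolledLogsIncludePattern("stdout|stderr")       // arbitrary pattern
        appContext.setLogAggregationContext(logCtx)
        appContext
      }
    }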
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 868c2ed..b55b4b1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -152,10 +152,7 @@ private[yarn] class ExecutorRunnable(
     }
 
     javaOpts += "-Djava.io.tmpdir=" +
-      new Path(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
-      )
+      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -206,9 +203,8 @@ private[yarn] class ExecutorRunnable(
     }.toSeq
 
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
-    val commands = prefixEnv ++ Seq(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
-      "-server") ++
+    val commands = prefixEnv ++
+      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
       javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
         "--driver-url", masterAddress,

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e498932..8a76dbd 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -150,20 +150,6 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
-  // ContainerRequest constructor that can take a node label expression. We grab it through
-  // reflection because it's only available in later versions of YARN.
-  private val nodeLabelConstructor = labelExpression.flatMap { expr =>
-    try {
-      Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
-        classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
-        classOf[String]))
-    } catch {
-      case e: NoSuchMethodException =>
-        logWarning(s"Node label expression $expr will be ignored because YARN version on" +
-          " classpath does not support it.")
-        None
-    }
-  }
 
   // A map to store preferred hostname and possible task numbers running on it.
   private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
@@ -414,10 +400,7 @@ private[yarn] class YarnAllocator(
       resource: Resource,
       nodes: Array[String],
       racks: Array[String]): ContainerRequest = {
-    nodeLabelConstructor.map { constructor =>
-      constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
-        labelExpression.orNull)
-    }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
+    new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull)
   }
 
   /**

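With node labels available in all supported YARN releases, the six-argument ContainerRequest constructor can be called directly rather than looked up reflectively, as the hunk above does. A small sketch under that assumption; ContainerRequestSketch and the resource sizes are placeholders:

    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

    object ContainerRequestSketch {
      // Request one container, optionally constrained by a node label expression.
      def request(labelExpression: Option[String]): ContainerRequest = {
        val resource = Resource.newInstance(2048, 1)   // 2 GiB, 1 vcore (placeholder sizes)
        val priority = Priority.newInstance(1)
        val nodes: Array[String] = null                // any host
        val racks: Array[String] = null                // any rack
        new ContainerRequest(resource, nodes, racks, priority, true, labelExpression.orNull)
      }
    }
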
http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 53df11e..163dfb5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -17,12 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.util.{List => JList}
-
 import scala.collection.JavaConverters._
-import scala.util.Try
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -99,24 +95,11 @@ private[spark] class YarnRMClient extends Logging {
   def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
     // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
     // so not all stable releases have it.
-    val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
-      .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
-
-    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
-    try {
-      val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
-        classOf[Configuration])
-      val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
-      val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) }
-      val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
-      Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
-    } catch {
-      case e: NoSuchMethodException =>
-        val proxy = WebAppUtils.getProxyHostAndPort(conf)
-        val parts = proxy.split(":")
-        val uriBase = prefix + proxy + proxyBase
-        Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
-    }
+    val prefix = WebAppUtils.getHttpSchemePrefix(conf)
+    val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
+    val hosts = proxies.asScala.map(_.split(":").head)
+    val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
+    Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
   }
 
   /** Returns the maximum number of attempts to register the AM. */
@@ -124,12 +107,10 @@ private[spark] class YarnRMClient extends Logging {
     val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
     val yarnMaxAttempts = yarnConf.getInt(
       YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
-    val retval: Int = sparkMaxAttempts match {
+    sparkMaxAttempts match {
       case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
       case None => yarnMaxAttempts
     }
-
-    retval
   }
 
 }
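
Likewise, WebAppUtils.getHttpSchemePrefix and WebAppUtils.getProxyHostsAndPortsForAmFilter are present in the supported Hadoop versions, so the HA-aware proxy lookup no longer needs a try/catch fallback. A hedged sketch of the direct form; AmFilterSketch is an illustrative name, and the map keys mirror the ones used in the hunk above:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.webapp.util.WebAppUtils

    object AmFilterSketch {
      // Build the AM IP filter parameters from the (possibly HA) RM proxy addresses.
      def params(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
        val prefix = WebAppUtils.getHttpSchemePrefix(conf)          // "http://" or "https://"
        val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf).asScala
        Map(
          "PROXY_HOSTS" -> proxies.map(_.split(":").head).mkString(","),
          "PROXY_URI_BASES" -> proxies.map(p => prefix + p + proxyBase).mkString(","))
      }
    }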

http://git-wip-us.apache.org/repos/asf/spark/blob/e8d3fca4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index cc53b1b..9357885 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
 import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Text
@@ -31,7 +29,6 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
@@ -137,7 +134,12 @@ object YarnSparkHadoopUtil {
    * If the map already contains this key, append the value to the existing value instead.
    */
   def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
-    val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator  + value } else value
+    val newValue =
+      if (env.contains(key)) {
+        env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR  + value
+      } else {
+        value
+      }
     env.put(key, newValue)
   }
 
@@ -156,8 +158,8 @@ object YarnSparkHadoopUtil {
         while (m.find()) {
           val variable = m.group(1)
           var replace = ""
-          if (env.get(variable) != None) {
-            replace = env.get(variable).get
+          if (env.contains(variable)) {
+            replace = env(variable)
           } else {
             // if this key is not configured for the child .. get it from the env
             replace = System.getenv(variable)
@@ -235,13 +237,11 @@ object YarnSparkHadoopUtil {
         YarnCommandBuilderUtils.quoteForBatchScript(arg)
       } else {
         val escaped = new StringBuilder("'")
-        for (i <- 0 to arg.length() - 1) {
-          arg.charAt(i) match {
-            case '$' => escaped.append("\\$")
-            case '"' => escaped.append("\\\"")
-            case '\'' => escaped.append("'\\''")
-            case c => escaped.append(c)
-          }
+        arg.foreach {
+          case '$' => escaped.append("\\$")
+          case '"' => escaped.append("\\\"")
+          case '\'' => escaped.append("'\\''")
+          case c => escaped.append(c)
         }
         escaped.append("'").toString()
       }
@@ -263,33 +263,6 @@ object YarnSparkHadoopUtil {
   }
 
   /**
-   * Expand environment variable using Yarn API.
-   * If environment.$$() is implemented, return the result of it.
-   * Otherwise, return the result of environment.$()
-   * Note: $$() is added in Hadoop 2.4.
-   */
-  private lazy val expandMethod =
-    Try(classOf[Environment].getMethod("$$"))
-      .getOrElse(classOf[Environment].getMethod("$"))
-
-  def expandEnvironment(environment: Environment): String =
-    expandMethod.invoke(environment).asInstanceOf[String]
-
-  /**
-   * Get class path separator using Yarn API.
-   * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it.
-   * Otherwise, return File.pathSeparator
-   * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4.
-   */
-  private lazy val classPathSeparatorField =
-    Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
-      .getOrElse(classOf[File].getField("pathSeparator"))
-
-  def getClassPathSeparator(): String = {
-    classPathSeparatorField.get(null).asInstanceOf[String]
-  }
-
-  /**
    * Getting the initial target number of executors depends on whether dynamic allocation is
    * enabled.
    * If not using dynamic allocation it gets the number of executors requested by the user.

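Finally, the removed expandEnvironment and getClassPathSeparator helpers are replaced by the Hadoop 2.4+ APIs themselves: Environment.PWD.$$() yields the cross-platform {{PWD}} token that the NodeManager expands at launch, and ApplicationConstants.CLASS_PATH_SEPARATOR joins classpath entries. A brief sketch; EnvExpansionSketch and the directory names are hard-coded here purely for illustration:

    import org.apache.hadoop.yarn.api.ApplicationConstants
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

    object EnvExpansionSketch {
      // Join a few container-relative entries with the platform-neutral separator.
      def exampleClasspath(): String = {
        val pwd = Environment.PWD.$$()   // "{{PWD}}", expanded by the NodeManager at launch
        Seq(pwd, pwd + "/__spark_conf__", pwd + "/__spark_libs__/*")
          .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)
      }
    }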
