You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/01/02 14:14:57 UTC

[2/2] spark git commit: [SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated APIs and reflection that supported 1.x

[SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated APIs and reflection that supported 1.x

Remove use of deprecated Hadoop APIs, and the reflection that supported Hadoop 1.x, now that Hadoop 2.2+ is required.

Author: Sean Owen <so...@cloudera.com>

Closes #10446 from srowen/SPARK-12481.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15bd7362
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15bd7362
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15bd7362

Branch: refs/heads/master
Commit: 15bd73627e04591fd13667b4838c9098342db965
Parents: 94f7a12
Author: Sean Owen <so...@cloudera.com>
Authored: Sat Jan 2 13:15:53 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Jan 2 13:15:53 2016 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 16 ++---
 .../org/apache/spark/SparkHadoopWriter.scala    | 20 +++---
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 41 ++----------
 .../deploy/history/FsHistoryProvider.scala      | 16 ++---
 .../input/FixedLengthBinaryInputFormat.scala    |  3 +-
 .../input/FixedLengthBinaryRecordReader.scala   |  7 +-
 .../apache/spark/input/PortableDataStream.scala |  7 +-
 .../spark/input/WholeTextFileInputFormat.scala  |  2 +-
 .../spark/input/WholeTextFileRecordReader.scala |  5 +-
 .../spark/mapred/SparkHadoopMapRedUtil.scala    | 51 +-------------
 .../mapreduce/SparkHadoopMapReduceUtil.scala    | 68 -------------------
 .../org/apache/spark/rdd/BinaryFileRDD.scala    |  4 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  3 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 12 ++--
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 26 +++-----
 .../spark/rdd/ReliableCheckpointRDD.scala       |  3 +-
 .../org/apache/spark/rdd/WholeTextFileRDD.scala |  3 +-
 .../spark/scheduler/EventLoggingListener.scala  | 12 +---
 .../spark/scheduler/InputFormatInfo.scala       |  2 +-
 .../apache/spark/util/ShutdownHookManager.scala | 19 +-----
 .../java/org/apache/spark/JavaAPISuite.java     |  2 +-
 .../test/scala/org/apache/spark/FileSuite.scala |  9 ++-
 .../scheduler/EventLoggingListenerSuite.scala   |  4 +-
 .../spark/scheduler/ReplayListenerSuite.scala   |  3 +-
 .../spark/examples/CassandraCQLTest.scala       |  4 +-
 .../apache/spark/examples/CassandraTest.scala   |  4 +-
 project/MimaExcludes.scala                      |  5 ++
 scalastyle-config.xml                           |  8 ---
 .../InsertIntoHadoopFsRelation.scala            |  2 +-
 .../execution/datasources/SqlNewHadoopRDD.scala | 18 +++--
 .../execution/datasources/WriterContainer.scala | 27 +++-----
 .../datasources/json/JSONRelation.scala         | 12 ++--
 .../parquet/CatalystReadSupport.scala           |  2 -
 .../parquet/DirectParquetOutputCommitter.scala  |  6 +-
 .../datasources/parquet/ParquetRelation.scala   | 20 +++---
 .../datasources/text/DefaultSource.scala        | 13 ++--
 .../apache/spark/sql/sources/interfaces.scala   |  8 +--
 .../org/apache/spark/sql/hive/HiveContext.scala |  2 +-
 .../spark/sql/hive/client/ClientWrapper.scala   | 70 --------------------
 .../spark/sql/hive/hiveWriterContainers.scala   | 13 ++--
 .../spark/sql/hive/orc/OrcFileOperator.scala    |  2 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 16 ++---
 .../spark/sql/sources/SimpleTextRelation.scala  |  5 +-
 .../streaming/util/FileBasedWriteAheadLog.scala |  3 +-
 .../util/FileBasedWriteAheadLogWriter.scala     | 10 +--
 .../streaming/util/WriteAheadLogSuite.scala     |  3 +-
 46 files changed, 150 insertions(+), 441 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bbdc915..77e44ee 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -874,11 +874,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       path: String,
       minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
     assertNotStopped()
-    val job = new NewHadoopJob(hadoopConfiguration)
+    val job = NewHadoopJob.getInstance(hadoopConfiguration)
     // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val updateConf = job.getConfiguration
     new WholeTextFileRDD(
       this,
       classOf[WholeTextFileInputFormat],
@@ -923,11 +923,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       path: String,
       minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
     assertNotStopped()
-    val job = new NewHadoopJob(hadoopConfiguration)
+    val job = NewHadoopJob.getInstance(hadoopConfiguration)
     // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val updateConf = job.getConfiguration
     new BinaryFileRDD(
       this,
       classOf[StreamInputFormat],
@@ -1100,13 +1100,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
     assertNotStopped()
-    // The call to new NewHadoopJob automatically adds security credentials to conf,
+    // The call to NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
-    val job = new NewHadoopJob(conf)
+    val job = NewHadoopJob.getInstance(conf)
     // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val updatedConf = job.getConfiguration
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
 
@@ -1369,7 +1369,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       if (!fs.exists(hadoopPath)) {
         throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
       }
-      val isDir = fs.getFileStatus(hadoopPath).isDir
+      val isDir = fs.getFileStatus(hadoopPath).isDirectory
       if (!isLocal && scheme == "file" && isDir) {
         throw new SparkException(s"addFile does not support local directories when not running " +
           "local mode.")

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index ac6eaab..dd400b8 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -25,6 +25,7 @@ import java.util.Date
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskType
 
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
@@ -37,10 +38,7 @@ import org.apache.spark.util.SerializableJobConf
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
 private[spark]
-class SparkHadoopWriter(jobConf: JobConf)
-  extends Logging
-  with SparkHadoopMapRedUtil
-  with Serializable {
+class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
 
   private val now = new Date()
   private val conf = new SerializableJobConf(jobConf)
@@ -131,7 +129,7 @@ class SparkHadoopWriter(jobConf: JobConf)
 
   private def getJobContext(): JobContext = {
     if (jobContext == null) {
-      jobContext = newJobContext(conf.value, jID.value)
+      jobContext = new JobContextImpl(conf.value, jID.value)
     }
     jobContext
   }
@@ -143,6 +141,12 @@ class SparkHadoopWriter(jobConf: JobConf)
     taskContext
   }
 
+  protected def newTaskAttemptContext(
+      conf: JobConf,
+      attemptId: TaskAttemptID): TaskAttemptContext = {
+    new TaskAttemptContextImpl(conf, attemptId)
+  }
+
   private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
     jobID = jobid
     splitID = splitid
@@ -150,7 +154,7 @@ class SparkHadoopWriter(jobConf: JobConf)
 
     jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
     taID = new SerializableWritable[TaskAttemptID](
-        new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+        new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
   }
 }
 
@@ -168,9 +172,9 @@ object SparkHadoopWriter {
     }
     val outputPath = new Path(path)
     val fs = outputPath.getFileSystem(conf)
-    if (outputPath == null || fs == null) {
+    if (fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")
     }
-    outputPath.makeQualified(fs)
+    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 59e9056..4bd94f1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -33,9 +33,6 @@ import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.JobContext
-import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
-import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.annotation.DeveloperApi
@@ -76,9 +73,6 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-  @deprecated("use newConfiguration with SparkConf argument", "1.2.0")
-  def newConfiguration(): Configuration = newConfiguration(null)
-
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
@@ -191,33 +185,6 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
-   * Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly
-   * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes
-   * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+
-   * while it's interface in Hadoop 2.+.
-   */
-  def getConfigurationFromJobContext(context: JobContext): Configuration = {
-    // scalastyle:off jobconfig
-    val method = context.getClass.getMethod("getConfiguration")
-    // scalastyle:on jobconfig
-    method.invoke(context).asInstanceOf[Configuration]
-  }
-
-  /**
-   * Using reflection to call `getTaskAttemptID` from TaskAttemptContext. If we directly
-   * call `TaskAttemptContext.getTaskAttemptID`, it will generate different byte codes
-   * for Hadoop 1.+ and Hadoop 2.+ because TaskAttemptContext is class in Hadoop 1.+
-   * while it's interface in Hadoop 2.+.
-   */
-  def getTaskAttemptIDFromTaskAttemptContext(
-      context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
-    // scalastyle:off jobconfig
-    val method = context.getClass.getMethod("getTaskAttemptID")
-    // scalastyle:on jobconfig
-    method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
-  }
-
-  /**
    * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
    * given path points to a file, return a single-element collection containing [[FileStatus]] of
    * that file.
@@ -233,11 +200,11 @@ class SparkHadoopUtil extends Logging {
    */
   def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
     def recurse(status: FileStatus): Seq[FileStatus] = {
-      val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
+      val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory)
       leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
     }
 
-    if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
+    if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus)
   }
 
   def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
@@ -246,12 +213,12 @@ class SparkHadoopUtil extends Logging {
 
   def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
     def recurse(status: FileStatus): Seq[FileStatus] = {
-      val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
+      val (directories, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
       val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
       leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
     }
 
-    assert(baseStatus.isDir)
+    assert(baseStatus.isDirectory)
     recurse(baseStatus)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 6e91d73..c93bc8c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -167,7 +168,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
       throw new IllegalArgumentException(msg)
     }
-    if (!fs.getFileStatus(path).isDir) {
+    if (!fs.getFileStatus(path).isDirectory) {
       throw new IllegalArgumentException(
         "Logging directory specified is not a directory: %s".format(logDir))
     }
@@ -304,7 +305,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         logError("Exception encountered when attempting to update last scan time", e)
         lastScanTime
     } finally {
-      if (!fs.delete(path)) {
+      if (!fs.delete(path, true)) {
         logWarning(s"Error deleting ${path}")
       }
     }
@@ -603,7 +604,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
    * See SPARK-2261 for more detail.
    */
-  private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
+  private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory
 
   /**
    * Returns the modification time of the given event log. If the status points at an empty
@@ -648,8 +649,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /**
-   * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
-   * so we have to resort to ugly reflection (as usual...).
+   * Checks whether HDFS is in safe mode.
    *
    * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
    * makes it more public than not.
@@ -663,11 +663,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   // For testing.
   private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
-    val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
-    val actionClass: Class[_] = getClass().getClassLoader().loadClass(hadoop2Class)
-    val action = actionClass.getField("SAFEMODE_GET").get(null)
-    val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
-    method.invoke(dfs, action).asInstanceOf[Boolean]
+    dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
index 532850d..30431a9 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
 
 import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * Custom Input Format for reading and splitting flat binary files that contain records,
@@ -36,7 +35,7 @@ private[spark] object FixedLengthBinaryInputFormat {
 
   /** Retrieves the record length property from a Hadoop configuration */
   def getRecordLength(context: JobContext): Int = {
-    SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
+    context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
index 67a9692..25596a1 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.io.{BytesWritable, LongWritable}
 import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
-import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -83,16 +82,16 @@ private[spark] class FixedLengthBinaryRecordReader
     // the actual file we will be reading from
     val file = fileSplit.getPath
     // job configuration
-    val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+    val conf = context.getConfiguration
     // check compression
-    val codec = new CompressionCodecFactory(job).getCodec(file)
+    val codec = new CompressionCodecFactory(conf).getCodec(file)
     if (codec != null) {
       throw new IOException("FixedLengthRecordReader does not support reading compressed files")
     }
     // get the record length
     recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
     // get the filesystem
-    val fs = file.getFileSystem(job)
+    val fs = file.getFileSystem(conf)
     // open the File
     fileInputStream = fs.open(file)
     // seek to the splitStart position

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 280e7a5..cb76e3c 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
 
-import org.apache.spark.deploy.SparkHadoopUtil
-
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
@@ -44,7 +42,7 @@ private[spark] abstract class StreamFileInputFormat[T]
    */
   def setMinPartitions(context: JobContext, minPartitions: Int) {
     val files = listStatus(context).asScala
-    val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
+    val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
     super.setMaxSplitSize(maxSplitSize)
   }
@@ -135,8 +133,7 @@ class PortableDataStream(
 
   private val confBytes = {
     val baos = new ByteArrayOutputStream()
-    SparkHadoopUtil.get.getConfigurationFromJobContext(context).
-      write(new DataOutputStream(baos))
+    context.getConfiguration.write(new DataOutputStream(baos))
     baos.toByteArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 4134087..fa34f1e 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -53,7 +53,7 @@ private[spark] class WholeTextFileInputFormat
    */
   def setMinPartitions(context: JobContext, minPartitions: Int) {
     val files = listStatus(context).asScala
-    val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
+    val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
     super.setMaxSplitSize(maxSplitSize)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index b56b2aa..998c898 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.spark.deploy.SparkHadoopUtil
-
 
 /**
  * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
@@ -52,8 +50,7 @@ private[spark] class WholeTextFileRecordReader(
   extends RecordReader[Text, Text] with Configurable {
 
   private[this] val path = split.getPath(index)
-  private[this] val fs = path.getFileSystem(
-    SparkHadoopUtil.get.getConfigurationFromJobContext(context))
+  private[this] val fs = path.getFileSystem(context.getConfiguration)
 
   // True means the current file has been processed, then skip it.
   private[this] var processed = false

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f7298e8..249bdf5 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -18,61 +18,12 @@
 package org.apache.spark.mapred
 
 import java.io.IOException
-import java.lang.reflect.Modifier
 
-import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
 import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.{Logging, SparkEnv, TaskContext}
-import org.apache.spark.util.{Utils => SparkUtils}
-
-private[spark]
-trait SparkHadoopMapRedUtil {
-  def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
-      "org.apache.hadoop.mapred.JobContext")
-    val ctor = klass.getDeclaredConstructor(classOf[JobConf],
-      classOf[org.apache.hadoop.mapreduce.JobID])
-    // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private.
-    // Make it accessible if it's not in order to access it.
-    if (!Modifier.isPublic(ctor.getModifiers)) {
-      ctor.setAccessible(true)
-    }
-    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
-  }
-
-  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
-      "org.apache.hadoop.mapred.TaskAttemptContext")
-    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
-    // See above
-    if (!Modifier.isPublic(ctor.getModifiers)) {
-      ctor.setAccessible(true)
-    }
-    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
-  }
-
-  def newTaskAttemptID(
-      jtIdentifier: String,
-      jobId: Int,
-      isMap: Boolean,
-      taskId: Int,
-      attemptId: Int): TaskAttemptID = {
-    new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
-  }
-
-  private def firstAvailableClass(first: String, second: String): Class[_] = {
-    try {
-      SparkUtils.classForName(first)
-    } catch {
-      case e: ClassNotFoundException =>
-        SparkUtils.classForName(second)
-    }
-  }
-}
 
 object SparkHadoopMapRedUtil extends Logging {
   /**
@@ -93,7 +44,7 @@ object SparkHadoopMapRedUtil extends Logging {
       jobId: Int,
       splitId: Int): Unit = {
 
-    val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
+    val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
 
     // Called after we have decided to commit
     def performCommit(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
deleted file mode 100644
index 82d807f..0000000
--- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mapreduce
-
-import java.lang.{Boolean => JBoolean, Integer => JInteger}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
-import org.apache.spark.util.Utils
-
-private[spark]
-trait SparkHadoopMapReduceUtil {
-  def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
-    val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.JobContextImpl")
-    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
-    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
-  }
-
-  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
-    val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl")
-    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
-    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
-  }
-
-  def newTaskAttemptID(
-      jtIdentifier: String,
-      jobId: Int,
-      isMap: Boolean,
-      taskId: Int,
-      attemptId: Int): TaskAttemptID = {
-    val klass = Utils.classForName("org.apache.hadoop.mapreduce.TaskAttemptID")
-    try {
-      // First, attempt to use the old-style constructor that takes a boolean isMap
-      // (not available in YARN)
-      val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
-        classOf[Int], classOf[Int])
-      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
-        new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
-    } catch {
-      case exc: NoSuchMethodException => {
-        // If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
-        val taskTypeClass = Utils.classForName("org.apache.hadoop.mapreduce.TaskType")
-          .asInstanceOf[Class[Enum[_]]]
-        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
-          taskTypeClass, if (isMap) "MAP" else "REDUCE")
-        val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
-          classOf[Int], classOf[Int])
-        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
-          new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index aedced7..2bf2337 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
 import org.apache.hadoop.conf.{ Configurable, Configuration }
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.JobContextImpl
+
 import org.apache.spark.input.StreamFileInputFormat
 import org.apache.spark.{ Partition, SparkContext }
 
@@ -40,7 +42,7 @@ private[spark] class BinaryFileRDD[T](
         configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(conf, jobId)
+    val jobContext = new JobContextImpl(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f37c95b..920d3bf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.JobID
 import org.apache.hadoop.mapred.TaskAttemptID
 import org.apache.hadoop.mapred.TaskID
 import org.apache.hadoop.mapred.lib.CombineFileSplit
+import org.apache.hadoop.mapreduce.TaskType
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -357,7 +358,7 @@ private[spark] object HadoopRDD extends Logging {
   def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int,
                             conf: JobConf) {
     val jobID = new JobID(jobTrackerId, jobId)
-    val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)
+    val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)
 
     conf.set("mapred.tip.id", taId.getTaskID.toString)
     conf.set("mapred.task.id", taId.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 86f38ae..8b330a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -26,11 +26,11 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -66,9 +66,7 @@ class NewHadoopRDD[K, V](
     keyClass: Class[K],
     valueClass: Class[V],
     @transient private val _conf: Configuration)
-  extends RDD[(K, V)](sc, Nil)
-  with SparkHadoopMapReduceUtil
-  with Logging {
+  extends RDD[(K, V)](sc, Nil) with Logging {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
@@ -109,7 +107,7 @@ class NewHadoopRDD[K, V](
         configurable.setConf(_conf)
       case _ =>
     }
-    val jobContext = newJobContext(_conf, jobId)
+    val jobContext = new JobContextImpl(_conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -144,8 +142,8 @@ class NewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+      val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
       private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 44d1955..b872301 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -33,15 +33,14 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-  RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskType, TaskAttemptID}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -53,10 +52,7 @@ import org.apache.spark.util.random.StratifiedSamplingUtils
  */
 class PairRDDFunctions[K, V](self: RDD[(K, V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
-  extends Logging
-  with SparkHadoopMapReduceUtil
-  with Serializable
-{
+  extends Logging with Serializable {
 
   /**
    * :: Experimental ::
@@ -985,11 +981,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
-    val job = new NewAPIHadoopJob(hadoopConf)
+    val job = NewAPIHadoopJob.getInstance(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
-    val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val jobConfiguration = job.getConfiguration
     jobConfiguration.set("mapred.output.dir", path)
     saveAsNewAPIHadoopDataset(jobConfiguration)
   }
@@ -1074,11 +1070,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
-    val job = new NewAPIHadoopJob(hadoopConf)
+    val job = NewAPIHadoopJob.getInstance(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
-    val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val jobConfiguration = job.getConfiguration
     val wrappedConf = new SerializableConfiguration(jobConfiguration)
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
@@ -1091,9 +1087,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
       val config = wrappedConf.value
       /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
         context.attemptNumber)
-      val hadoopContext = newTaskAttemptContext(config, attemptId)
+      val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
       val format = outfmt.newInstance
       format match {
         case c: Configurable => c.setConf(config)
@@ -1125,8 +1121,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       1
     } : Int
 
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
+    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
 
     // When speculation is on and output committer class name contains "Direct", we should warn

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index fa71b8c..a9b3d52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -174,7 +174,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       fs.create(tempOutputPath, false, bufferSize)
     } else {
       // This is mainly for testing purpose
-      fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
+      fs.create(tempOutputPath, false, bufferSize,
+        fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
     }
     val serializer = env.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
index e3f14fe..8e1baae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.{Text, Writable}
 import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.task.JobContextImpl
 
 import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.input.WholeTextFileInputFormat
@@ -44,7 +45,7 @@ private[spark] class WholeTextFileRDD(
         configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(conf, jobId)
+    val jobContext = new JobContextImpl(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index eaa07ac..68792c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -77,14 +77,6 @@ private[spark] class EventLoggingListener(
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
 
-  // The Hadoop APIs have changed over time, so we use reflection to figure out
-  // the correct method to use to flush a hadoop data stream. See SPARK-1518
-  // for details.
-  private val hadoopFlushMethod = {
-    val cls = classOf[FSDataOutputStream]
-    scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
-  }
-
   private var writer: Option[PrintWriter] = None
 
   // For testing. Keep track of all JSON serialized events that have been logged.
@@ -97,7 +89,7 @@ private[spark] class EventLoggingListener(
    * Creates the log file in the configured log directory.
    */
   def start() {
-    if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDir) {
+    if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
       throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
     }
 
@@ -147,7 +139,7 @@ private[spark] class EventLoggingListener(
     // scalastyle:on println
     if (flushLogger) {
       writer.foreach(_.flush())
-      hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
+      hadoopDataStream.foreach(_.hflush())
     }
     if (testing) {
       loggedEvents += eventJson

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 0e438ab..8235b10 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
     val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
       ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
         org.apache.hadoop.mapreduce.InputFormat[_, _]]
-    val job = new Job(conf)
+    val job = Job.getInstance(conf)
 
     val retval = new ArrayBuffer[SplitInfo]()
     val list = instance.getSplits(job)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 0065b1f..acc24ca 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import java.io.File
 import java.util.PriorityQueue
 
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.Logging
@@ -177,21 +177,8 @@ private [util] class SparkShutdownHookManager {
     val hookTask = new Runnable() {
       override def run(): Unit = runAll()
     }
-    Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
-      case Success(shmClass) =>
-        val fsPriority = classOf[FileSystem]
-          .getField("SHUTDOWN_HOOK_PRIORITY")
-          .get(null) // static field, the value is not used
-          .asInstanceOf[Int]
-        val shm = shmClass.getMethod("get").invoke(null)
-        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
-          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
-
-      case Failure(_) =>
-        // scalastyle:off runtimeaddshutdownhook
-        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
-        // scalastyle:on runtimeaddshutdownhook
-    }
+    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
   }
 
   def runAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 11f1248..d91948e 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1246,7 +1246,7 @@ public class JavaAPISuite implements Serializable {
 
     JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
         org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
-        IntWritable.class, Text.class, new Job().getConfiguration());
+        IntWritable.class, Text.class, Job.getInstance().getConfiguration());
     Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
       @Override
       public String call(Tuple2<IntWritable, Text> x) {

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index f6a7f43..2e47801 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -19,12 +19,11 @@ package org.apache.spark
 
 import java.io.{File, FileWriter}
 
-import org.apache.spark.deploy.SparkHadoopUtil
+import scala.io.Source
+
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.storage.StorageLevel
 
-import scala.io.Source
-
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
 import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
@@ -506,11 +505,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
       Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    val job = new Job(sc.hadoopConfiguration)
+    val job = Job.getInstance(sc.hadoopConfiguration)
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
     job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
-    val jobConfig = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val jobConfig = job.getConfiguration
     jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
     randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
     assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 5cb2d42..43da6fc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -67,11 +67,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
     assert(fileSystem.exists(logPath))
     val logStatus = fileSystem.getFileStatus(logPath)
-    assert(!logStatus.isDir)
+    assert(!logStatus.isDirectory)
 
     // Verify log is renamed after stop()
     eventLogger.stop()
-    assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDir)
+    assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDirectory)
   }
 
   test("Basic event logging") {

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 103fc19..761e82e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -23,7 +23,6 @@ import java.net.URI
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkConf, SparkContext, SPARK_VERSION}
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
@@ -115,7 +114,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val applications = fileSystem.listStatus(logDirPath)
     assert(applications != null && applications.size > 0)
     val eventLog = applications.sortBy(_.getModificationTime).last
-    assert(!eventLog.isDir)
+    assert(!eventLog.isDirectory)
 
     // Replay events
     val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index d1b9b8d..5a80985 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -16,7 +16,6 @@
  */
 
  // scalastyle:off println
- // scalastyle:off jobcontext
 package org.apache.spark.examples
 
 import java.nio.ByteBuffer
@@ -80,7 +79,7 @@ object CassandraCQLTest {
     val InputColumnFamily = "ordercf"
     val OutputColumnFamily = "salecount"
 
-    val job = new Job()
+    val job = Job.getInstance()
     job.setInputFormatClass(classOf[CqlPagingInputFormat])
     val configuration = job.getConfiguration
     ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
@@ -137,4 +136,3 @@ object CassandraCQLTest {
   }
 }
 // scalastyle:on println
-// scalastyle:on jobcontext

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 1e679bf..ad39a01 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -16,7 +16,6 @@
  */
 
 // scalastyle:off println
-// scalastyle:off jobcontext
 package org.apache.spark.examples
 
 import java.nio.ByteBuffer
@@ -59,7 +58,7 @@ object CassandraTest {
     val sc = new SparkContext(sparkConf)
 
     // Build the job configuration with ConfigHelper provided by Cassandra
-    val job = new Job()
+    val job = Job.getInstance()
     job.setInputFormatClass(classOf[ColumnFamilyInputFormat])
 
     val host: String = args(1)
@@ -131,7 +130,6 @@ object CassandraTest {
   }
 }
 // scalastyle:on println
-// scalastyle:on jobcontext
 
 /*
 create keyspace casDemo;

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 59886ab..612ddf8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -49,6 +49,11 @@ object MimaExcludes {
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
         ) ++
+      Seq(
+        // SPARK-12481
+        ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+          "org.apache.spark.mapred.SparkHadoopMapRedUtil")
+      ) ++
       // When 1.6 is officially released, update this exclusion list.
       Seq(
         MimaBuild.excludeSparkPackage("deploy"),

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 16d18b3..ee855ca 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -187,14 +187,6 @@ This file is divided into 3 sections:
     scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage>
   </check>
 
-  <!-- As of SPARK-10330 JobContext methods should not be called directly -->
-  <check customId="jobcontext" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
-    <parameters><parameter name="regex">^getConfiguration$|^getTaskAttemptID$</parameter></parameters>
-    <customMessage>Instead of calling .getConfiguration() or .getTaskAttemptID() directly,
-    use SparkHadoopUtil's getConfigurationFromJobContext() and getTaskAttemptIDFromTaskAttemptContext() methods.
-    </customMessage>
-  </check>
-
   <!-- ================================================================================ -->
   <!--       rules we'd like to enforce, but haven't cleaned up the codebase yet        -->
   <!-- ================================================================================ -->

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 735d52f..758bcd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -93,7 +93,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
-      val job = new Job(hadoopConf)
+      val job = Job.getInstance(hadoopConf)
       job.setOutputKeyClass(classOf[Void])
       job.setOutputValueClass(classOf[InternalRow])
       FileOutputFormat.setOutputPath(job, qualifiedOutputPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index eea780c..12f8783 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -26,10 +26,10 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql.{SQLConf, SQLContext}
 import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
 import org.apache.spark.storage.StorageLevel
@@ -68,16 +68,14 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
     initLocalJobFuncOpt: Option[Job => Unit],
     inputFormatClass: Class[_ <: InputFormat[Void, V]],
     valueClass: Class[V])
-    extends RDD[V](sqlContext.sparkContext, Nil)
-  with SparkHadoopMapReduceUtil
-  with Logging {
+    extends RDD[V](sqlContext.sparkContext, Nil) with Logging {
 
   protected def getJob(): Job = {
-    val conf: Configuration = broadcastedConf.value.value
+    val conf = broadcastedConf.value.value
     // "new Job" will make a copy of the conf. Then, it is
     // safe to mutate conf properties with initLocalJobFuncOpt
     // and initDriverSideJobFuncOpt.
-    val newJob = new Job(conf)
+    val newJob = Job.getInstance(conf)
     initLocalJobFuncOpt.map(f => f(newJob))
     newJob
   }
@@ -87,7 +85,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
     if (isDriverSide) {
       initDriverSideJobFuncOpt.map(f => f(job))
     }
-    SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    job.getConfiguration
   }
 
   private val jobTrackerId: String = {
@@ -110,7 +108,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(conf, jobId)
+    val jobContext = new JobContextImpl(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[SparkPartition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -154,8 +152,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
           configurable.setConf(conf)
         case _ =>
       }
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+      val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
       private[this] var reader: RecordReader[Void, V] = null
 
       /**

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 983f4df..8b0b647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -41,14 +41,12 @@ private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
     @transient private val job: Job,
     isAppend: Boolean)
-  extends SparkHadoopMapReduceUtil
-  with Logging
-  with Serializable {
+  extends Logging with Serializable {
 
   protected val dataSchema = relation.dataSchema
 
   protected val serializableConf =
-    new SerializableConfiguration(SparkHadoopUtil.get.getConfigurationFromJobContext(job))
+    new SerializableConfiguration(job.getConfiguration)
 
   // This UUID is used to avoid output file name collision between different appending write jobs.
   // These jobs may belong to different SparkContext instances. Concrete data source implementations
@@ -90,8 +88,7 @@ private[sql] abstract class BaseWriterContainer(
     // This UUID is sent to executor side together with the serialized `Configuration` object within
     // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
     // unique task output files.
-    SparkHadoopUtil.get.getConfigurationFromJobContext(job).
-      set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
 
     // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
     // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
@@ -101,7 +98,7 @@ private[sql] abstract class BaseWriterContainer(
     // committer, since their initialization involve the job configuration, which can be potentially
     // decorated in `prepareJobForWrite`.
     outputWriterFactory = relation.prepareJobForWrite(job)
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+    taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId)
 
     outputFormatClass = job.getOutputFormatClass
     outputCommitter = newOutputCommitter(taskAttemptContext)
@@ -111,7 +108,7 @@ private[sql] abstract class BaseWriterContainer(
   def executorSideSetup(taskContext: TaskContext): Unit = {
     setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
     setupConf()
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+    taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId)
     outputCommitter = newOutputCommitter(taskAttemptContext)
     outputCommitter.setupTask(taskAttemptContext)
   }
@@ -166,7 +163,7 @@ private[sql] abstract class BaseWriterContainer(
           "because spark.speculation is configured to be true.")
       defaultOutputCommitter
     } else {
-      val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+      val configuration = context.getConfiguration
       val committerClass = configuration.getClass(
         SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
@@ -201,10 +198,8 @@ private[sql] abstract class BaseWriterContainer(
 
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
     this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
-    this.taskId = new TaskID(this.jobId, true, splitId)
-    // scalastyle:off jobcontext
+    this.taskId = new TaskID(this.jobId, TaskType.MAP, splitId)
     this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
-    // scalastyle:on jobcontext
   }
 
   private def setupConf(): Unit = {
@@ -250,7 +245,7 @@ private[sql] class DefaultWriterContainer(
 
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
     executorSideSetup(taskContext)
-    val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
+    val configuration = taskAttemptContext.getConfiguration
     configuration.set("spark.sql.sources.output.path", outputPath)
     val writer = newOutputWriter(getWorkPath)
     writer.initConverter(dataSchema)
@@ -421,7 +416,7 @@ private[sql] class DynamicPartitionWriterContainer(
     def newOutputWriter(key: InternalRow): OutputWriter = {
       val partitionPath = getPartitionString(key).getString(0)
       val path = new Path(getWorkPath, partitionPath)
-      val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
+      val configuration = taskAttemptContext.getConfiguration
       configuration.set(
         "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
       val newWriter = super.newOutputWriter(path.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3e61ba3..54a8552 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -30,8 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.Logging
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
@@ -89,8 +87,8 @@ private[sql] class JSONRelation(
   override val needConversion: Boolean = false
 
   private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = {
-    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
-    val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = job.getConfiguration
 
     val paths = inputPaths.map(_.getPath)
 
@@ -176,7 +174,7 @@ private[json] class JsonOutputWriter(
     path: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
-  extends OutputWriter with SparkHadoopMapRedUtil with Logging {
+  extends OutputWriter with Logging {
 
   private[this] val writer = new CharArrayWriter()
   // create the Generator without separator inserted between 2 records
@@ -186,9 +184,9 @@ private[json] class JsonOutputWriter(
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+        val configuration = context.getConfiguration
         val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
-        val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+        val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index a958373..e5d8e60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -58,9 +58,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
    */
   override def init(context: InitContext): ReadContext = {
     catalystRequestedSchema = {
-      // scalastyle:off jobcontext
       val conf = context.getConfiguration
-      // scalastyle:on jobcontext
       val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 1a4e99f..e54f51e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -54,11 +54,7 @@ private[datasources] class DirectParquetOutputCommitter(
   override def setupTask(taskContext: TaskAttemptContext): Unit = {}
 
   override def commitJob(jobContext: JobContext) {
-    val configuration = {
-      // scalastyle:off jobcontext
-      ContextUtil.getConfiguration(jobContext)
-      // scalastyle:on jobcontext
-    }
+    val configuration = ContextUtil.getConfiguration(jobContext)
     val fileSystem = outputPath.getFileSystem(configuration)
 
     if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 1af2a39..af964b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.task.JobContextImpl
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
@@ -40,7 +41,6 @@ import org.apache.parquet.{Log => ApacheParquetLog}
 import org.slf4j.bridge.SLF4JBridgeHandler
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -82,9 +82,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
         //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
         //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-          val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+          val configuration = context.getConfiguration
           val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
-          val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+          val taskAttemptId = context.getTaskAttemptID
           val split = taskAttemptId.getTaskID.getId
           new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
         }
@@ -217,11 +217,7 @@ private[sql] class ParquetRelation(
   override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
-    val conf = {
-      // scalastyle:off jobcontext
-      ContextUtil.getConfiguration(job)
-      // scalastyle:on jobcontext
-    }
+    val conf = ContextUtil.getConfiguration(job)
 
     // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
     val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
@@ -340,7 +336,7 @@ private[sql] class ParquetRelation(
           // URI of the path to create a new Path.
           val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
           new FileStatus(
-            f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+            f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
         }.toSeq
 
@@ -359,7 +355,7 @@ private[sql] class ParquetRelation(
             }
           }
 
-          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+          val jobContext = new JobContextImpl(getConf(isDriverSide = true), jobId)
           val rawSplits = inputFormat.getSplits(jobContext)
 
           Array.tabulate[SparkPartition](rawSplits.size) { i =>
@@ -564,7 +560,7 @@ private[sql] object ParquetRelation extends Logging {
       parquetFilterPushDown: Boolean,
       assumeBinaryIsString: Boolean,
       assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
-    val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val conf = job.getConfiguration
     conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
 
     // Try to push down filters when filter push-down is enabled.
@@ -607,7 +603,7 @@ private[sql] object ParquetRelation extends Logging {
       FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
     }
 
-    overrideMinSplitSize(parquetBlockSize, SparkHadoopUtil.get.getConfigurationFromJobContext(job))
+    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
   }
 
   private[parquet] def readSchema(

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 41fcb11..248467a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -88,8 +86,8 @@ private[sql] class TextRelation(
       filters: Array[Filter],
       inputPaths: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
-    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
-    val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = job.getConfiguration
     val paths = inputPaths.map(_.getPath).sortBy(_.toUri)
 
     if (paths.nonEmpty) {
@@ -138,17 +136,16 @@ private[sql] class TextRelation(
 }
 
 class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
-  extends OutputWriter
-  with SparkHadoopMapRedUtil {
+  extends OutputWriter {
 
   private[this] val buffer = new Text()
 
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+        val configuration = context.getConfiguration
         val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
-        val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+        val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index fc8ce69..d6c5d14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -462,7 +462,7 @@ abstract class HadoopFsRelation private[sql](
           name.toLowerCase == "_temporary" || name.startsWith(".")
         }
 
-        val (dirs, files) = statuses.partition(_.isDir)
+        val (dirs, files) = statuses.partition(_.isDirectory)
 
         // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
         if (dirs.isEmpty) {
@@ -858,10 +858,10 @@ private[sql] object HadoopFsRelation extends Logging {
       val jobConf = new JobConf(fs.getConf, this.getClass())
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       if (pathFilter != null) {
-        val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
+        val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
         files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
       } else {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
         files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
       }
     }
@@ -896,7 +896,7 @@ private[sql] object HadoopFsRelation extends Logging {
       FakeFileStatus(
         status.getPath.toString,
         status.getLen,
-        status.isDir,
+        status.isDirectory,
         status.getReplication,
         status.getBlockSize,
         status.getModificationTime,

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 384ea21..5d00e73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -380,7 +380,7 @@ class HiveContext private[hive](
 
         def calculateTableSize(fs: FileSystem, path: Path): Long = {
           val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDir) {
+          val size = if (fileStatus.isDirectory) {
             fs.listStatus(path)
               .map { status =>
                 if (!status.getPath().getName().startsWith(stagingDir)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 598ccde..d3da22a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -31,9 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.ql.{Driver, metadata}
-import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{SparkConf, SparkException, Logging}
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -65,74 +63,6 @@ private[hive] class ClientWrapper(
   extends ClientInterface
   with Logging {
 
-  overrideHadoopShims()
-
-  // !! HACK ALERT !!
-  //
-  // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
-  // major version number gathered from Hadoop jar files:
-  //
-  // - For major version number 1, load `Hadoop20SShims`, where "20S" stands for Hadoop 0.20 with
-  //   security.
-  // - For major version number 2, load `Hadoop23Shims`, where "23" stands for Hadoop 0.23.
-  //
-  // However, APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical reasons. It
-  // turns out that Hadoop 2.0.x versions should also be used together with `Hadoop20SShims`, but
-  // `Hadoop23Shims` is chosen because the major version number here is 2.
-  //
-  // To fix this issue, we try to inspect Hadoop version via `org.apache.hadoop.utils.VersionInfo`
-  // and load `Hadoop20SShims` for Hadoop 1.x and 2.0.x versions.  If Hadoop version information is
-  // not available, we decide whether to override the shims or not by checking for existence of a
-  // probe method which doesn't exist in Hadoop 1.x or 2.0.x versions.
-  private def overrideHadoopShims(): Unit = {
-    val hadoopVersion = VersionInfo.getVersion
-    val VersionPattern = """(\d+)\.(\d+).*""".r
-
-    hadoopVersion match {
-      case null =>
-        logError("Failed to inspect Hadoop version")
-
-        // Using "Path.getPathWithoutSchemeAndAuthority" as the probe method.
-        val probeMethod = "getPathWithoutSchemeAndAuthority"
-        if (!classOf[Path].getDeclaredMethods.exists(_.getName == probeMethod)) {
-          logInfo(
-            s"Method ${classOf[Path].getCanonicalName}.$probeMethod not found, " +
-              s"we are probably using Hadoop 1.x or 2.0.x")
-          loadHadoop20SShims()
-        }
-
-      case VersionPattern(majorVersion, minorVersion) =>
-        logInfo(s"Inspected Hadoop version: $hadoopVersion")
-
-        // Loads Hadoop20SShims for 1.x and 2.0.x versions
-        val (major, minor) = (majorVersion.toInt, minorVersion.toInt)
-        if (major < 2 || (major == 2 && minor == 0)) {
-          loadHadoop20SShims()
-        }
-    }
-
-    // Logs the actual loaded Hadoop shims class
-    val loadedShimsClassName = ShimLoader.getHadoopShims.getClass.getCanonicalName
-    logInfo(s"Loaded $loadedShimsClassName for Hadoop version $hadoopVersion")
-  }
-
-  private def loadHadoop20SShims(): Unit = {
-    val hadoop20SShimsClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
-    logInfo(s"Loading Hadoop shims $hadoop20SShimsClassName")
-
-    try {
-      val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
-      // scalastyle:off classforname
-      val shimsClass = Class.forName(hadoop20SShimsClassName)
-      // scalastyle:on classforname
-      val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
-      shimsField.setAccessible(true)
-      shimsField.set(null, shims)
-    } catch { case cause: Throwable =>
-      throw new RuntimeException(s"Failed to load $hadoop20SShimsClassName", cause)
-    }
-  }
-
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
   private val outputBuffer = new CircularBuffer()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org