Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/08 17:41:05 UTC

spark git commit: [SPARK-18191][CORE] Port RDD API to use commit protocol

Repository: spark
Updated Branches:
  refs/heads/master 73feaa30e -> 9c419698f


[SPARK-18191][CORE] Port RDD API to use commit protocol

## What changes were proposed in this pull request?

This PR ports the RDD API to the commit protocol. The changes made here are:
1. Add a new internal helper class, `SparkHadoopMapReduceWriter`, that saves an RDD using a Hadoop OutputFormat through the commit protocol. It is similar to `SparkHadoopWriter`, but supports the newer `mapreduce` API instead of the old `mapred` API that `SparkHadoopWriter` supports;
2. Rewrite the `PairRDDFunctions.saveAsNewAPIHadoopDataset` function so that it now goes through the commit protocol (see the usage sketch below).
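
A minimal, hypothetical usage sketch (not part of this patch) of the code path being rewired: the object name and output path below are illustrative only, but `saveAsNewAPIHadoopDataset` is the entry point that now delegates to `SparkHadoopMapReduceWriter` and `HadoopMapReduceCommitProtocol`.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative example only; the app name and output path are placeholders.
object SaveAsNewApiHadoopDatasetExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("commit-protocol-example").setMaster("local[2]"))

    // Pair RDD with Writable key/value types, as expected by the Hadoop OutputFormat.
    val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
      .map { case (k, v) => (new IntWritable(k), new Text(v)) }

    // Configure the output through the newer `mapreduce` Job API.
    val job = Job.getInstance(new Configuration())
    job.setOutputKeyClass(classOf[IntWritable])
    job.setOutputValueClass(classOf[Text])
    job.setOutputFormatClass(classOf[TextOutputFormat[IntWritable, Text]])
    FileOutputFormat.setOutputPath(job, new Path("/tmp/pairs-out"))

    // With this patch, the call below runs through SparkHadoopMapReduceWriter and
    // HadoopMapReduceCommitProtocol instead of the inlined writeShard logic.
    pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
```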

## How was this patch tested?
Existing test cases.

Author: jiangxingbo <ji...@gmail.com>

Closes #15769 from jiangxb1987/rdd-commit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c419698
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c419698
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c419698

Branch: refs/heads/master
Commit: 9c419698fe110a805570031cac3387a51957d9d1
Parents: 73feaa3
Author: jiangxingbo <ji...@gmail.com>
Authored: Tue Nov 8 09:41:01 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 8 09:41:01 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    |  25 +-
 .../io/HadoopMapReduceCommitProtocol.scala      |   6 +-
 .../io/SparkHadoopMapReduceWriter.scala         | 249 +++++++++++++++++++
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 139 +----------
 .../spark/rdd/PairRDDFunctionsSuite.scala       |  20 +-
 .../datasources/FileFormatWriter.scala          |   4 +-
 .../spark/sql/hive/hiveWriterContainers.scala   |   3 +-
 .../spark/streaming/dstream/DStream.scala       |   5 +-
 .../streaming/scheduler/JobScheduler.scala      |   5 +-
 9 files changed, 280 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 7f75a39..46e22b2 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -23,11 +23,11 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
 import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.TaskType
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
 import org.apache.spark.util.SerializableJobConf
@@ -153,29 +153,8 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
     splitID = splitid
     attemptID = attemptid
 
-    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
+    jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobid))
     taID = new SerializableWritable[TaskAttemptID](
         new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
   }
 }
-
-private[spark]
-object SparkHadoopWriter {
-  def createJobID(time: Date, id: Int): JobID = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-    val jobtrackerID = formatter.format(time)
-    new JobID(jobtrackerID, id)
-  }
-
-  def createPathFromString(path: String, conf: JobConf): Path = {
-    if (path == null) {
-      throw new IllegalArgumentException("Output path is null")
-    }
-    val outputPath = new Path(path)
-    val fs = outputPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException("Incorrectly formatted output path")
-    }
-    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 66ccb6d..d643a32 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
-import org.apache.spark.SparkHadoopWriter
 import org.apache.spark.internal.Logging
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 
@@ -69,7 +68,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
   override def setupJob(jobContext: JobContext): Unit = {
     // Setup IDs
-    val jobId = SparkHadoopWriter.createJobID(new Date, 0)
+    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
     val taskId = new TaskID(jobId, TaskType.MAP, 0)
     val taskAttemptId = new TaskAttemptID(taskId, 0)
 
@@ -108,4 +107,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
     committer.abortTask(taskContext)
   }
+
+  /** Whether we are using a direct output committer */
+  def isDirectOutput(): Boolean = committer.getClass.getSimpleName.contains("Direct")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
new file mode 100644
index 0000000..a405c44
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.reflect.ClassTag
+import scala.util.DynamicVariable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{JobConf, JobID}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.{SparkConf, SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.OutputMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+/**
+ * A helper object that saves an RDD using a Hadoop OutputFormat
+ * (from the newer mapreduce API, not the old mapred API).
+ */
+private[spark]
+object SparkHadoopMapReduceWriter extends Logging {
+
+  /**
+   * Basic work flow of this command is:
+   * 1. Driver side setup, prepare the data source and hadoop configuration for the write job to
+   *    be issued.
+   * 2. Issues a write job consists of one or more executor side tasks, each of which writes all
+   *    rows within an RDD partition.
+   * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task;  If any
+   *    exception is thrown during task commitment, also aborts that task.
+   * 4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception is
+   *    thrown during job commitment, also aborts the job.
+   */
+  def write[K, V: ClassTag](
+      rdd: RDD[(K, V)],
+      hadoopConf: Configuration): Unit = {
+    // Extract context and configuration from RDD.
+    val sparkContext = rdd.context
+    val stageId = rdd.id
+    val sparkConf = rdd.conf
+    val conf = new SerializableConfiguration(hadoopConf)
+
+    // Set up a job.
+    val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
+    val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
+    val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
+    val format = jobContext.getOutputFormatClass
+
+    if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
+      // FileOutputFormat ignores the filesystem parameter
+      val jobFormat = format.newInstance
+      jobFormat.checkOutputSpecs(jobContext)
+    }
+
+    val committer = FileCommitProtocol.instantiate(
+      className = classOf[HadoopMapReduceCommitProtocol].getName,
+      jobId = stageId.toString,
+      outputPath = conf.value.get("mapred.output.dir"),
+      isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
+    committer.setupJob(jobContext)
+
+    // When speculation is on and output committer class name contains "Direct", we should warn
+    // users that they may loss data if they are using a direct output committer.
+    if (SparkHadoopWriterUtils.isSpeculationEnabled(sparkConf) && committer.isDirectOutput) {
+      val warningMessage =
+        s"$committer may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
+    // Try to write all RDD partitions as a Hadoop OutputFormat.
+    try {
+      val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+        executeTask(
+          context = context,
+          jobTrackerId = jobTrackerId,
+          sparkStageId = context.stageId,
+          sparkPartitionId = context.partitionId,
+          sparkAttemptNumber = context.attemptNumber,
+          committer = committer,
+          hadoopConf = conf.value,
+          outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]],
+          iterator = iter)
+      })
+
+      committer.commitJob(jobContext, ret)
+      logInfo(s"Job ${jobContext.getJobID} committed.")
+    } catch {
+      case cause: Throwable =>
+        logError(s"Aborting job ${jobContext.getJobID}.", cause)
+        committer.abortJob(jobContext)
+        throw new SparkException("Job aborted.", cause)
+    }
+  }
+
+  /** Write a RDD partition out in a single Spark task. */
+  private def executeTask[K, V: ClassTag](
+      context: TaskContext,
+      jobTrackerId: String,
+      sparkStageId: Int,
+      sparkPartitionId: Int,
+      sparkAttemptNumber: Int,
+      committer: FileCommitProtocol,
+      hadoopConf: Configuration,
+      outputFormat: Class[_ <: OutputFormat[K, V]],
+      iterator: Iterator[(K, V)]): TaskCommitMessage = {
+    // Set up a task.
+    val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE,
+      sparkPartitionId, sparkAttemptNumber)
+    val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+    committer.setupTask(taskContext)
+
+    val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
+      SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+
+    // Initiate the writer.
+    val taskFormat = outputFormat.newInstance
+    val writer = taskFormat.getRecordWriter(taskContext)
+      .asInstanceOf[RecordWriter[K, V]]
+    require(writer != null, "Unable to obtain RecordWriter")
+    var recordsWritten = 0L
+
+    // Write all rows in RDD partition.
+    try {
+      val ret = Utils.tryWithSafeFinallyAndFailureCallbacks {
+        while (iterator.hasNext) {
+          val pair = iterator.next()
+          writer.write(pair._1, pair._2)
+
+          // Update bytes written metric every few records
+          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
+            outputMetricsAndBytesWrittenCallback, recordsWritten)
+          recordsWritten += 1
+        }
+
+        committer.commitTask(taskContext)
+      }(catchBlock = {
+        committer.abortTask(taskContext)
+        logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
+      }, finallyBlock = writer.close(taskContext))
+
+      outputMetricsAndBytesWrittenCallback.foreach {
+        case (om, callback) =>
+          om.setBytesWritten(callback())
+          om.setRecordsWritten(recordsWritten)
+      }
+
+      ret
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
+    }
+  }
+}
+
+private[spark]
+object SparkHadoopWriterUtils {
+
+  private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+
+  def createJobID(time: Date, id: Int): JobID = {
+    val jobtrackerID = createJobTrackerID(time)
+    new JobID(jobtrackerID, id)
+  }
+
+  def createJobTrackerID(time: Date): String = {
+    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+  }
+
+  def createPathFromString(path: String, conf: JobConf): Path = {
+    if (path == null) {
+      throw new IllegalArgumentException("Output path is null")
+    }
+    val outputPath = new Path(path)
+    val fs = outputPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException("Incorrectly formatted output path")
+    }
+    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+
+  // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
+  // setting can take effect:
+  def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = {
+    val validationDisabled = disableOutputSpecValidation.value
+    val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
+    enabledInConf && !validationDisabled
+  }
+
+  def isSpeculationEnabled(conf: SparkConf): Boolean = {
+    conf.getBoolean("spark.speculation", false)
+  }
+
+  // TODO: these don't seem like the right abstractions.
+  // We should abstract the duplicate code in a less awkward way.
+
+  // return type: (output metrics, bytes written callback), defined only if the latter is defined
+  def initHadoopOutputMetrics(
+      context: TaskContext): Option[(OutputMetrics, () => Long)] = {
+    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
+    bytesWrittenCallback.map { b =>
+      (context.taskMetrics().outputMetrics, b)
+    }
+  }
+
+  def maybeUpdateOutputMetrics(
+      outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
+      recordsWritten: Long): Unit = {
+    if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
+      outputMetricsAndBytesWrittenCallback.foreach {
+        case (om, callback) =>
+          om.setBytesWritten(callback())
+          om.setRecordsWritten(recordsWritten)
+      }
+    }
+  }
+
+  /**
+   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
+   * basis; see SPARK-4835 for more details.
+   */
+  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 67baad1..f9b9631 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -18,33 +18,31 @@
 package org.apache.spark.rdd
 
 import java.nio.ByteBuffer
-import java.text.SimpleDateFormat
-import java.util.{Date, HashMap => JHashMap, Locale}
+import java.util.{HashMap => JHashMap}
 
 import scala.collection.{mutable, Map}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
-import scala.util.DynamicVariable
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
-import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat}
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.OutputMetrics
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.CompactBuffer
 import org.apache.spark.util.random.StratifiedSamplingUtils
 
@@ -1060,7 +1058,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
 
     FileOutputFormat.setOutputPath(hadoopConf,
-      SparkHadoopWriter.createPathFromString(path, hadoopConf))
+      SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
   }
 
@@ -1076,80 +1074,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * result of using direct output committer with speculation enabled.
    */
   def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
-    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
-    val hadoopConf = conf
-    val job = NewAPIHadoopJob.getInstance(hadoopConf)
-    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = self.id
-    val jobConfiguration = job.getConfiguration
-    val wrappedConf = new SerializableConfiguration(jobConfiguration)
-    val outfmt = job.getOutputFormatClass
-    val jobFormat = outfmt.newInstance
-
-    if (isOutputSpecValidationEnabled) {
-      // FileOutputFormat ignores the filesystem parameter
-      jobFormat.checkOutputSpecs(job)
-    }
-
-    val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
-      val config = wrappedConf.value
-      /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
-        context.attemptNumber)
-      val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
-      val format = outfmt.newInstance
-      format match {
-        case c: Configurable => c.setConf(config)
-        case _ => ()
-      }
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-
-      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
-        initHadoopOutputMetrics(context)
-
-      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
-      require(writer != null, "Unable to obtain RecordWriter")
-      var recordsWritten = 0L
-      Utils.tryWithSafeFinallyAndFailureCallbacks {
-        while (iter.hasNext) {
-          val pair = iter.next()
-          writer.write(pair._1, pair._2)
-
-          // Update bytes written metric every few records
-          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
-          recordsWritten += 1
-        }
-      }(finallyBlock = writer.close(hadoopContext))
-      committer.commitTask(hadoopContext)
-      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
-        om.setBytesWritten(callback())
-        om.setRecordsWritten(recordsWritten)
-      }
-      1
-    } : Int
-
-    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
-    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-
-    // When speculation is on and output committer class name contains "Direct", we should warn
-    // users that they may loss data if they are using a direct output committer.
-    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
-    val outputCommitterClass = jobCommitter.getClass.getSimpleName
-    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
-      val warningMessage =
-        s"$outputCommitterClass may be an output committer that writes data directly to " +
-          "the final location. Because speculation is enabled, this output committer may " +
-          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
-          "committer that does not have this behavior (e.g. FileOutputCommitter)."
-      logWarning(warningMessage)
-    }
-
-    jobCommitter.setupJob(jobTaskContext)
-    self.context.runJob(self, writeShard)
-    jobCommitter.commitJob(jobTaskContext)
+    SparkHadoopMapReduceWriter.write(
+      rdd = self,
+      hadoopConf = conf)
   }
 
   /**
@@ -1178,7 +1105,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (isOutputSpecValidationEnabled) {
+    if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(self.conf)) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(hadoopConf)
       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1193,7 +1120,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
 
       val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
-        initHadoopOutputMetrics(context)
+        SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
@@ -1205,7 +1132,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
 
           // Update bytes written metric every few records
-          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
+          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
+            outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
       }(finallyBlock = writer.close())
@@ -1220,29 +1148,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     writer.commitJob()
   }
 
-  // TODO: these don't seem like the right abstractions.
-  // We should abstract the duplicate code in a less awkward way.
-
-  // return type: (output metrics, bytes written callback), defined only if the latter is defined
-  private def initHadoopOutputMetrics(
-      context: TaskContext): Option[(OutputMetrics, () => Long)] = {
-    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
-    bytesWrittenCallback.map { b =>
-      (context.taskMetrics().outputMetrics, b)
-    }
-  }
-
-  private def maybeUpdateOutputMetrics(
-      outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
-      recordsWritten: Long): Unit = {
-    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
-      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
-        om.setBytesWritten(callback())
-        om.setRecordsWritten(recordsWritten)
-      }
-    }
-  }
-
   /**
    * Return an RDD with the keys of each tuple.
    */
@@ -1258,22 +1163,4 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   private[spark] def valueClass: Class[_] = vt.runtimeClass
 
   private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
-
-  // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
-  // setting can take effect:
-  private def isOutputSpecValidationEnabled: Boolean = {
-    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
-    val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
-    enabledInConf && !validationDisabled
-  }
-}
-
-private[spark] object PairRDDFunctions {
-  val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
-
-  /**
-   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
-   * basis; see SPARK-4835 for more details.
-   */
-  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index b0d69de..fe547d4 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -509,21 +509,6 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       (2, ArrayBuffer(1))))
   }
 
-  test("saveNewAPIHadoopFile should call setConf if format is configurable") {
-    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
-
-    // No error, non-configurable formats still work
-    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
-
-    /*
-      Check that configurable formats get configured:
-      ConfigTestFormat throws an exception if we try to write
-      to it when setConf hasn't been called first.
-      Assertion is in ConfigTestFormat.getRecordWriter.
-     */
-    pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
-  }
-
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
@@ -544,7 +529,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     val e = intercept[SparkException] {
       pairs.saveAsNewAPIHadoopFile[NewFakeFormatWithCallback]("ignored")
     }
-    assert(e.getMessage contains "failed to write")
+    assert(e.getCause.getMessage contains "failed to write")
 
     assert(FakeWriterWithCallback.calledBy === "write,callback,close")
     assert(FakeWriterWithCallback.exception != null, "exception should be captured")
@@ -725,8 +710,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
 }
 
 /*
-  These classes are fakes for testing
-    "saveNewAPIHadoopFile should call setConf if format is configurable".
+  These classes are fakes for testing saveAsHadoopFile/saveNewAPIHadoopFile.
   Unfortunately, they have to be top level classes, and not defined in
   the test method, because otherwise Scala won't generate no-args constructors
   and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile

http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index e404dcd..fa7fe14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -153,7 +153,7 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow]): (TaskCommitMessage, Set[String]) = {
 
-    val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index e53c3e4..a34e2e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.TaskType
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -142,7 +143,7 @@ private[hive] class SparkHiveWriterContainer(
     splitID = splitId
     attemptID = attemptId
 
-    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
+    jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobId))
     taID = new SerializableWritable[TaskAttemptID](
       new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index fa15a0b..7e0a2ca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -27,7 +27,8 @@ import scala.util.matching.Regex
 
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
+import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -337,7 +338,7 @@ abstract class DStream[T: ClassTag] (
           // scheduler, since we may need to write output to an existing directory during checkpoint
           // recovery; see SPARK-4835 for more details. We need to have this call here because
           // compute() might cause Spark jobs to be launched.
-          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
             compute(time)
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c419698/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 98e0993..b7d114b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -26,7 +26,8 @@ import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{PairRDDFunctions, RDD}
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
+import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
@@ -250,7 +251,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           // Disable checks for existing output directories in jobs launched by the streaming
           // scheduler, since we may need to write output to an existing directory during checkpoint
           // recovery; see SPARK-4835 for more details.
-          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
             job.run()
           }
           _eventLoop = eventLoop

