You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/09/21 22:04:43 UTC

git commit: [SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile

Repository: spark
Updated Branches:
  refs/heads/master d112a6c79 -> a0454efe2


[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile

Addresses the issue in https://issues.apache.org/jira/browse/SPARK-3595, namely saveAsHadoopFile hardcoding the OutputCommitter.  This is not ideal when running Spark jobs that write to S3, especially when running them from an EMR cluster where the default OutputCommitter is a DirectOutputCommitter.

Author: Ian Hummel <ia...@themodernlife.net>

Closes #2450 from themodernlife/spark-3595 and squashes the following commits:

f37a0e5 [Ian Hummel] Update based on comments from pwendell
a11d9f3 [Ian Hummel] Fix formatting
4359664 [Ian Hummel] Add an example showing usage
8b6be94 [Ian Hummel] Add ability to specify OutputCommitter, espcially useful when writing to an S3 bucket from an EMR cluster


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0454efe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0454efe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0454efe

Branch: refs/heads/master
Commit: a0454efe21e5c7ffe1b9bb7b18021a5580952e69
Parents: d112a6c
Author: Ian Hummel <ia...@themodernlife.net>
Authored: Sun Sep 21 13:04:36 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Sep 21 13:04:36 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    |   2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   7 +-
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 107 +++++++++++++++----
 3 files changed, 91 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a0454efe/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f670398..376e69c 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         }
       }
     } else {
-      logWarning ("No need to commit output of task: " + taID.value)
+      logInfo ("No need to commit output of task: " + taID.value)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a0454efe/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f6d9d12..51ba8c2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
       hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+
+    // Use configured output committer if already set
+    if (conf.getOutputCommitter == null) {
+      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    }
+
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/a0454efe/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 63d3ddb..e84cc69 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -17,17 +17,21 @@
 
 package org.apache.spark.rdd
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.util.Progressable
+
+import scala.collection.mutable.{ArrayBuffer, HashSet}
 import scala.util.Random
 
-import org.scalatest.FunSuite
 import com.google.common.io.Files
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.conf.{Configuration, Configurable}
-
-import org.apache.spark.SparkContext._
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
+OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
+TaskAttemptContext => NewTaskAttempContext}
 import org.apache.spark.{Partitioner, SharedSparkContext}
+import org.apache.spark.SparkContext._
+import org.scalatest.FunSuite
 
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   test("aggregateByKey") {
@@ -467,7 +471,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
 
     // No error, non-configurable formats still work
-    pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
+    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
 
     /*
       Check that configurable formats get configured:
@@ -478,6 +482,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("saveAsHadoopFile should respect configured output committers") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+    val conf = new JobConf()
+    conf.setOutputCommitter(classOf[FakeOutputCommitter])
+
+    FakeOutputCommitter.ran = false
+    pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
+
+    assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
+  }
+
   test("lookup") {
     val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
 
@@ -621,40 +636,86 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile
   tries to instantiate them with Class.newInstance.
  */
+
+/*
+ * Original Hadoop API
+ */
 class FakeWriter extends RecordWriter[Integer, Integer] {
+  override def write(key: Integer, value: Integer): Unit = ()
 
-  def close(p1: TaskAttemptContext) = ()
+  override def close(reporter: Reporter): Unit = ()
+}
+
+class FakeOutputCommitter() extends OutputCommitter() {
+  override def setupJob(jobContext: JobContext): Unit = ()
+
+  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
+
+  override def commitTask(taskContext: TaskAttemptContext): Unit = {
+    FakeOutputCommitter.ran = true
+    ()
+  }
+
+  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
+}
+
+/*
+ * Used to communicate state between the test harness and the OutputCommitter.
+ */
+object FakeOutputCommitter {
+  var ran = false
+}
+
+class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
+  override def getRecordWriter(
+      ignored: FileSystem,
+      job: JobConf, name: String,
+      progress: Progressable): RecordWriter[Integer, Integer] = {
+    new FakeWriter()
+  }
+
+  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = ()
+}
+
+/*
+ * New-style Hadoop API
+ */
+class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
+
+  def close(p1: NewTaskAttempContext) = ()
 
   def write(p1: Integer, p2: Integer) = ()
 
 }
 
-class FakeCommitter extends OutputCommitter {
-  def setupJob(p1: JobContext) = ()
+class NewFakeCommitter extends NewOutputCommitter {
+  def setupJob(p1: NewJobContext) = ()
 
-  def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
+  def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
 
-  def setupTask(p1: TaskAttemptContext) = ()
+  def setupTask(p1: NewTaskAttempContext) = ()
 
-  def commitTask(p1: TaskAttemptContext) = ()
+  def commitTask(p1: NewTaskAttempContext) = ()
 
-  def abortTask(p1: TaskAttemptContext) = ()
+  def abortTask(p1: NewTaskAttempContext) = ()
 }
 
-class FakeFormat() extends OutputFormat[Integer, Integer]() {
+class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
 
-  def checkOutputSpecs(p1: JobContext)  = ()
+  def checkOutputSpecs(p1: NewJobContext)  = ()
 
-  def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
-    new FakeWriter()
+  def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
   }
 
-  def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
-    new FakeCommitter()
+  def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = {
+    new NewFakeCommitter()
   }
 }
 
-class ConfigTestFormat() extends FakeFormat() with Configurable {
+class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   var setConfCalled = false
   def setConf(p1: Configuration) = {
@@ -664,7 +725,7 @@ class ConfigTestFormat() extends FakeFormat() with Configurable {
 
   def getConf: Configuration = null
 
-  override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+  override def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
     assert(setConfCalled, "setConf was never called")
     super.getRecordWriter(p1)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org