Posted to commits@spark.apache.org by pw...@apache.org on 2014/09/21 22:07:43 UTC

git commit: Revert "[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile"

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 7a766577a -> f5bf7dedb


Revert "[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile"

This reverts commit 7a766577a466377bf504fa2d8c3ca454844a6ea6.

[NOTE: After some thought I decided not to merge this into 1.1 quite yet]
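For context on what is being backed out: SPARK-3595 changed PairRDDFunctions.saveAsHadoopFile so that it only installs FileOutputCommitter when the JobConf does not already carry an output committer (see the PairRDDFunctions.scala hunk below). The following is a minimal usage sketch of what that patch enabled, modeled on the test removed further down in this diff; MyCommitter and the output path are illustrative placeholders, and an active SparkContext named sc (e.g. in spark-shell) is assumed:

    import org.apache.hadoop.mapred.{JobConf, JobContext, OutputCommitter,
      TaskAttemptContext, TextOutputFormat}
    import org.apache.spark.SparkContext._

    // Minimal no-op committer, modeled on the FakeOutputCommitter removed in this revert.
    class MyCommitter extends OutputCommitter {
      override def setupJob(jobContext: JobContext): Unit = ()
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
      override def setupTask(taskContext: TaskAttemptContext): Unit = ()
      override def commitTask(taskContext: TaskAttemptContext): Unit = ()
      override def abortTask(taskContext: TaskAttemptContext): Unit = ()
    }

    val conf = new JobConf()
    conf.setOutputCommitter(classOf[MyCommitter])  // committer the caller wants Spark to honor

    val pairs = sc.parallelize(Seq((new Integer(1), new Integer(1))))
    // With SPARK-3595 applied, saveAsHadoopFile leaves MyCommitter in place;
    // after this revert, branch-1.1 overwrites it with FileOutputCommitter.
    pairs.saveAsHadoopFile("/tmp/out", classOf[Integer], classOf[Integer],
      classOf[TextOutputFormat[Integer, Integer]], conf)

With this revert applied, branch-1.1 goes back to unconditionally calling setOutputCommitter(classOf[FileOutputCommitter]) inside saveAsHadoopFile, so a committer configured as above is ignored on that branch.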


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5bf7ded
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5bf7ded
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5bf7ded

Branch: refs/heads/branch-1.1
Commit: f5bf7dedb1a29a2949caeb7d43a0eb43af873779
Parents: 7a76657
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sun Sep 21 13:07:20 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Sep 21 13:07:20 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    |   2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   7 +-
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 107 ++++---------------
 3 files changed, 25 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5bf7ded/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 376e69c..f670398 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         }
       }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      logWarning ("No need to commit output of task: " + taID.value)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f5bf7ded/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2..f6d9d12 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -872,12 +872,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
       hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-
-    // Use configured output committer if already set
-    if (conf.getOutputCommitter == null) {
-      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
-    }
-
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/f5bf7ded/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index e84cc69..63d3ddb 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -17,21 +17,17 @@
 
 package org.apache.spark.rdd
 
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.util.Progressable
-
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
 import scala.util.Random
 
+import org.scalatest.FunSuite
 import com.google.common.io.Files
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
-OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
-TaskAttemptContext => NewTaskAttempContext}
-import org.apache.spark.{Partitioner, SharedSparkContext}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.conf.{Configuration, Configurable}
+
 import org.apache.spark.SparkContext._
-import org.scalatest.FunSuite
+import org.apache.spark.{Partitioner, SharedSparkContext}
 
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   test("aggregateByKey") {
@@ -471,7 +467,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
 
     // No error, non-configurable formats still work
-    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+    pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
 
     /*
       Check that configurable formats get configured:
@@ -482,17 +478,6 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
-  test("saveAsHadoopFile should respect configured output committers") {
-    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
-    val conf = new JobConf()
-    conf.setOutputCommitter(classOf[FakeOutputCommitter])
-
-    FakeOutputCommitter.ran = false
-    pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
-
-    assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
-  }
-
   test("lookup") {
     val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
 
@@ -636,86 +621,40 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile
   tries to instantiate them with Class.newInstance.
  */
-
-/*
- * Original Hadoop API
- */
 class FakeWriter extends RecordWriter[Integer, Integer] {
-  override def write(key: Integer, value: Integer): Unit = ()
 
-  override def close(reporter: Reporter): Unit = ()
-}
-
-class FakeOutputCommitter() extends OutputCommitter() {
-  override def setupJob(jobContext: JobContext): Unit = ()
-
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
-
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {
-    FakeOutputCommitter.ran = true
-    ()
-  }
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
-}
-
-/*
- * Used to communicate state between the test harness and the OutputCommitter.
- */
-object FakeOutputCommitter {
-  var ran = false
-}
-
-class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
-  override def getRecordWriter(
-      ignored: FileSystem,
-      job: JobConf, name: String,
-      progress: Progressable): RecordWriter[Integer, Integer] = {
-    new FakeWriter()
-  }
-
-  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = ()
-}
-
-/*
- * New-style Hadoop API
- */
-class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
-
-  def close(p1: NewTaskAttempContext) = ()
+  def close(p1: TaskAttemptContext) = ()
 
   def write(p1: Integer, p2: Integer) = ()
 
 }
 
-class NewFakeCommitter extends NewOutputCommitter {
-  def setupJob(p1: NewJobContext) = ()
+class FakeCommitter extends OutputCommitter {
+  def setupJob(p1: JobContext) = ()
 
-  def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
+  def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
 
-  def setupTask(p1: NewTaskAttempContext) = ()
+  def setupTask(p1: TaskAttemptContext) = ()
 
-  def commitTask(p1: NewTaskAttempContext) = ()
+  def commitTask(p1: TaskAttemptContext) = ()
 
-  def abortTask(p1: NewTaskAttempContext) = ()
+  def abortTask(p1: TaskAttemptContext) = ()
 }
 
-class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+class FakeFormat() extends OutputFormat[Integer, Integer]() {
 
-  def checkOutputSpecs(p1: NewJobContext)  = ()
+  def checkOutputSpecs(p1: JobContext)  = ()
 
-  def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
-    new NewFakeWriter()
+  def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+    new FakeWriter()
   }
 
-  def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = {
-    new NewFakeCommitter()
+  def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
+    new FakeCommitter()
   }
 }
 
-class ConfigTestFormat() extends NewFakeFormat() with Configurable {
+class ConfigTestFormat() extends FakeFormat() with Configurable {
 
   var setConfCalled = false
   def setConf(p1: Configuration) = {
@@ -725,7 +664,7 @@ class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   def getConf: Configuration = null
 
-  override def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+  override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
     assert(setConfCalled, "setConf was never called")
     super.getRecordWriter(p1)
   }

