Posted to commits@spark.apache.org by yh...@apache.org on 2015/06/24 18:51:24 UTC

spark git commit: [SPARK-8578] [SQL] Should ignore user defined output committer when appending data (branch 1.4)

Repository: spark
Updated Branches:
  refs/heads/branch-1.4 eafbe1345 -> 7e53ff258


[SPARK-8578] [SQL] Should ignore user defined output committer when appending data (branch 1.4)

This is https://github.com/apache/spark/pull/6964 for branch 1.4.
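
As a rough illustration of the user-visible behavior this patch targets (this snippet is not part of the commit): a custom committer is registered through SQLConf.OUTPUT_COMMITTER_CLASS, and once the destination path already exists, an append falls back to the committer of the file output format. The config key literal, the committer class name, and the output path below are illustrative assumptions only.

  import org.apache.spark.sql.SaveMode

  // Hypothetical user-provided committer; the key is the one read as
  // SQLConf.OUTPUT_COMMITTER_CLASS in the patch (assumed to be
  // "spark.sql.sources.outputCommitterClass" on this branch).
  sqlContext.setConf(
    "spark.sql.sources.outputCommitterClass",
    "com.example.MyDirectOutputCommitter")

  val df = sqlContext.range(1, 10).toDF("i")

  // First write: the path does not exist yet, so the custom committer is honored.
  df.write.mode(SaveMode.Append).format("parquet").save("/tmp/spark-8578-demo")

  // Second write: the path now exists, so with this patch the committer associated
  // with the file output format is used and the custom committer is ignored.
  df.write.mode(SaveMode.Append).format("parquet").save("/tmp/spark-8578-demo")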

Author: Yin Huai <yh...@databricks.com>

Closes #6966 from yhuai/SPARK-8578-branch-1.4 and squashes the following commits:

9c3947b [Yin Huai] Do not use a custom output committer when appending data.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e53ff25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e53ff25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e53ff25

Branch: refs/heads/branch-1.4
Commit: 7e53ff25813dc6a79f728c91e6c1d4d4dfa32aab
Parents: eafbe13
Author: Yin Huai <yh...@databricks.com>
Authored: Wed Jun 24 09:51:18 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Jun 24 09:51:18 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/sources/commands.scala | 89 ++++++++++++--------
 .../sql/sources/hadoopFsRelationSuites.scala    | 83 +++++++++++++++++-
 2 files changed, 136 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7e53ff25/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 9a75dd7..29a47f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -97,7 +97,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
-    val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+    val pathExists = fs.exists(qualifiedOutputPath)
+    val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         sys.error(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
@@ -108,6 +109,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
       case (SaveMode.Ignore, exists) =>
         !exists
     }
+    // Whether we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -133,10 +136,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
       val partitionColumns = relation.partitionColumns.fieldNames
       if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, job), df)
+        insert(new DefaultWriterContainer(relation, job, isAppend), df)
       } else {
         val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
         insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
       }
     }
@@ -286,7 +289,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
 private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
-    @transient job: Job)
+    @transient job: Job,
+    isAppend: Boolean)
   extends SparkHadoopMapReduceUtil
   with Logging
   with Serializable {
@@ -365,34 +369,47 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
-
-    Option(committerClass).map { clazz =>
-      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-      // has an associated output committer. To override this output committer,
-      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-      // If a data source needs to override the output committer, it needs to set the
-      // output committer in prepareForWrite method.
-      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-        // The specified output committer is a FileOutputCommitter.
-        // So, we will use the FileOutputCommitter-specified constructor.
-        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        ctor.newInstance(new Path(outputPath), context)
-      } else {
-        // The specified output committer is just a OutputCommitter.
-        // So, we will use the no-argument constructor.
-        val ctor = clazz.getDeclaredConstructor()
-        ctor.newInstance()
+    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, in S3, a direct Parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+      logInfo(
+        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+          "for appending.")
+      defaultOutputCommitter
+    } else {
+      val committerClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
+
+      Option(committerClass).map { clazz =>
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in prepareForWrite method.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(outputPath), context)
+        } else {
+          // The specified output committer is just a OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      }.getOrElse {
+        // If output committer class is not set, we will use the one associated with the
+        // file output format.
+        logInfo(
+          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
       }
-    }.getOrElse {
-      // If output committer class is not set, we will use the one associated with the
-      // file output format.
-      val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
-      outputCommitter
     }
   }
 
@@ -442,8 +459,9 @@ private[sql] abstract class BaseWriterContainer(
 
 private[sql] class DefaultWriterContainer(
     @transient relation: HadoopFsRelation,
-    @transient job: Job)
-  extends BaseWriterContainer(relation, job) {
+    @transient job: Job,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
 
   @transient private var writer: OutputWriter = _
 
@@ -482,8 +500,9 @@ private[sql] class DynamicPartitionWriterContainer(
     @transient relation: HadoopFsRelation,
     @transient job: Job,
     partitionColumns: Array[String],
-    defaultPartitionName: String)
-  extends BaseWriterContainer(relation, job) {
+    defaultPartitionName: String,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
   @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
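
As a reading aid (not part of the commit), the committer-selection rule that newOutputCommitter implements after this change can be condensed roughly as below. The standalone selectCommitter helper, its parameter list, and passing the default committer in as an argument are assumptions made for this sketch; in the patch the default is obtained via outputFormatClass.newInstance().getOutputCommitter(context).

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
  import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}

  // userClass is null when SQLConf.OUTPUT_COMMITTER_CLASS is not set.
  def selectCommitter(
      isAppend: Boolean,
      userClass: Class[_ <: OutputCommitter],
      defaultCommitter: OutputCommitter,
      outputPath: String,
      context: TaskAttemptContext): OutputCommitter = {
    if (isAppend) {
      // Appending to an existing dir: always use the format's own committer, since a
      // custom (e.g. direct-output) committer may leave partial data behind on failure.
      defaultCommitter
    } else {
      Option(userClass).map { clazz =>
        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
          // FileOutputCommitter subclasses expect a (Path, TaskAttemptContext) constructor.
          clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
            .newInstance(new Path(outputPath), context)
        } else {
          // Any other OutputCommitter: use the no-argument constructor.
          clazz.getDeclaredConstructor().newInstance()
        }
      }.getOrElse(defaultCommitter)
    }
  }

The key point is that the append branch short-circuits before the user setting is even consulted, so no reflection happens at all when appending.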

http://git-wip-us.apache.org/repos/asf/spark/blob/7e53ff25/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 5b5ed5a..619bfba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,10 +17,16 @@
 
 package org.apache.spark.sql.sources
 
+import scala.collection.JavaConversions._
+
 import java.io.File
 
 import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
@@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   // more cores, the issue can be reproduced steadily.  Fortunately our Jenkins builder meets this
   // requirement.  We probably want to move this test case to spark-integration-tests or spark-perf
   // later.
-  test("SPARK-8406: Avoids name collision while writing Parquet files") {
+  test("SPARK-8406: Avoids name collision while writing files") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
       sqlContext
@@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-8578 specified custom output committer will not be used to append data") {
+    val clonedConf = new Configuration(configuration)
+    try {
+      val df = sqlContext.range(1, 10).toDF("i")
+      withTempPath { dir =>
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because data already exists at the destination, this append should succeed:
+        // the output committer associated with the file format will be used and
+        // AlwaysFailOutputCommitter will not be used.
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          sqlContext.read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .load(dir.getCanonicalPath),
+          df.unionAll(df))
+
+        // This will fail because AlwaysFailOutputCommitter is used when we do not append
+        // (the write below uses overwrite mode).
+        intercept[Exception] {
+          df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+      withTempPath { dir =>
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because there is no existing data at the destination, the append is treated as a
+        // fresh write, so AlwaysFailOutputCommitter is used and this append will fail.
+        intercept[Exception] {
+          df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends FileOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commit failure for testing purposes.")
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailParquetOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commit failure for testing purposes.")
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
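
One detail of the new test worth noting: because Hadoop 1 has no Configuration.unset, the test snapshots the whole Configuration up front and restores it at the end by clearing it and replaying the snapshot. A generic helper along those lines might look like the sketch below (not part of the commit); withHadoopConf and its parameters are hypothetical names.

  import scala.collection.JavaConversions._

  import org.apache.hadoop.conf.Configuration

  // Temporarily set some keys on a live Hadoop Configuration and restore it afterwards,
  // without relying on Configuration.unset (which Hadoop 1 lacks).
  def withHadoopConf(conf: Configuration)(keys: (String, String)*)(body: => Unit): Unit = {
    val snapshot = new Configuration(conf)  // copies every current entry
    try {
      keys.foreach { case (k, v) => conf.set(k, v) }
      body
    } finally {
      conf.clear()
      snapshot.foreach(entry => conf.set(entry.getKey, entry.getValue))
    }
  }

In the test above the same pattern is written inline with try/finally, with SQLConf.OUTPUT_COMMITTER_CLASS and the Parquet committer key as the temporarily set pair.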

