Posted to commits@spark.apache.org by yh...@apache.org on 2015/06/24 18:50:11 UTC

spark git commit: [SPARK-8578] [SQL] Should ignore user defined output committer when appending data

Repository: spark
Updated Branches:
  refs/heads/master 9d36ec243 -> bba6699d0


[SPARK-8578] [SQL] Should ignore user defined output committer when appending data

https://issues.apache.org/jira/browse/SPARK-8578

It is not safe to use a custom output committer when appending data to an existing dir. This change adds logic to check whether we are appending data, and, if so, to use the output committer associated with the file output format instead.
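
For illustration only, here is a minimal Scala sketch of that selection logic; the helper name selectCommitter and its parameter list are hypothetical, and the real implementation is BaseWriterContainer.newOutputCommitter in the diff below.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}

// Hypothetical helper that mirrors the idea: when appending, always fall back to the
// committer provided by the file output format; otherwise honor a user-defined class.
def selectCommitter(
    isAppend: Boolean,
    userCommitterClass: Option[Class[_ <: OutputCommitter]],
    outputFormat: FileOutputFormat[_, _],
    outputPath: Path,
    context: TaskAttemptContext): OutputCommitter = {
  if (isAppend) {
    // A custom (e.g. direct) committer may leave partial data behind if the appending
    // job fails, so ignore it and use the format's own committer.
    outputFormat.getOutputCommitter(context)
  } else {
    userCommitterClass match {
      case Some(clazz) if classOf[FileOutputCommitter].isAssignableFrom(clazz) =>
        // FileOutputCommitter subclasses expose a (Path, TaskAttemptContext) constructor.
        clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
          .newInstance(outputPath, context)
      case Some(clazz) =>
        // Any other OutputCommitter: use the no-argument constructor.
        clazz.getDeclaredConstructor().newInstance()
      case None =>
        outputFormat.getOutputCommitter(context)
    }
  }
}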

Author: Yin Huai <yh...@databricks.com>

Closes #6964 from yhuai/SPARK-8578 and squashes the following commits:

43544c4 [Yin Huai] Do not use a custom output committer when appending data.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bba6699d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bba6699d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bba6699d

Branch: refs/heads/master
Commit: bba6699d0e9093bc041a9a33dd31992790f32174
Parents: 9d36ec2
Author: Yin Huai <yh...@databricks.com>
Authored: Wed Jun 24 09:50:03 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Jun 24 09:50:03 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/sources/commands.scala | 89 ++++++++++++--------
 .../sql/sources/hadoopFsRelationSuites.scala    | 83 +++++++++++++++++-
 2 files changed, 136 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bba6699d/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 215e53c..fb6173f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
-    val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+    val pathExists = fs.exists(qualifiedOutputPath)
+    val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         sys.error(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
       case (SaveMode.Ignore, exists) =>
         !exists
     }
+    // Whether we are appending data to an existing dir.
+    val isAppend = (pathExists) && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
       val partitionColumns = relation.partitionColumns.fieldNames
       if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, job), df)
+        insert(new DefaultWriterContainer(relation, job, isAppend), df)
       } else {
         val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
         insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
       }
     }
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
 private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
-    @transient job: Job)
+    @transient job: Job,
+    isAppend: Boolean)
   extends SparkHadoopMapReduceUtil
   with Logging
   with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-    Option(committerClass).map { clazz =>
-      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-      // has an associated output committer. To override this output committer,
-      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-      // If a data source needs to override the output committer, it needs to set the
-      // output committer in prepareForWrite method.
-      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-        // The specified output committer is a FileOutputCommitter.
-        // So, we will use the FileOutputCommitter-specified constructor.
-        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        ctor.newInstance(new Path(outputPath), context)
-      } else {
-        // The specified output committer is just a OutputCommitter.
-        // So, we will use the no-argument constructor.
-        val ctor = clazz.getDeclaredConstructor()
-        ctor.newInstance()
+    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, in S3, direct parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+      logInfo(
+        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+        "for appending.")
+      defaultOutputCommitter
+    } else {
+      val committerClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+      Option(committerClass).map { clazz =>
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in prepareForWrite method.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(outputPath), context)
+        } else {
+          // The specified output committer is just a OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      }.getOrElse {
+        // If output committer class is not set, we will use the one associated with the
+        // file output format.
+        logInfo(
+          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
       }
-    }.getOrElse {
-      // If output committer class is not set, we will use the one associated with the
-      // file output format.
-      val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
-      outputCommitter
     }
   }
 
@@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer(
 
 private[sql] class DefaultWriterContainer(
     @transient relation: HadoopFsRelation,
-    @transient job: Job)
-  extends BaseWriterContainer(relation, job) {
+    @transient job: Job,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
 
   @transient private var writer: OutputWriter = _
 
@@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer(
     @transient relation: HadoopFsRelation,
     @transient job: Job,
     partitionColumns: Array[String],
-    defaultPartitionName: String)
-  extends BaseWriterContainer(relation, job) {
+    defaultPartitionName: String,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
   @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _

http://git-wip-us.apache.org/repos/asf/spark/blob/bba6699d/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index e0d8277..a16ab3a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,10 +17,16 @@
 
 package org.apache.spark.sql.sources
 
+import scala.collection.JavaConversions._
+
 import java.io.File
 
 import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   // more cores, the issue can be reproduced steadily.  Fortunately our Jenkins builder meets this
   // requirement.  We probably want to move this test case to spark-integration-tests or spark-perf
   // later.
-  test("SPARK-8406: Avoids name collision while writing Parquet files") {
+  test("SPARK-8406: Avoids name collision while writing files") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
       sqlContext
@@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-8578 specified custom output committer will not be used to append data") {
+    val clonedConf = new Configuration(configuration)
+    try {
+      val df = sqlContext.range(1, 10).toDF("i")
+      withTempPath { dir =>
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because data already exists in the destination dir, this append should succeed:
+        // we will use the output committer associated with the file format,
+        // and AlwaysFailOutputCommitter will not be used.
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          sqlContext.read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .load(dir.getCanonicalPath),
+          df.unionAll(df))
+
+        // This overwrite will fail because AlwaysFailOutputCommitter is used for non-append writes.
+        intercept[Exception] {
+          df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+      withTempPath { dir =>
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because there is no existing data, this write is not treated as an append,
+        // so the user-defined AlwaysFailOutputCommitter is used
+        // and this append will fail.
+        intercept[Exception] {
+          df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends FileOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commitment failure for testing purpose.")
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailParquetOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commitment failure for testing purpose.")
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {


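As a usage note, not part of the commit: the sketch below assumes the SQLConf.OUTPUT_COMMITTER_CLASS key ("spark.sql.sources.outputCommitterClass"), and com.example.MyDirectOutputCommitter is a made-up class name. It shows that a committer configured this way now only takes effect for non-append writes.

import org.apache.spark.sql.SQLContext

def demo(sqlContext: SQLContext): Unit = {
  val df = sqlContext.range(0, 100).toDF("id")

  // Register a (hypothetical) custom committer via SQLConf.OUTPUT_COMMITTER_CLASS.
  sqlContext.sparkContext.hadoopConfiguration.set(
    "spark.sql.sources.outputCommitterClass",
    "com.example.MyDirectOutputCommitter")

  // Non-append writes (ErrorIfExists / Overwrite / Ignore) still use the custom committer.
  df.write.mode("overwrite").json("/tmp/spark-8578-demo")

  // Appending to the now-existing dir ignores the custom committer and falls back to
  // the committer of the underlying file output format.
  df.write.mode("append").json("/tmp/spark-8578-demo")
}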