You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/18 21:17:15 UTC

spark git commit: [SPARK-7567] [SQL] [follow-up] Use a new flag to set output committer based on mapreduce apis

Repository: spark
Updated Branches:
  refs/heads/master 103c863c2 -> 530397ba2


[SPARK-7567] [SQL] [follow-up] Use a new flag to set output committer based on mapreduce apis

cc liancheng marmbrus

Author: Yin Huai <yh...@databricks.com>

Closes #6130 from yhuai/directOutput and squashes the following commits:

312b07d [Yin Huai] A data source can use spark.sql.sources.outputCommitterClass to override the output committer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/530397ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/530397ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/530397ba

Branch: refs/heads/master
Commit: 530397ba2f5c0fcabb86ba73048c95177ed0b9fc
Parents: 103c863
Author: Yin Huai <yh...@databricks.com>
Authored: Mon May 18 12:17:10 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon May 18 12:17:10 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  4 +++
 .../apache/spark/sql/parquet/newParquet.scala   |  2 +-
 .../org/apache/spark/sql/sources/commands.scala | 29 +++++++++++++++-----
 .../apache/spark/sql/sources/interfaces.scala   |  3 +-
 4 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/530397ba/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 6da910e..77c6af2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -71,6 +71,10 @@ private[spark] object SQLConf {
   // Whether to perform partition discovery when loading external data sources.  Default to true.
   val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
 
+  // The output committer class used by FSBasedRelation. The specified class needs to be a
+  // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+  val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
+
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"

http://git-wip-us.apache.org/repos/asf/spark/blob/530397ba/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bcbdb1e..fea54a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -197,7 +197,7 @@ private[sql] class ParquetRelation2(
         classOf[ParquetOutputCommitter])
 
     conf.setClass(
-      "mapred.output.committer.class",
+      SQLConf.OUTPUT_COMMITTER_CLASS,
       committerClass,
       classOf[ParquetOutputCommitter])
 

http://git-wip-us.apache.org/repos/asf/spark/blob/530397ba/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a09bb08..d54dbb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
 import org.apache.hadoop.util.Shell
 import parquet.hadoop.util.ContextUtil
 
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -287,24 +287,39 @@ private[sql] abstract class BaseWriterContainer(
   protected def getWorkPath: String = {
     outputCommitter match {
       // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
-      case f: FileOutputCommitter => f.getWorkPath.toString
+      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
       case _ => outputPath
     }
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val committerClass = context.getConfiguration.getClass(
-      "mapred.output.committer.class", null, classOf[OutputCommitter])
+      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
-      val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-      ctor.newInstance(new Path(outputPath), context)
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        ctor.newInstance(new Path(outputPath), context)
+      } else {
+        // The specified output committer is just a OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        ctor.newInstance()
+      }
     }.getOrElse {
+      // If output committer class is not set, we will use the one associated with the
+      // file output format.
       outputFormatClass.newInstance().getOutputCommitter(context)
     }
   }
 
-
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
     this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
     this.taskId = new TaskID(this.jobId, true, splitId)

http://git-wip-us.apache.org/repos/asf/spark/blob/530397ba/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 274ab44..a82a675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -527,7 +527,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
   /**
    * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
-   * be put here.  For example, user defined output committer can be configured here.
+   * be put here.  For example, user defined output committer can be configured here
+   * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    *
    * Note that the only side effect expected here is mutating `job` via its setters.  Especially,
    * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org