Posted to commits@spark.apache.org by li...@apache.org on 2017/07/20 19:08:23 UTC

spark git commit: [SPARK-10063] Follow-up: remove dead code related to an old output committer

Repository: spark
Updated Branches:
  refs/heads/master 03367d7aa -> 3ac609308


[SPARK-10063] Follow-up: remove dead code related to an old output committer

## What changes were proposed in this pull request?

DirectParquetOutputCommitter was removed from Spark as it was deemed unsafe to use. However, we still had some code that generated a warning about it. This patch removes that code as well.

This is a follow-up to https://github.com/apache/spark/pull/16796

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #18689 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ac60930
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ac60930
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ac60930

Branch: refs/heads/master
Commit: 3ac60930865209bf804ec6506d9d8b0ddd613157
Parents: 03367d7
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Jul 20 12:08:20 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Jul 20 12:08:20 2017 -0700

----------------------------------------------------------------------
 .../spark/internal/io/FileCommitProtocol.scala  | 21 +++------
 .../spark/internal/io/SparkHadoopWriter.scala   |  8 ++--
 .../OutputCommitCoordinatorSuite.scala          |  3 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  6 ++-
 .../InsertIntoHadoopFsRelationCommand.scala     |  3 +-
 .../SQLHadoopMapReduceCommitProtocol.scala      | 49 +++++++++-----------
 .../execution/streaming/FileStreamSink.scala    |  3 +-
 .../sql/sources/PartitionedWriteSuite.scala     | 13 ++----
 .../hive/execution/InsertIntoHiveTable.scala    |  3 +-
 9 files changed, 43 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 7efa941..50f51e1 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -24,12 +24,12 @@ import org.apache.spark.util.Utils
 
 
 /**
- * An interface to define how a single Spark job commits its outputs. Two notes:
+ * An interface to define how a single Spark job commits its outputs. Three notes:
  *
  * 1. Implementations must be serializable, as the committer instance instantiated on the driver
  *    will be used for tasks on executors.
- * 2. Implementations should have a constructor with either 2 or 3 arguments:
- *    (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean).
+ * 2. Implementations should have a constructor with 2 arguments:
+ *      (jobId: String, path: String)
  * 3. A committer should not be reused across multiple Spark jobs.
  *
  * The proper call sequence is:
@@ -139,19 +139,10 @@ object FileCommitProtocol {
   /**
    * Instantiates a FileCommitProtocol using the given className.
    */
-  def instantiate(className: String, jobId: String, outputPath: String, isAppend: Boolean)
+  def instantiate(className: String, jobId: String, outputPath: String)
     : FileCommitProtocol = {
     val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
-
-    // First try the one with argument (jobId: String, outputPath: String, isAppend: Boolean).
-    // If that doesn't exist, try the one with (jobId: string, outputPath: String).
-    try {
-      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
-      ctor.newInstance(jobId, outputPath, isAppend.asInstanceOf[java.lang.Boolean])
-    } catch {
-      case _: NoSuchMethodException =>
-        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
-        ctor.newInstance(jobId, outputPath)
-    }
+    val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+    ctor.newInstance(jobId, outputPath)
   }
 }
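
With the three-argument constructor path gone, a custom commit protocol only needs the two-argument (jobId, path) constructor for the reflection-based instantiation above to find it. A minimal sketch under that assumption (the class name MyCommitProtocol is hypothetical, not part of this patch):

  import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

  // Hypothetical custom committer: after this change its constructor must be
  // exactly (jobId: String, path: String); the old isAppend flag no longer exists.
  class MyCommitProtocol(jobId: String, path: String)
    extends HadoopMapReduceCommitProtocol(jobId, path)

  // instantiate now only looks up the (String, String) constructor via reflection.
  val committer = FileCommitProtocol.instantiate(
    className = classOf[MyCommitProtocol].getName,
    jobId = "job-0001",
    outputPath = "/tmp/output")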

http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 7d846f9..949d8c6 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -197,8 +197,8 @@ class HadoopMapRedWriteConfigUtil[K, V: ClassTag](conf: SerializableJobConf)
     FileCommitProtocol.instantiate(
       className = classOf[HadoopMapRedCommitProtocol].getName,
       jobId = jobId.toString,
-      outputPath = getConf.get("mapred.output.dir"),
-      isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
+      outputPath = getConf.get("mapred.output.dir")
+    ).asInstanceOf[HadoopMapReduceCommitProtocol]
   }
 
   // --------------------------------------------------------------------------
@@ -325,8 +325,8 @@ class HadoopMapReduceWriteConfigUtil[K, V: ClassTag](conf: SerializableConfigura
     FileCommitProtocol.instantiate(
       className = classOf[HadoopMapReduceCommitProtocol].getName,
       jobId = jobId.toString,
-      outputPath = getConf.get("mapreduce.output.fileoutputformat.outputdir"),
-      isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
+      outputPath = getConf.get("mapreduce.output.fileoutputformat.outputdir")
+    ).asInstanceOf[HadoopMapReduceCommitProtocol]
   }
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 60b5955..03b1903 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -265,8 +265,7 @@ private case class OutputCommitFunctions(tempDirPath: String) {
     val committer = FileCommitProtocol.instantiate(
       className = classOf[HadoopMapRedCommitProtocol].getName,
       jobId = jobId.value.getId.toString,
-      outputPath = jobConf.get("mapred.output.dir"),
-      isAppend = false)
+      outputPath = jobConf.get("mapred.output.dir"))
 
     // Create TaskAttemptContext.
     // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it

http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 55558ca..824908d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,8 +480,10 @@ object SQLConf {
 
   // The output committer class used by data sources. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
-  val OUTPUT_COMMITTER_CLASS =
-    buildConf("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional
+  val OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.sources.outputCommitterClass")
+    .internal()
+    .stringConf
+    .createOptional
 
   val FILE_COMMIT_PROTOCOL_CLASS =
     buildConf("spark.sql.sources.commitProtocolClass")

http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index c1bcfb8..9ebe1e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -98,8 +98,7 @@ case class InsertIntoHadoopFsRelationCommand(
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
-      outputPath = outputPath.toString,
-      isAppend = isAppend)
+      outputPath = outputPath.toString)
 
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 9b9ed28..40825a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -29,41 +29,34 @@ import org.apache.spark.sql.internal.SQLConf
  * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
  * Hadoop output committer using an option specified in SQLConf.
  */
-class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String)
   extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
 
-    if (!isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      // See SPARK-8578 for more details.
-      val configuration = context.getConfiguration
-      val clazz =
-        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+    val configuration = context.getConfiguration
+    val clazz =
+      configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
-      if (clazz != null) {
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+    if (clazz != null) {
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
 
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          committer = ctor.newInstance(new Path(path), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          committer = ctor.newInstance()
-        }
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        committer = ctor.newInstance(new Path(path), context)
+      } else {
+        // The specified output committer is just an OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        committer = ctor.newInstance()
       }
     }
     logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")

http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 96225ec..0ed2dbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -102,8 +102,7 @@ class FileStreamSink(
       val committer = FileCommitProtocol.instantiate(
         className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
         jobId = batchId.toString,
-        outputPath = path,
-        isAppend = false)
+        outputPath = path)
 
       committer match {
         case manifestCommitter: ManifestFileCommitProtocol =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 6f998aa..0fe33e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -32,18 +32,13 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
-private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String, isAppend: Boolean)
-  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend)
+private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path)
     with Serializable with Logging {
 
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    if (isAppend) {
-      throw new Exception("append data to an existed partitioned table, " +
-        "there should be no custom partition path sent to Task")
-    }
-
-    super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+    throw new Exception("there should be no custom partition path")
   }
 }
 
@@ -115,7 +110,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("append data to an existed partitioned table without custom partition path") {
+  test("append data to an existing partitioned table without custom partition path") {
     withTable("t") {
       withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
         classOf[OnlyDetectCustomPathFileCommitProtocol].getName) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3ac60930/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index cd263e8..b9461ad 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -337,8 +337,7 @@ case class InsertIntoHiveTable(
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
-      outputPath = tmpLocation.toString,
-      isAppend = false)
+      outputPath = tmpLocation.toString)
 
     val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
       query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {

