Posted to commits@spark.apache.org by yh...@apache.org on 2015/06/08 20:37:04 UTC

spark git commit: [SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x (branch 1.4 backport based on https://github.com/apache/spark/pull/6669)

Repository: spark
Updated Branches:
  refs/heads/branch-1.4 a3afc2cba -> 69197c3e3


[SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x (branch 1.4 backport based on https://github.com/apache/spark/pull/6669)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69197c3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69197c3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69197c3e

Branch: refs/heads/branch-1.4
Commit: 69197c3e382abd477e6eeb87ffbda69bfa68fa14
Parents: a3afc2c
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Jun 8 11:35:30 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jun 8 11:36:42 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  1 +
 .../apache/spark/sql/parquet/newParquet.scala   |  7 +++
 .../org/apache/spark/sql/sources/commands.scala | 18 +++++--
 .../spark/sql/parquet/ParquetIOSuite.scala      | 50 +++++++++++++++++---
 4 files changed, 64 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/69197c3e/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 77c6af2..26b4e5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -73,6 +73,7 @@ private[spark] object SQLConf {
 
   // The output committer class used by FSBasedRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+  // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
   val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
 
   // Whether to perform eager analysis when constructing a dataframe.

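The new NOTE means the committer override has to travel through the Hadoop Configuration that the write job sees, not through SQLConf.setConf. A minimal sketch of how a user would set it, assuming a live SparkContext `sc` and a hypothetical committer class `com.example.MyOutputCommitter` (any subclass of org.apache.hadoop.mapreduce.OutputCommitter):

    // Register a custom output committer for HadoopFsRelation-based writes.
    // The key must be set on the Hadoop Configuration, not via SQLConf.
    sc.hadoopConfiguration.set(
      "spark.sql.sources.outputCommitterClass",
      "com.example.MyOutputCommitter")
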
http://git-wip-us.apache.org/repos/asf/spark/blob/69197c3e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bf55e23..3328e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -212,6 +212,13 @@ private[sql] class ParquetRelation2(
         classOf[ParquetOutputCommitter],
         classOf[ParquetOutputCommitter])
 
+    if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+      logInfo("Using default output committer for Parquet: " +
+        classOf[ParquetOutputCommitter].getCanonicalName)
+    } else {
+      logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
+    }
+
     conf.setClass(
       SQLConf.OUTPUT_COMMITTER_CLASS,
       committerClass,

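The added log lines distinguish the default ParquetOutputCommitter from a user-supplied one. A hedged sketch of how the Parquet-specific key is typically set, again assuming a SparkContext `sc`; the class below is the direct output committer shipped with Spark 1.4, but any subclass of parquet.hadoop.ParquetOutputCommitter should work:

    // Route Parquet writes through a non-default committer; if this key is unset,
    // the default ParquetOutputCommitter is used and logged instead.
    sc.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
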
http://git-wip-us.apache.org/repos/asf/spark/blob/69197c3e/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 9357a56..eb4e8f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer(
   def driverSideSetup(): Unit = {
     setupIDs(0, 0, 0)
     setupConf()
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
 
-    // This preparation must happen before initializing output format and output committer, since
-    // their initialization involves the job configuration, which can be potentially decorated in
-    // `relation.prepareJobForWrite`.
+    // The order of the following two lines matters.  For Hadoop 1, the TaskAttemptContext constructor
+    // clones the Configuration passed in, so if we created the TaskAttemptContext first, configuration
+    // changes made in prepareJobForWrite(job) would not be propagated into the TaskAttemptContext.
+    //
+    // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+    // committer, since their initialization involves the job configuration, which can potentially be
+    // decorated in `prepareJobForWrite`.
     outputWriterFactory = relation.prepareJobForWrite(job)
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
 
     outputFormatClass = job.getOutputFormatClass
     outputCommitter = newOutputCommitter(taskAttemptContext)
@@ -331,6 +335,8 @@ private[sql] abstract class BaseWriterContainer(
       SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
       // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
       // has an associated output committer. To override this output committer,
       // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
@@ -350,7 +356,9 @@ private[sql] abstract class BaseWriterContainer(
     }.getOrElse {
       // If output committer class is not set, we will use the one associated with the
       // file output format.
-      outputFormatClass.newInstance().getOutputCommitter(context)
+      val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
+      outputCommitter
     }
   }
 

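The reordering above matters because, under Hadoop 1.x, the TaskAttemptContext constructor copies the Configuration it is given, so keys written by prepareJobForWrite afterwards never reach the context. A hedged sketch of the pitfall, using Hadoop 1.x class names (TaskAttemptContext is a concrete class there; the committer class name is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID}

    val conf = new Configuration()
    // Hadoop 1.x: the constructor clones `conf` into an internal copy.
    val context = new TaskAttemptContext(conf, new TaskAttemptID())

    // Writing to the original Configuration afterwards is invisible to the context.
    conf.set("spark.sql.sources.outputCommitterClass", "com.example.MyOutputCommitter")
    assert(context.getConfiguration.get("spark.sql.sources.outputCommitterClass") == null)
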
http://git-wip-us.apache.org/repos/asf/spark/blob/69197c3e/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index dd48bb3..b00a511 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.scalatest.BeforeAndAfterAll
 import parquet.example.data.simple.SimpleGroup
 import parquet.example.data.{Group, GroupWriter}
 import parquet.hadoop.api.WriteSupport
 import parquet.hadoop.api.WriteSupport.WriteContext
-import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
-import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
+import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
 import parquet.io.api.RecordConsumer
 import parquet.schema.{MessageType, MessageTypeParser}
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.util.DateUtils
@@ -200,7 +202,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
 
     withParquetDataFrame(allNulls :: Nil) { df =>
       val rows = df.collect()
-      assert(rows.size === 1)
+      assert(rows.length === 1)
       assert(rows.head === Row(Seq.fill(5)(null): _*))
     }
   }
@@ -213,7 +215,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
 
     withParquetDataFrame(allNones :: Nil) { df =>
       val rows = df.collect()
-      assert(rows.size === 1)
+      assert(rows.length === 1)
       assert(rows.head === Row(Seq.fill(3)(null): _*))
     }
   }
@@ -383,6 +385,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
   }
 
   test("SPARK-6352 DirectParquetOutputCommitter") {
+    val clonedConf = new Configuration(configuration)
+
     // Write to a parquet file and let it fail.
     // _temporary should be missing if direct output committer works.
     try {
@@ -397,14 +401,46 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
         val fs = path.getFileSystem(configuration)
         assert(!fs.exists(path))
       }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
     }
-    finally {
-      configuration.set("spark.sql.parquet.output.committer.class",
-        "parquet.hadoop.ParquetOutputCommitter")
+  }
+
+  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+    withTempPath { dir =>
+      val clonedConf = new Configuration(configuration)
+
+      configuration.set(
+        SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)
+
+      configuration.set(
+        "spark.sql.parquet.output.committer.class",
+        classOf[BogusParquetOutputCommitter].getCanonicalName)
+
+      try {
+        val message = intercept[SparkException] {
+          sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(message === "Intentional exception for testing purposes")
+      } finally {
+        // Hadoop 1 doesn't have `Configuration.unset`
+        configuration.clear()
+        clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+      }
     }
   }
 }
 
+class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(jobContext: JobContext): Unit = {
+    sys.error("Intentional exception for testing purposes")
+  }
+}
+
 class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
   val originalConf = sqlContext.conf.parquetUseDataSourceApi
 


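Both tests snapshot and restore the shared Hadoop Configuration by hand because Hadoop 1 has no Configuration.unset. A self-contained sketch of that save/clear/replay pattern (the committer class name is purely illustrative):

    import scala.collection.JavaConversions._
    import org.apache.hadoop.conf.Configuration

    val configuration = new Configuration()
    val clonedConf = new Configuration(configuration)  // snapshot every entry
    try {
      configuration.set("spark.sql.parquet.output.committer.class", "com.example.BogusCommitter")
      // ... exercise the code under test ...
    } finally {
      // Hadoop 1 has no Configuration.unset, so wipe everything and replay the snapshot.
      configuration.clear()
      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
    }
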