You are viewing a plain text version of this content. The canonical link is available in the original HTML version of this page.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/06/08 20:37:04 UTC
spark git commit: [SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation
job initialization for Hadoop 1.x (branch 1.4 backport based on
https://github.com/apache/spark/pull/6669)
Repository: spark
Updated Branches:
refs/heads/branch-1.4 a3afc2cba -> 69197c3e3
[SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x (branch 1.4 backport based on https://github.com/apache/spark/pull/6669)
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69197c3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69197c3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69197c3e
Branch: refs/heads/branch-1.4
Commit: 69197c3e382abd477e6eeb87ffbda69bfa68fa14
Parents: a3afc2c
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Jun 8 11:35:30 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jun 8 11:36:42 2015 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/SQLConf.scala | 1 +
.../apache/spark/sql/parquet/newParquet.scala | 7 +++
.../org/apache/spark/sql/sources/commands.scala | 18 +++++--
.../spark/sql/parquet/ParquetIOSuite.scala | 50 +++++++++++++++++---
4 files changed, 64 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/69197c3e/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 77c6af2..26b4e5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -73,6 +73,7 @@ private[spark] object SQLConf {
// The output committer class used by FSBasedRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+ // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
// Whether to perform eager analysis when constructing a dataframe.
http://git-wip-us.apache.org/repos/asf/spark/blob/69197c3e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bf55e23..3328e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -212,6 +212,13 @@ private[sql] class ParquetRelation2(
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])
+ if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+ logInfo("Using default output committer for Parquet: " +
+ classOf[ParquetOutputCommitter].getCanonicalName)
+ } else {
+ logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
+ }
+
conf.setClass(
SQLConf.OUTPUT_COMMITTER_CLASS,
committerClass,
http://git-wip-us.apache.org/repos/asf/spark/blob/69197c3e/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 9357a56..eb4e8f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer(
def driverSideSetup(): Unit = {
setupIDs(0, 0, 0)
setupConf()
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
- // This preparation must happen before initializing output format and output committer, since
- // their initialization involves the job configuration, which can be potentially decorated in
- // `relation.prepareJobForWrite`.
+ // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
+ // clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
+ // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
+ //
+ // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+ // committer, since their initialization involve the job configuration, which can be potentially
+ // decorated in `prepareJobForWrite`.
outputWriterFactory = relation.prepareJobForWrite(job)
+ taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
@@ -331,6 +335,8 @@ private[sql] abstract class BaseWriterContainer(
SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
@@ -350,7 +356,9 @@ private[sql] abstract class BaseWriterContainer(
}.getOrElse {
// If output committer class is not set, we will use the one associated with the
// file output format.
- outputFormatClass.newInstance().getOutputCommitter(context)
+ val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+ logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
+ outputCommitter
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/69197c3e/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index dd48bb3..b00a511 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.scalatest.BeforeAndAfterAll
import parquet.example.data.simple.SimpleGroup
import parquet.example.data.{Group, GroupWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
-import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
-import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
+import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
import parquet.io.api.RecordConsumer
import parquet.schema.{MessageType, MessageTypeParser}
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.util.DateUtils
@@ -200,7 +202,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
withParquetDataFrame(allNulls :: Nil) { df =>
val rows = df.collect()
- assert(rows.size === 1)
+ assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(5)(null): _*))
}
}
@@ -213,7 +215,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
withParquetDataFrame(allNones :: Nil) { df =>
val rows = df.collect()
- assert(rows.size === 1)
+ assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(3)(null): _*))
}
}
@@ -383,6 +385,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
test("SPARK-6352 DirectParquetOutputCommitter") {
+ val clonedConf = new Configuration(configuration)
+
// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
@@ -397,14 +401,46 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val fs = path.getFileSystem(configuration)
assert(!fs.exists(path))
}
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
}
- finally {
- configuration.set("spark.sql.parquet.output.committer.class",
- "parquet.hadoop.ParquetOutputCommitter")
+ }
+
+ test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+ withTempPath { dir =>
+ val clonedConf = new Configuration(configuration)
+
+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)
+
+ configuration.set(
+ "spark.sql.parquet.output.committer.class",
+ classOf[BogusParquetOutputCommitter].getCanonicalName)
+
+ try {
+ val message = intercept[SparkException] {
+ sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
+ }.getCause.getMessage
+ assert(message === "Intentional exception for testing purposes")
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ }
}
}
}
+class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+ extends ParquetOutputCommitter(outputPath, context) {
+
+ override def commitJob(jobContext: JobContext): Unit = {
+ sys.error("Intentional exception for testing purposes")
+ }
+}
+
class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
val originalConf = sqlContext.conf.parquetUseDataSourceApi
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org