You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/04/13 16:03:10 UTC
spark git commit: [SPARK-6352] [SQL] Add DirectParquetOutputCommitter
Repository: spark
Updated Branches:
refs/heads/master 202ebf06e -> b29663eee
[SPARK-6352] [SQL] Add DirectParquetOutputCommitter
Add a DirectParquetOutputCommitter class that skips _temporary directory when saving to s3. Add new config value "spark.sql.parquet.useDirectParquetOutputCommitter" (default false) to choose between the default output committer.
Author: Pei-Lun Lee <pl...@appier.com>
Closes #5042 from ypcat/spark-6352 and squashes the following commits:
e17bf47 [Pei-Lun Lee] Merge branch 'master' of https://github.com/apache/spark into spark-6352
9ae7545 [Pei-Lun Lee] [SPARL-6352] [SQL] Change to allow custom parquet output committer.
0d540b9 [Pei-Lun Lee] [SPARK-6352] [SQL] add license
c42468c [Pei-Lun Lee] [SPARK-6352] [SQL] add test case
0fc03ca [Pei-Lun Lee] [SPARK-6532] [SQL] hide class DirectParquetOutputCommitter
769bd67 [Pei-Lun Lee] DirectParquetOutputCommitter
f75e261 [Pei-Lun Lee] DirectParquetOutputCommitter
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b29663ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b29663ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b29663ee
Branch: refs/heads/master
Commit: b29663eeea440b1d1a288d41b5ddf67e77c5bd54
Parents: 202ebf0
Author: Pei-Lun Lee <pl...@appier.com>
Authored: Mon Apr 13 21:52:00 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Mon Apr 13 21:52:00 2015 +0800
----------------------------------------------------------------------
.../parquet/DirectParquetOutputCommitter.scala | 66 ++++++++++++++++++++
.../sql/parquet/ParquetTableOperations.scala | 22 +++++++
.../spark/sql/parquet/ParquetIOSuite.scala | 21 +++++++
3 files changed, 109 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b29663ee/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
new file mode 100644
index 0000000..25a66cb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
+import parquet.Log
+import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter}
+
+private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+ extends ParquetOutputCommitter(outputPath, context) {
+ val LOG = Log.getLog(classOf[ParquetOutputCommitter])
+
+ override def getWorkPath(): Path = outputPath
+ override def abortTask(taskContext: TaskAttemptContext): Unit = {}
+ override def commitTask(taskContext: TaskAttemptContext): Unit = {}
+ override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
+ override def setupJob(jobContext: JobContext): Unit = {}
+ override def setupTask(taskContext: TaskAttemptContext): Unit = {}
+
+ override def commitJob(jobContext: JobContext) {
+ try {
+ val configuration = ContextUtil.getConfiguration(jobContext)
+ val fileSystem = outputPath.getFileSystem(configuration)
+ val outputStatus = fileSystem.getFileStatus(outputPath)
+ val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
+ try {
+ ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
+ if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
+ val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
+ fileSystem.create(successPath).close()
+ }
+ } catch {
+ case e: Exception => {
+ LOG.warn("could not write summary file for " + outputPath, e)
+ val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+ if (fileSystem.exists(metadataPath)) {
+ fileSystem.delete(metadataPath, true)
+ }
+ }
+ }
+ } catch {
+ case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/b29663ee/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 1c868da..3724bda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -379,6 +379,8 @@ private[sql] case class InsertIntoParquetTable(
*/
private[parquet] class AppendingParquetOutputFormat(offset: Int)
extends parquet.hadoop.ParquetOutputFormat[Row] {
+ var committer: OutputCommitter = null
+
// override to accept existing directories as valid output directory
override def checkOutputSpecs(job: JobContext): Unit = {}
@@ -403,6 +405,26 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
}
+
+ // override to create output committer from configuration
+ override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+ if (committer == null) {
+ val output = getOutputPath(context)
+ val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
+ classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
+ val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
+ }
+ committer
+ }
+
+ // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
+ private def getOutputPath(context: TaskAttemptContext): Path = {
+ context.getConfiguration().get("mapred.output.dir") match {
+ case null => null
+ case name => new Path(name)
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/b29663ee/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 97c0f43..4d0bf7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -381,6 +381,27 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
}
}
+
+ test("SPARK-6352 DirectParquetOutputCommitter") {
+ // Write to a parquet file and let it fail.
+ // _temporary should be missing if direct output committer works.
+ try {
+ configuration.set("spark.sql.parquet.output.committer.class",
+ "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
+ sqlContext.udf.register("div0", (x: Int) => x / 0)
+ withTempPath { dir =>
+ intercept[org.apache.spark.SparkException] {
+ sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
+ }
+ val path = new Path(dir.getCanonicalPath, "_temporary")
+ val fs = path.getFileSystem(configuration)
+ assert(!fs.exists(path))
+ }
+ }
+ finally {
+ configuration.unset("spark.sql.parquet.output.committer.class")
+ }
+ }
}
class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org