You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/04/14 23:10:22 UTC

spark git commit: Revert "[SPARK-6352] [SQL] Add DirectParquetOutputCommitter"

Repository: spark
Updated Branches:
  refs/heads/master 4d4b24927 -> a76b921a9


Revert "[SPARK-6352] [SQL] Add DirectParquetOutputCommitter"

This reverts commit b29663eeea440b1d1a288d41b5ddf67e77c5bd54.

I'm reverting this because it broke test compilation for the Hadoop 1.x
profiles.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a76b921a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a76b921a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a76b921a

Branch: refs/heads/master
Commit: a76b921a923ac37d3c73ee18d24df4bb611daba3
Parents: 4d4b249
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Apr 14 14:07:25 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Apr 14 14:10:15 2015 -0700

----------------------------------------------------------------------
 .../parquet/DirectParquetOutputCommitter.scala  | 66 --------------------
 .../sql/parquet/ParquetTableOperations.scala    | 22 -------
 .../spark/sql/parquet/ParquetIOSuite.scala      | 21 -------
 3 files changed, 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a76b921a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
deleted file mode 100644
index 25a66cb..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
-import parquet.Log
-import parquet.hadoop.util.ContextUtil
-import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter}
-
-private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
-  extends ParquetOutputCommitter(outputPath, context) {
-  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
-
-  override def getWorkPath(): Path = outputPath
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
-  override def setupJob(jobContext: JobContext): Unit = {}
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def commitJob(jobContext: JobContext) {
-    try {
-      val configuration = ContextUtil.getConfiguration(jobContext)
-      val fileSystem = outputPath.getFileSystem(configuration)
-      val outputStatus = fileSystem.getFileStatus(outputPath)
-      val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
-      try {
-        ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
-        if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
-          val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
-          fileSystem.create(successPath).close()
-        }
-      } catch {
-        case e: Exception => {
-          LOG.warn("could not write summary file for " + outputPath, e)
-          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
-          if (fileSystem.exists(metadataPath)) {
-            fileSystem.delete(metadataPath, true)
-          }
-        }
-      }
-    } catch {
-      case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/a76b921a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 3724bda..1c868da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -379,8 +379,6 @@ private[sql] case class InsertIntoParquetTable(
  */
 private[parquet] class AppendingParquetOutputFormat(offset: Int)
   extends parquet.hadoop.ParquetOutputFormat[Row] {
-  var committer: OutputCommitter = null
-
   // override to accept existing directories as valid output directory
   override def checkOutputSpecs(job: JobContext): Unit = {}
 
@@ -405,26 +403,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
   private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
     context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
   }
-
-  // override to create output committer from configuration
-  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    if (committer == null) {
-      val output = getOutputPath(context)
-      val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
-        classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
-      val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-      committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
-    }
-    committer
-  }
-
-  // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
-  private def getOutputPath(context: TaskAttemptContext): Path = {
-    context.getConfiguration().get("mapred.output.dir") match {
-      case null => null
-      case name => new Path(name)
-    }
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a76b921a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 4d0bf7c..97c0f43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -381,27 +381,6 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
       }
     }
   }
-
-  test("SPARK-6352 DirectParquetOutputCommitter") {
-    // Write to a parquet file and let it fail.
-    // _temporary should be missing if direct output committer works.
-    try {
-      configuration.set("spark.sql.parquet.output.committer.class",
-        "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
-      sqlContext.udf.register("div0", (x: Int) => x / 0)
-      withTempPath { dir =>
-        intercept[org.apache.spark.SparkException] {
-          sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
-        }
-        val path = new Path(dir.getCanonicalPath, "_temporary")
-        val fs = path.getFileSystem(configuration)
-        assert(!fs.exists(path))
-      }
-    }
-    finally {
-      configuration.unset("spark.sql.parquet.output.committer.class")
-    }
-  }
 }
 
 class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org