You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/05 20:19:36 UTC
git commit: [SPARK-3007][SQL] Fixes dynamic partitioning support for
lower Hadoop versions
Repository: spark
Updated Branches:
refs/heads/master a7c73130f -> 1b97a941a
[SPARK-3007][SQL] Fixes dynamic partitioning support for lower Hadoop versions
This is a follow-up to #2226 and #2616 that fixes Jenkins master SBT build failures for lower Hadoop versions (1.0.x and 2.0.x).
The root cause is a difference in the semantics of `FileSystem.globStatus()` across Hadoop versions, as illustrated by the following test code:
```scala
/**
 * Prints every path matched by a three-level glob under /tmp/wh on the local
 * file system, to compare `FileSystem.globStatus()` results across Hadoop versions.
 */
object GlobExperiments {
  // An explicit `main` avoids the initialization-order pitfalls of the `App` trait.
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val fs = FileSystem.getLocal(conf)
    // globStatus() is a Java API that may return null (e.g. Hadoop 1.x does so for a
    // non-glob path that does not exist); treat null as "nothing matched" instead of NPE-ing.
    Option(fs.globStatus(new Path("/tmp/wh/*/*/*"))).foreach { statuses =>
      statuses.foreach(status => println(status.getPath))
    }
  }
}
```
Target directory structure:
```
/tmp/wh
├── dir0
│   ├── dir1
│   │   └── level2
│   └── level1
└── level0
```
Hadoop 2.4.1 result:
```
file:/tmp/wh/dir0/dir1/level2
```
Hadoop 1.0.4 result:
```
file:/tmp/wh/dir0/dir1/level2
file:/tmp/wh/dir0/level1
file:/tmp/wh/level0
```
In #2226 and #2616, we call `FileOutputCommitter.commitJob()` at the end of the job, which writes the `_SUCCESS` marker file. With lower Hadoop versions, because of the `globStatus()` semantics difference, `Hive.loadDynamicPartitions()` picks up `_SUCCESS` as a separate partition data file, which then fails partition spec checking. The fix introduced in this PR is something of a hack: when inserting data with dynamic partitioning, we intentionally avoid writing the `_SUCCESS` marker to work around this issue.
Hive doesn't suffer from this issue because `FileSinkOperator` doesn't call `FileOutputCommitter.commitJob()`; instead, it calls `Utilities.mvFileToFinalPath()` to clean up the output directory and then loads it into the Hive warehouse with `loadDynamicPartitions()`/`loadPartition()`/`loadTable()`. This approach is better because it handles failed jobs and speculative tasks properly. We should add this step to `InsertIntoHiveTable` in another PR.
Author: Cheng Lian <li...@gmail.com>
Closes #2663 from liancheng/dp-hadoop-1-fix and squashes the following commits:
0177dae [Cheng Lian] Fixes dynamic partitioning support for lower Hadoop versions
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b97a941
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b97a941
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b97a941
Branch: refs/heads/master
Commit: 1b97a941a09a2f63d442f435c1b444d857cd6956
Parents: a7c7313
Author: Cheng Lian <li...@gmail.com>
Authored: Sun Oct 5 11:19:17 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sun Oct 5 11:19:17 2014 -0700
----------------------------------------------------------------------
.../spark/sql/hive/hiveWriterContainers.scala | 26 +++++++++++++++++---
1 file changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1b97a941/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index ac5c7a8..6ccbc22 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -55,8 +55,8 @@ private[hive] class SparkHiveWriterContainer(
private var taID: SerializableWritable[TaskAttemptID] = null
@transient private var writer: FileSinkOperator.RecordWriter = null
- @transient private lazy val committer = conf.value.getOutputCommitter
- @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+ @transient protected lazy val committer = conf.value.getOutputCommitter
+ @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
@transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
@transient private lazy val outputFormat =
conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
@@ -122,8 +122,6 @@ private[hive] class SparkHiveWriterContainer(
}
}
- // ********* Private Functions *********
-
private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
jobID = jobId
splitID = splitId
@@ -157,12 +155,18 @@ private[hive] object SparkHiveWriterContainer {
}
}
+private[spark] object SparkHiveDynamicPartitionWriterContainer {
+ val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER: String = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
+}
+
private[spark] class SparkHiveDynamicPartitionWriterContainer(
@transient jobConf: JobConf,
fileSinkConf: FileSinkDesc,
dynamicPartColNames: Array[String])
extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
+ import SparkHiveDynamicPartitionWriterContainer._
+
private val defaultPartName = jobConf.get(
ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
@@ -179,6 +183,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
commit()
}
+ override def commitJob(): Unit = {
+ // This is a hack to avoid writing the _SUCCESS marker file. In lower versions of Hadoop (e.g.
+ // 1.0.4), semantics of FileSystem.globStatus() differ from higher versions (e.g. 2.4.1) and will
+ // include the _SUCCESS file when globbing for dynamic partition data files. The previous setting
+ // is restored in `finally` so a failing commit does not leave the marker disabled on jobConf.
+ // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
+ // calling something like Utilities.mvFileToFinalPath to clean up the output directory and then
+ // load it with loadDynamicPartitions/loadPartition/loadTable.
+ val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+ jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+ try super.commitJob()
+ finally jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+ }
+
override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
val dynamicPartPath = dynamicPartColNames
.zip(row.takeRight(dynamicPartColNames.length))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org