Posted to commits@spark.apache.org by do...@apache.org on 2020/10/26 19:41:51 UTC
[spark] branch branch-3.0 updated: [SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 22392be [SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"
22392be is described below
commit 22392be4315900346ac884ad32bd6332ffc3e64e
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Mon Oct 26 12:31:05 2020 -0700
[SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"
### What changes were proposed in this pull request?
This reinstates the old option `spark.sql.sources.writeJobUUID` to set a unique job ID in the job configuration, so that Hadoop MR committers have a unique ID which is (a) consistent across tasks and workers and (b) not as brittle as generated-timestamp job IDs. The latter match the format JobID requires, but because they are generated per thread, they may not always be unique within a cluster.
### Why are the changes needed?
If a committer (e.g. the S3A staging committer) uses the job attempt ID as a unique ID, then any two jobs started within the same second get the same ID and can clash.
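To illustrate the race, here is a minimal sketch (the class and method names are illustrative, not from Spark) of how a second-granularity, timestamp-derived job-tracker ID collides for two jobs started within the same second:

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch only: Spark derives the Hadoop job-tracker ID from a
// second-granularity timestamp, so two jobs launched within the
// same wall-clock second end up with identical tracker IDs.
public class JobIdClash {
    // Mirrors the "yyyyMMddHHmmss" formatting used for tracker IDs.
    static String trackerId(Date when) {
        return new SimpleDateFormat("yyyyMMddHHmmss").format(when);
    }

    public static void main(String[] args) {
        Date now = new Date();
        // Two "jobs" created in the same second collide on their tracker ID.
        System.out.println(trackerId(now).equals(trackerId(now)));
    }
}
```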
### Does this PR introduce _any_ user-facing change?
Good Q. It is "developer-facing" in the context of anyone writing a committer, but it reinstates a property which existed in Spark 1.x and then "went away".
### How was this patch tested?
Testing: no test here. You'd have to create a new committer which extracted the value in both the job and the task(s) and verified consistency. That is possible (with a task output whose records contained the UUID), but it would be pretty convoluted and carry a high maintenance cost.
Because this is trying to address a race condition, it's hard to reproduce the problem downstream and so verify a fix in a test run. Instead I'll look at the logs to see which temporary dir is being used in the cluster FS and verify it's a UUID.
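For reference, a committer implementation could pick up the propagated ID roughly as sketched below. The property key is the real one set by `FileFormatWriter`; everything else (class name, the `lookup` stand-in for Hadoop's `Configuration.get`) is illustrative:

```java
import java.util.UUID;
import java.util.function.UnaryOperator;

// Sketch only: resolve the job UUID that Spark propagates into the
// job configuration, falling back to a fresh random UUID when the
// property is absent (e.g. when not running under Spark SQL).
public class WriteJobUuid {
    static final String KEY = "spark.sql.sources.writeJobUUID";

    // `lookup` stands in for org.apache.hadoop.conf.Configuration::get.
    static String resolve(UnaryOperator<String> lookup) {
        String id = lookup.apply(KEY);
        return id != null ? id : UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve(k -> "job-uuid-1234"));
    }
}
```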
Closes #30141 from steveloughran/SPARK-33230-jobId.
Authored-by: Steve Loughran <st...@cloudera.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 02fa19f102122f06e4358cf86c5e903fda28b289)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../apache/spark/sql/execution/datasources/FileFormatWriter.scala | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index abb88ae..a71aeb47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -133,7 +133,7 @@ object FileFormatWriter extends Logging {
fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema)
val description = new WriteJobDescription(
- uuid = UUID.randomUUID().toString,
+ uuid = UUID.randomUUID.toString,
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = outputSpec.outputColumns,
@@ -164,6 +164,10 @@ object FileFormatWriter extends Logging {
SQLExecution.checkSQLExecutionId(sparkSession)
+ // propagate the description UUID into the jobs, so that committers
+ // get an ID guaranteed to be unique.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
committer.setupJob(job)