Posted to commits@spark.apache.org by do...@apache.org on 2020/10/26 19:41:51 UTC

[spark] branch branch-3.0 updated: [SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 22392be  [SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"
22392be is described below

commit 22392be4315900346ac884ad32bd6332ffc3e64e
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Mon Oct 26 12:31:05 2020 -0700

    [SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"
    
    ### What changes were proposed in this pull request?
    
    This reinstates the old option `spark.sql.sources.writeJobUUID` to set a unique job ID in the job configuration, so that Hadoop MR committers have a unique ID which is (a) consistent across tasks and workers and (b) not as brittle as generated-timestamp job IDs. Timestamp-based IDs match the format JobID requires, but because they are generated per-thread, they may not always be unique within a cluster.
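    
    For illustration, a committer can pick the ID up from the Hadoop configuration it is handed. A minimal sketch (the helper name and the fallback are illustrative, not part of this change):
    
    ```scala
    import org.apache.hadoop.mapreduce.JobContext
    
    // Inside a custom committer: prefer the Spark-provided UUID, falling back to
    // the (timestamp-derived) Hadoop job ID when the property is absent, e.g.
    // when the committer is used from a plain MapReduce job.
    def jobUuid(context: JobContext): String =
      Option(context.getConfiguration.get("spark.sql.sources.writeJobUUID"))
        .getOrElse(context.getJobID.toString)
    ```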
    
    ### Why are the changes needed?
    
    If a committer (e.g. the S3A staging committer) uses the job attempt ID as a unique ID, then any two jobs started within the same second have the same ID and can clash.
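    
    A rough sketch of the clash (the timestamp format is an approximation of how second-granularity job IDs are built; this is not code from the patch): two jobs launched in the same second end up with the same "unique" ID, so their staging/temporary paths collide.
    
    ```scala
    import java.text.SimpleDateFormat
    import java.util.{Date, Locale}
    import org.apache.hadoop.mapreduce.JobID
    
    // A job ID derived from a second-granularity timestamp is identical for
    // two jobs that start within the same second.
    val fmt = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    val now = new Date()
    val jobA = new JobID(fmt.format(now), 0)
    val jobB = new JobID(fmt.format(now), 0)
    assert(jobA == jobB)  // identical IDs -> identical temp dirs -> clash
    ```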
    
    ### Does this PR introduce _any_ user-facing change?
    
    Good question. It is "developer-facing" in the context of anyone writing a committer, but it reinstates a property which was in Spark 1.x and then went away.
    
    ### How was this patch tested?
    
    Testing: no test here. You'd have to create a new committer which extracted the value in both the job and the task(s) and verified consistency. That is possible (with task output whose records contained the UUID), but it would be pretty convoluted and carry a high maintenance cost.
    
    Because it's trying to address a race condition, it's hard to reproduce the problem downstream and so verify a fix in a test run. I'll just look at the logs to see what temporary directory is being used in the cluster FS and verify that it's a UUID.
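    
    For anyone who does want to attempt it, a very rough outline of such a verification committer (the class name and the presence-only check are illustrative; a full consistency check would need the task output to carry the UUID as described above):
    
    ```scala
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.TaskAttemptContext
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
    
    // Checks that every task attempt sees the job-level UUID Spark sets.
    class UuidCheckingCommitter(out: Path, context: TaskAttemptContext)
        extends FileOutputCommitter(out, context) {
    
      override def setupTask(context: TaskAttemptContext): Unit = {
        val uuid = context.getConfiguration.get("spark.sql.sources.writeJobUUID", "")
        require(uuid.nonEmpty, "spark.sql.sources.writeJobUUID was not propagated")
        super.setupTask(context)
      }
    }
    ```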
    
    Closes #30141 from steveloughran/SPARK-33230-jobId.
    
    Authored-by: Steve Loughran <st...@cloudera.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 02fa19f102122f06e4358cf86c5e903fda28b289)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/execution/datasources/FileFormatWriter.scala   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index abb88ae..a71aeb47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -133,7 +133,7 @@ object FileFormatWriter extends Logging {
       fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema)
 
     val description = new WriteJobDescription(
-      uuid = UUID.randomUUID().toString,
+      uuid = UUID.randomUUID.toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
       allColumns = outputSpec.outputColumns,
@@ -164,6 +164,10 @@ object FileFormatWriter extends Logging {
 
     SQLExecution.checkSQLExecutionId(sparkSession)
 
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
     // This call shouldn't be put into the `try` block below because it only initializes and
     // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
     committer.setupJob(job)

