You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/27 08:56:46 UTC

[spark] branch branch-3.4 updated: [SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new eb8fa5b713b [SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
eb8fa5b713b is described below

commit eb8fa5b713bce4e62df60a09d8f9a59527ce55f6
Author: Yikf <yi...@apache.org>
AuthorDate: Mon Feb 27 16:56:04 2023 +0800

    [SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
    
    ### What changes were proposed in this pull request?
    
    Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
    
    ### Why are the changes needed?
    
    [SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) make consistent MR job IDs in FileBatchWriter and FileFormatWriter, but it breaks a serializable issue, JobId is non-serializable.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    GA
    
    Closes #40064 from Yikf/write-job-id.
    
    Authored-by: Yikf <yi...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit d46b15d2b23f13b65d781bb364ccde3be6679b99)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/datasources/v2/FileWriterFactory.scala     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index ea13e2deac8..4b1a099d3ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -30,7 +30,12 @@ case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
 
-  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+  // SPARK-42478: jobId across tasks should be consistent to meet the contract
+  // expected by Hadoop committers, but `JobId` cannot be serialized.
+  // thus, persist the serializable jobTrackerID in the class and make jobId a
+  // transient lazy val which recreates it each time to ensure jobId is unique.
+  private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
+  @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)
 
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org