You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/03/06 22:06:32 UTC

[spark] branch branch-3.3 updated: [SPARK-42478][SQL][3.3] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 2bd20a96790 [SPARK-42478][SQL][3.3] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
2bd20a96790 is described below

commit 2bd20a9679003743c82753f1152ea3e7da2aa96a
Author: Yikf <yi...@apache.org>
AuthorDate: Mon Mar 6 14:06:12 2023 -0800

    [SPARK-42478][SQL][3.3] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
    
    This is a backport of https://github.com/apache/spark/pull/40064 for branch-3.3
    
    ### What changes were proposed in this pull request?
    
    Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
    
    ### Why are the changes needed?
    
    [SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) make consistent MR job IDs in FileBatchWriter and FileFormatWriter, but it breaks a serializable issue, JobId is non-serializable.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    GA
    
    Closes #40290 from Yikf/backport-SPARK-42478-3.3.
    
    Authored-by: Yikf <yi...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/execution/datasources/v2/FileWriterFactory.scala     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index ea13e2deac8..4b1a099d3ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -30,7 +30,12 @@ case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
 
-  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+  // SPARK-42478: jobId across tasks should be consistent to meet the contract
+  // expected by Hadoop committers, but `JobId` cannot be serialized.
+  // thus, persist the serializable jobTrackerID in the class and make jobId a
+  // transient lazy val which recreates it each time to ensure jobId is unique.
+  private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
+  @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)
 
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org