Posted to commits@spark.apache.org by we...@apache.org on 2023/02/27 08:56:46 UTC
[spark] branch branch-3.4 updated: [SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new eb8fa5b713b [SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
eb8fa5b713b is described below
commit eb8fa5b713bce4e62df60a09d8f9a59527ce55f6
Author: Yikf <yi...@apache.org>
AuthorDate: Mon Feb 27 16:56:04 2023 +0800
[SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
### What changes were proposed in this pull request?
Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
### Why are the changes needed?
[SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) made the MR job IDs consistent between FileBatchWriter and FileFormatWriter, but it introduced a serialization issue: JobID is not serializable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #40064 from Yikf/write-job-id.
Authored-by: Yikf <yi...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit d46b15d2b23f13b65d781bb364ccde3be6679b99)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/datasources/v2/FileWriterFactory.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index ea13e2deac8..4b1a099d3ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -30,7 +30,12 @@ case class FileWriterFactory (
description: WriteJobDescription,
committer: FileCommitProtocol) extends DataWriterFactory {
- private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+ // SPARK-42478: jobId across tasks should be consistent to meet the contract
+ // expected by Hadoop committers, but `JobId` cannot be serialized.
+ // Thus, persist the serializable jobTrackerID in the class and make jobId a
+ // transient lazy val, recreated from the same seed so jobId stays consistent.
+ private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
+ @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)
override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
val taskAttemptContext = createTaskAttemptContext(partitionId)
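The pattern in the diff, a serializable seed string plus a transient, lazily rebuilt derived object, can be sketched in plain Java. Note this is an illustrative sketch only: `JobId`, `WriterFactory`, and the date-style seed below are hypothetical stand-ins, not Spark's or Hadoop's actual classes.

```java
import java.io.*;

// Hypothetical stand-in for Hadoop's non-serializable JobID.
class JobId {
    final String trackerId;
    final int id;
    JobId(String trackerId, int id) { this.trackerId = trackerId; this.id = id; }
    @Override public String toString() { return "job_" + trackerId + "_" + id; }
}

// Mirrors the FileWriterFactory fix: keep the serializable seed, mark the
// derived JobId transient, and rebuild it on demand after deserialization.
class WriterFactory implements Serializable {
    private final String jobTrackerId;  // serializable seed, shipped to executors
    private transient JobId jobId;      // skipped by serialization

    WriterFactory(String jobTrackerId) { this.jobTrackerId = jobTrackerId; }

    JobId jobId() {
        if (jobId == null) {
            // Deterministic re-creation from the same seed, so every
            // deserialized copy observes the same jobId.
            jobId = new JobId(jobTrackerId, 0);
        }
        return jobId;
    }
}

public class Demo {
    public static void main(String[] args) throws Exception {
        WriterFactory f = new WriterFactory("20230227085646");

        // Round-trip through Java serialization, as happens when the
        // factory is shipped from the driver to executors.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(f);
        WriterFactory copy = (WriterFactory) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();

        // The transient JobId is rebuilt from the same seed on both sides.
        System.out.println(f.jobId());
        System.out.println(copy.jobId());
    }
}
```

Serializing the original factory would have failed with a `NotSerializableException` on the `JobId` field; making it transient and deriving it from the serializable seed avoids that while keeping the ID consistent across tasks, which is what Hadoop committers expect.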