Posted to commits@spark.apache.org by do...@apache.org on 2023/03/06 22:06:32 UTC
[spark] branch branch-3.3 updated: [SPARK-42478][SQL][3.3] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 2bd20a96790 [SPARK-42478][SQL][3.3] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
2bd20a96790 is described below
commit 2bd20a9679003743c82753f1152ea3e7da2aa96a
Author: Yikf <yi...@apache.org>
AuthorDate: Mon Mar 6 14:06:12 2023 -0800
[SPARK-42478][SQL][3.3] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
This is a backport of https://github.com/apache/spark/pull/40064 for branch-3.3
### What changes were proposed in this pull request?
Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
### Why are the changes needed?
[SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) made MR job IDs consistent in FileBatchWriter and FileFormatWriter, but it introduced a serialization problem: FileWriterFactory now holds a JobID, which is not serializable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GitHub Actions (GA)
Closes #40290 from Yikf/backport-SPARK-42478-3.3.
Authored-by: Yikf <yi...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/execution/datasources/v2/FileWriterFactory.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index ea13e2deac8..4b1a099d3ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -30,7 +30,12 @@ case class FileWriterFactory (
description: WriteJobDescription,
committer: FileCommitProtocol) extends DataWriterFactory {
- private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+ // SPARK-42478: jobId across tasks should be consistent to meet the contract
+ // expected by Hadoop committers, but `JobId` cannot be serialized.
+ // thus, persist the serializable jobTrackerID in the class and make jobId a
+ // transient lazy val which recreates it each time to ensure jobId is unique.
+ private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
+ @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)
override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
val taskAttemptContext = createTaskAttemptContext(partitionId)
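
The pattern in this change (keep a serializable seed in the class, and derive the non-serializable object from it in a `@transient lazy val`) can be sketched outside Spark as follows. This is only an illustration under stated assumptions: `FakeJobID`, `BrokenFactory`, `FixedFactory`, `SerializationSketch`, and the tracker string are hypothetical stand-ins, not Spark or Hadoop classes, and plain Java serialization is used to stand in for shipping the factory to executors.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a job ID type that does NOT implement java.io.Serializable.
final class FakeJobID(val tracker: String, val id: Int) {
  override def toString: String = s"job_${tracker}_$id"
}

// Holds the non-serializable object as a regular field, so serializing it fails.
case class BrokenFactory(name: String) {
  private val jobId = new FakeJobID("placeholder-tracker", 0)
  def jobIdString: String = jobId.toString
}

// Holds only the serializable tracker string. The job ID is transient, so it is
// skipped during serialization and rebuilt lazily from the tracker string on first use.
case class FixedFactory(name: String) {
  private[this] val jobTrackerID: String = "placeholder-tracker"
  @transient private lazy val jobId = new FakeJobID(jobTrackerID, 0)
  def jobIdString: String = jobId.toString
}

object SerializationSketch {
  // Serializes a value to bytes and reads it back, as a rough model of
  // sending an object to another JVM.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val brokenFails =
      try { roundTrip(BrokenFactory("broken")); false }
      catch { case _: NotSerializableException => true }
    println(s"BrokenFactory fails to serialize: $brokenFails")

    val original = FixedFactory("fixed")
    val copy = roundTrip(original)
    println(s"Job ID matches after round trip: ${original.jobIdString == copy.jobIdString}")
  }
}
```

Because the job ID is derived only from the serialized tracker string, every deserialized copy of `FixedFactory` rebuilds an equal ID, which is the consistency property the code comment in the diff refers to.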