You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/01 08:17:06 UTC
[kyuubi] branch master updated: [KYUUBI #4432] jobId across tasks should be consistent to meet the contract expected by Hadoop committers
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 19b4b0a3f [KYUUBI #4432] jobId across tasks should be consistent to meet the contract expected by Hadoop committers
19b4b0a3f is described below
commit 19b4b0a3fd73fb5ecb3f39264fe55d46ffe7489a
Author: Yikf <yi...@apache.org>
AuthorDate: Wed Mar 1 16:16:55 2023 +0800
[KYUUBI #4432] jobId across tasks should be consistent to meet the contract expected by Hadoop committers
### _Why are the changes needed?_
jobId across tasks should be consistent to meet the contract expected by Hadoop committers
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4432 from Yikf/jobid.
Closes #4432
4e7401c91 [Yikf] jobId across tasks should be consistent
Authored-by: Yikf <yi...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../connector/hive/write/FileWriterFactory.scala | 22 +++++++++++++++++++++-
1 file changed, 21 insertions(+), 1 deletion(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
index 6ebb55f14..3b86d43c7 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.spark.connector.hive.write
import java.util.Date
+import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.internal.io.FileCommitProtocol
@@ -34,7 +35,12 @@ case class FileWriterFactory(
description: WriteJobDescription,
committer: FileCommitProtocol) extends DataWriterFactory {
- @transient private lazy val jobId = sparkHadoopWriterUtils.createJobID(new Date, 0)
+ // SPARK-42478: jobId across tasks should be consistent to meet the contract
+ // expected by Hadoop committers, but `JobID` cannot be serialized.
+ // Thus, persist the serializable jobTrackerID in the class and make jobId a
+ // transient lazy val that is rebuilt from that same jobTrackerID wherever it is needed.
+ private[this] val jobTrackerID = sparkHadoopWriterUtils.createJobTrackerID(new Date)
+ @transient private lazy val jobId = createJobID(jobTrackerID, 0)
override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
val taskAttemptContext = createTaskAttemptContext(partitionId)
@@ -59,4 +65,18 @@ case class FileWriterFactory(
new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
}
+
+ /**
+ * Create a job ID.
+ *
+ * @param jobTrackerID unique job tracker ID
+ * @param id job number
+ * @return a job ID
+ */
+ def createJobID(jobTrackerID: String, id: Int): JobID = {
+ if (id < 0) {
+ throw new IllegalArgumentException("Job number is negative")
+ }
+ new JobID(jobTrackerID, id)
+ }
}