You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/01 08:17:06 UTC

[kyuubi] branch master updated: [KYUUBI #4432] jobId across tasks should be consistent to meet the contract expected by Hadoop committers

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 19b4b0a3f [KYUUBI #4432] jobId across tasks should be consistent to meet the contract expected by Hadoop committers
19b4b0a3f is described below

commit 19b4b0a3fd73fb5ecb3f39264fe55d46ffe7489a
Author: Yikf <yi...@apache.org>
AuthorDate: Wed Mar 1 16:16:55 2023 +0800

    [KYUUBI #4432] jobId across tasks should be consistent to meet the contract expected by Hadoop committers
    
    ### _Why are the changes needed?_
    
    jobId across tasks should be consistent to meet the contract expected by Hadoop committers
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4432 from Yikf/jobid.
    
    Closes #4432
    
    4e7401c91 [Yikf] jobId across tasks should be consistent
    
    Authored-by: Yikf <yi...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../connector/hive/write/FileWriterFactory.scala   | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
index 6ebb55f14..3b86d43c7 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.spark.connector.hive.write
 
 import java.util.Date
 
+import org.apache.hadoop.mapred.JobID
 import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.internal.io.FileCommitProtocol
@@ -34,7 +35,12 @@ case class FileWriterFactory(
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
 
-  @transient private lazy val jobId = sparkHadoopWriterUtils.createJobID(new Date, 0)
+  // SPARK-42478: jobId across tasks should be consistent to meet the contract
+  // expected by Hadoop committers, but `JobID` cannot be serialized.
+  // Thus, persist the serializable jobTrackerID in the class and make jobId a
+  // transient lazy val rebuilt from it wherever needed, so all tasks get the same jobId.
+  private[this] val jobTrackerID = sparkHadoopWriterUtils.createJobTrackerID(new Date)
+  @transient private lazy val jobId = createJobID(jobTrackerID, 0)
 
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
@@ -59,4 +65,18 @@ case class FileWriterFactory(
 
     new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
   }
+
+  /**
+   * Create a Hadoop job ID from a job tracker ID and a job number.
+   *
+   * @param jobTrackerID unique job tracker ID
+   * @param id job number; an IllegalArgumentException is thrown if it is negative
+   * @return a job ID
+   */
+  def createJobID(jobTrackerID: String, id: Int): JobID = {
+    if (id < 0) {
+      throw new IllegalArgumentException("Job number is negative")
+    }
+    new JobID(jobTrackerID, id)
+  }
 }