Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/09 11:05:40 UTC

[GitHub] [spark] steveloughran commented on a diff in pull request #38980: [SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter

steveloughran commented on code in PR #38980:
URL: https://github.com/apache/spark/pull/38980#discussion_r1044322385


##########
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala:
##########
@@ -47,11 +47,22 @@ object SparkHadoopWriterUtils {
    * @return a job ID
    */
   def createJobID(time: Date, id: Int): JobID = {
+    val jobTrackerID = createJobTrackerID(time)
+    createJobID(jobTrackerID, id)
+  }
+
+  /**
+   * Create a job ID.

Review Comment:
   How about extending the comment here by noting that the job ID needs to be unique across all jobs (linking to SPARK-33402) and generated consistently across all the places it is used (SPARK-26873 + SPARK-41448)? That way, whoever next goes near this code knows what is needed.
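   To make the ask concrete, one possible wording as a sketch only: the overload signature matches the diff above, but the comment text and the method body shown here are illustrative rather than the PR's actual code.
   
   ```scala
   /**
    * Create a job ID.
    *
    * The job ID must be unique across all jobs running at the same time
    * (SPARK-33402), and the same ID must be generated consistently in every
    * place that builds a job ID for the same write (SPARK-26873, SPARK-41448);
    * otherwise committers which key their work directories off the job ID can
    * clash with, or fail to locate, each other's output.
    *
    * @param jobTrackerID job tracker ID, e.g. from createJobTrackerID(time)
    * @param id job number
    * @return a job ID
    */
   def createJobID(jobTrackerID: String, id: Int): JobID = {
     // Build the Hadoop MapReduce JobID directly from the tracker ID and job number.
     new JobID(jobTrackerID, id)
   }
   ```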



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala:
##########
@@ -201,14 +201,14 @@ object FileFormatWriter extends Logging {
         rdd
       }
 
-      val jobIdInstant = new Date().getTime
+      val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())

Review Comment:
   I must have missed this, but we also haven't had any reports of the timestamp clash surfacing.
   
   That's probably because the s3a and abfs/gcs committers all pick up the UUID in "spark.sql.sources.writeJobUUID" in preference to anything else, and those UUIDs are generated uniquely.
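   For reference, a minimal sketch of the lookup described above: prefer the Spark-supplied write-job UUID from the configuration, and only fall back to something self-generated when it is absent. This is a hypothetical helper, not the actual committer code; only the "spark.sql.sources.writeJobUUID" key comes from the discussion.
   
   ```scala
   import java.util.UUID
   
   import org.apache.hadoop.conf.Configuration
   
   object WriteJobUuidLookup {
     // Configuration key named in the comment above.
     private val SparkWriteUuidKey = "spark.sql.sources.writeJobUUID"
   
     /** Prefer the UUID Spark set on the job configuration; fall back to a fresh one. */
     def resolveJobUuid(conf: Configuration): String = {
       Option(conf.getTrimmed(SparkWriteUuidKey))
         .filter(_.nonEmpty)
         .getOrElse(UUID.randomUUID().toString)
     }
   }
   ```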



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala:
##########
@@ -29,6 +29,9 @@ import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWri
 case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
+
+  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+

Review Comment:
   I wonder if a UUID should be created here and then passed down in that "spark.sql.sources.writeJobUUID" option in `createTaskAttemptContext()`. That would be consistent with the rest of the code, and the MapReduce manifest committer would pick it up. (So would the s3a one, but since you can't do dynamic partitioning there, it's less relevant.)
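   A rough sketch of that suggestion, assuming the factory builds its task attempt contexts from a Hadoop Configuration. The object and method names here are hypothetical; only the "spark.sql.sources.writeJobUUID" key is from the suggestion itself.
   
   ```scala
   import java.util.UUID
   
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.mapreduce.TaskAttemptID
   import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
   
   object WriteJobUuidPropagation {
   
     /** A per-write UUID; this would be generated once per write job, alongside the job ID. */
     def newWriteJobUuid(): String = UUID.randomUUID().toString
   
     /**
      * Build a task attempt context whose configuration carries the per-write UUID,
      * so committers that understand "spark.sql.sources.writeJobUUID" (the manifest
      * committer, the s3a committers) can pick it up.
      */
     def taskAttemptContextWithUuid(
         base: Configuration,
         attemptId: TaskAttemptID,
         writeJobUuid: String): TaskAttemptContextImpl = {
       // Copy the configuration so the UUID is scoped to this write job's contexts.
       val conf = new Configuration(base)
       conf.set("spark.sql.sources.writeJobUUID", writeJobUuid)
       new TaskAttemptContextImpl(conf, attemptId)
     }
   }
   ```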



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

