Posted to commits@spark.apache.org by va...@apache.org on 2019/02/14 16:25:56 UTC

[spark] branch master updated: [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 33334e2  [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
33334e2 is described below

commit 33334e2728f8d2e4cf7d542049435b589ed05a5e
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Feb 14 08:25:33 2019 -0800

    [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
    
    ## What changes were proposed in this pull request?
    
    Updates FileFormatWriter to create a consistent Hadoop Job ID for a write: the
    timestamp is captured once on the driver, before the write job is launched, and
    passed to every task, so all tasks derive the same Job ID instead of each task
    building its own from a fresh Date.
    
    ## How was this patch tested?
    
    Existing tests, run to check for regressions.
    
    Closes #23777 from rdblue/SPARK-26873-fix-file-format-writer-job-ids.
    
    Authored-by: Ryan Blue <bl...@apache.org>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/sql/execution/datasources/FileFormatWriter.scala    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a8d24ce..a9de649 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -162,12 +162,14 @@ object FileFormatWriter extends Logging {
         rdd
       }
 
+      val jobIdInstant = new Date().getTime
       val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
       sparkSession.sparkContext.runJob(
         rddWithNonEmptyPartitions,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
+            jobIdInstant = jobIdInstant,
             sparkStageId = taskContext.stageId(),
             sparkPartitionId = taskContext.partitionId(),
             sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -200,13 +202,14 @@ object FileFormatWriter extends Logging {
   /** Writes data out in a single Spark task. */
   private def executeTask(
       description: WriteJobDescription,
+      jobIdInstant: Long,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow]): WriteTaskResult = {
 
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 


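For readers following along outside the Spark tree, the sketch below illustrates the
pattern the diff applies: capture one instant up front, then derive every task's Hadoop
IDs from that shared instant so all attempts agree on the enclosing Job ID. This is a
standalone illustration, not Spark code; the object and method names are invented for
the example, and the "yyyyMMddHHmmss" job tracker format is an assumption about what
SparkHadoopWriterUtils.createJobID does internally.

    import java.text.SimpleDateFormat
    import java.util.{Date, Locale}

    import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

    // Hypothetical helper object; compile against hadoop-mapreduce-client-core.
    object JobIdSketch {
      // Job tracker IDs are built from a timestamp string here; this format is an
      // assumption about the internal Spark helper, chosen only for illustration.
      private def jobTrackerId(instant: Long): String =
        new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date(instant))

      // Build a task attempt ID from the instant captured once on the driver.
      def taskAttemptId(
          jobIdInstant: Long,
          stageId: Int,
          partitionId: Int,
          attemptNumber: Int): TaskAttemptID = {
        val jobId = new JobID(jobTrackerId(jobIdInstant), stageId)
        val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
        new TaskAttemptID(taskId, attemptNumber)
      }

      def main(args: Array[String]): Unit = {
        // Capture the instant once, before fanning out work to tasks.
        val jobIdInstant = new Date().getTime
        val a = taskAttemptId(jobIdInstant, stageId = 3, partitionId = 0, attemptNumber = 0)
        val b = taskAttemptId(jobIdInstant, stageId = 3, partitionId = 1, attemptNumber = 0)
        // Every attempt now reports the same enclosing Hadoop Job ID.
        assert(a.getJobID == b.getJobID)
        println(s"$a and $b share ${a.getJobID}")
      }
    }

In the actual patch, the equivalent of the jobIdInstant capture happens on the driver
just before sparkSession.sparkContext.runJob, and the value is threaded into each
executeTask call, as shown in the diff above.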
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org