You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@spark.apache.org by va...@apache.org on 2019/02/14 16:25:56 UTC
[spark] branch master updated: [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 33334e2 [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
33334e2 is described below
commit 33334e2728f8d2e4cf7d542049435b589ed05a5e
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Feb 14 08:25:33 2019 -0800
[SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
## What changes were proposed in this pull request?
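Updates FileFormatWriter to create a consistent Hadoop Job ID for a write. The job timestamp is now captured once on the driver (`jobIdInstant`) and passed to every task. Previously, each task called `new Date` independently, so different tasks of the same write could build different Hadoop Job IDs.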
## How was this patch tested?
Existing tests were run to check for regressions.
Closes #23777 from rdblue/SPARK-26873-fix-file-format-writer-job-ids.
Authored-by: Ryan Blue <bl...@apache.org>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../apache/spark/sql/execution/datasources/FileFormatWriter.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a8d24ce..a9de649 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -162,12 +162,14 @@ object FileFormatWriter extends Logging {
rdd
}
+ val jobIdInstant = new Date().getTime
val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
sparkSession.sparkContext.runJob(
rddWithNonEmptyPartitions,
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
executeTask(
description = description,
+ jobIdInstant = jobIdInstant,
sparkStageId = taskContext.stageId(),
sparkPartitionId = taskContext.partitionId(),
sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -200,13 +202,14 @@ object FileFormatWriter extends Logging {
/** Writes data out in a single Spark task. */
private def executeTask(
description: WriteJobDescription,
+ jobIdInstant: Long,
sparkStageId: Int,
sparkPartitionId: Int,
sparkAttemptNumber: Int,
committer: FileCommitProtocol,
iterator: Iterator[InternalRow]): WriteTaskResult = {
- val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
+ val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org