You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2021/02/09 07:06:12 UTC
[spark] branch master updated: [SPARK-34355][CORE][SQL][FOLLOWUP]
Log commit time in all File Writer
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7ea3a33 [SPARK-34355][CORE][SQL][FOLLOWUP] Log commit time in all File Writer
7ea3a33 is described below
commit 7ea3a336b99915f09174a4c3e47fa17f30b88890
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Feb 9 16:05:39 2021 +0900
[SPARK-34355][CORE][SQL][FOLLOWUP] Log commit time in all File Writer
### What changes were proposed in this pull request?
While working on https://issues.apache.org/jira/browse/SPARK-34399, based on https://github.com/apache/spark/pull/31471,
we found that FileBatchWrite also uses `FileFormatWriter.processStats()`. We need to log the commit duration in the other writers too.
In this PR:
1. Extract a commit job method in SparkHadoopWriter.
2. Address the other commit writers.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No
Closes #31520 from AngersZhuuuu/SPARK-34355-followup.
Authored-by: Angerszhuuuu <an...@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
---
.../main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala | 5 +++--
.../apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala | 6 ++++--
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 37b4708..4eeec63 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -96,8 +96,9 @@ object SparkHadoopWriter extends Logging {
iterator = iter)
})
- committer.commitJob(jobContext, ret)
- logInfo(s"Job ${jobContext.getJobID} committed.")
+ logInfo(s"Start to commit write Job ${jobContext.getJobID}.")
+ val (_, duration) = Utils.timeTakenMs { committer.commitJob(jobContext, ret) }
+ logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: $duration ms.")
} catch {
case cause: Throwable =>
logError(s"Aborting job ${jobContext.getJobID}.", cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
index 266c834..7227e48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.processStats
+import org.apache.spark.util.Utils
class FileBatchWrite(
job: Job,
@@ -31,8 +32,9 @@ class FileBatchWrite(
extends BatchWrite with Logging {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
val results = messages.map(_.asInstanceOf[WriteTaskResult])
- committer.commitJob(job, results.map(_.commitMsg))
- logInfo(s"Write Job ${description.uuid} committed.")
+ logInfo(s"Start to commit write Job ${description.uuid}.")
+ val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, results.map(_.commitMsg)) }
+ logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
processStats(description.statsTrackers, results.map(_.summary.stats))
logInfo(s"Finished processing stats for write job ${description.uuid}.")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org