Posted to commits@spark.apache.org by ka...@apache.org on 2021/02/09 07:06:12 UTC

[spark] branch master updated: [SPARK-34355][CORE][SQL][FOLLOWUP] Log commit time in all File Writer

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ea3a33  [SPARK-34355][CORE][SQL][FOLLOWUP] Log commit time in all File Writer
7ea3a33 is described below

commit 7ea3a336b99915f09174a4c3e47fa17f30b88890
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Feb 9 16:05:39 2021 +0900

    [SPARK-34355][CORE][SQL][FOLLOWUP] Log commit time in all File Writer
    
    ### What changes were proposed in this pull request?
    While working on https://issues.apache.org/jira/browse/SPARK-34399 (which builds on https://github.com/apache/spark/pull/31471),
    we found that FileBatchWrite also uses `FileFormatWriter.processStats()`, so the commit duration should be logged in the other writers too.
    In this PR:
    
    1. Extract a commit-job method in SparkHadoopWriter (the shape is sketched just below).
    2. Apply the same commit-time logging to the other commit writer, FileBatchWrite.
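    
    Conceptually, both call sites end up with the same timed-commit shape. A minimal sketch of that shape (`TimedCommit` and `timedCommitJob` are illustrative names, not the actual extraction; `Utils.timeTakenMs` is Spark's existing timing helper and is `private[spark]`, so this compiles only inside Spark's own packages):
    
    ```scala
    import org.apache.spark.internal.Logging
    import org.apache.spark.util.Utils
    
    // Sketch only: wraps an arbitrary commit action with start/elapsed logging,
    // matching the pattern applied in both diffs below.
    object TimedCommit extends Logging {
      def timedCommitJob(jobId: String)(commit: => Unit): Unit = {
        logInfo(s"Start to commit write Job $jobId.")
        val (_, duration) = Utils.timeTakenMs { commit }
        logInfo(s"Write Job $jobId committed. Elapsed time: $duration ms.")
      }
    }
    ```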
    
    ### Why are the changes needed?
    To make the job commit time visible in the logs of all file writers, not only the path changed in SPARK-34355, so slow commits are easier to diagnose.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No new tests; the change only adds logging around existing commit calls.
    
    Closes #31520 from AngersZhuuuu/SPARK-34355-followup.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
---
 .../main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala | 5 +++--
 .../apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala  | 6 ++++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 37b4708..4eeec63 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -96,8 +96,9 @@ object SparkHadoopWriter extends Logging {
           iterator = iter)
       })
 
-      committer.commitJob(jobContext, ret)
-      logInfo(s"Job ${jobContext.getJobID} committed.")
+      logInfo(s"Start to commit write Job ${jobContext.getJobID}.")
+      val (_, duration) = Utils.timeTakenMs { committer.commitJob(jobContext, ret) }
+      logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: $duration ms.")
     } catch {
       case cause: Throwable =>
         logError(s"Aborting job ${jobContext.getJobID}.", cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
index 266c834..7227e48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
 import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter.processStats
+import org.apache.spark.util.Utils
 
 class FileBatchWrite(
     job: Job,
@@ -31,8 +32,9 @@ class FileBatchWrite(
   extends BatchWrite with Logging {
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     val results = messages.map(_.asInstanceOf[WriteTaskResult])
-    committer.commitJob(job, results.map(_.commitMsg))
-    logInfo(s"Write Job ${description.uuid} committed.")
+    logInfo(s"Start to commit write Job ${description.uuid}.")
+    val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, results.map(_.commitMsg)) }
+    logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
 
     processStats(description.statsTrackers, results.map(_.summary.stats))
     logInfo(s"Finished processing stats for write job ${description.uuid}.")

