You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/28 13:26:29 UTC

[arrow-datafusion] branch master updated: [BugFix] fix file stream time scanning metrics bug (#5020)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e676f3c11 [BugFix] fix file stream time scanning metrics bug (#5020)
e676f3c11 is described below

commit e676f3c114ce00972b4bfb68c4e0a87e500a2286
Author: xyz <a9...@gmail.com>
AuthorDate: Sat Jan 28 21:26:24 2023 +0800

    [BugFix] fix file stream time scanning metrics bug (#5020)
    
    Currently, the file stream's scanning timer is started with 'start()' only once,
    but may be stopped with 'stop()' many times. After the first call to 'stop()',
    the timer's self.start value is replaced with its default value by 'take()',
    so the subsequent calls to 'stop()' are meaningless.
    
    In this PR, we call 'start()' on the scanning timer again if the scan of the current batch succeeds.
    
    Signed-off-by: xyz <a9...@gmail.com>
---
 .../src/physical_plan/file_format/file_stream.rs   | 28 +++++++++++++++-------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 265ff7a4f..b336b19ef 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -127,7 +127,9 @@ struct FileStreamMetrics {
     /// Time elapsed for file opening
     pub time_opening: StartableTime,
     /// Time elapsed for file scanning + first record batch of decompression + decoding
-    pub time_scanning: StartableTime,
+    pub time_scanning_until_data: StartableTime,
+    /// Total elapsed time for scanning + record batch decompression / decoding
+    pub time_scanning_total: StartableTime,
     /// Time elapsed for data decompression + decoding
     pub time_processing: StartableTime,
 }
@@ -140,9 +142,15 @@ impl FileStreamMetrics {
             start: None,
         };
 
-        let time_scanning = StartableTime {
+        let time_scanning_until_data = StartableTime {
             metrics: MetricBuilder::new(metrics)
-                .subset_time("time_elapsed_scanning", partition),
+                .subset_time("time_elapsed_scanning_until_data", partition),
+            start: None,
+        };
+
+        let time_scanning_total = StartableTime {
+            metrics: MetricBuilder::new(metrics)
+                .subset_time("time_elapsed_scanning_total", partition),
             start: None,
         };
 
@@ -154,7 +162,8 @@ impl FileStreamMetrics {
 
         Self {
             time_opening,
-            time_scanning,
+            time_scanning_until_data,
+            time_scanning_total,
             time_processing,
         }
     }
@@ -231,7 +240,8 @@ impl<F: FileOpener> FileStream<F> {
                 } => match ready!(future.poll_unpin(cx)) {
                     Ok(reader) => {
                         self.file_stream_metrics.time_opening.stop();
-                        self.file_stream_metrics.time_scanning.start();
+                        self.file_stream_metrics.time_scanning_until_data.start();
+                        self.file_stream_metrics.time_scanning_total.start();
                         self.state = FileStreamState::Scan {
                             partition_values: std::mem::take(partition_values),
                             reader,
@@ -247,7 +257,8 @@ impl<F: FileOpener> FileStream<F> {
                     partition_values,
                 } => match ready!(reader.poll_next_unpin(cx)) {
                     Some(result) => {
-                        self.file_stream_metrics.time_scanning.stop();
+                        self.file_stream_metrics.time_scanning_until_data.stop();
+                        self.file_stream_metrics.time_scanning_total.stop();
                         let result = result
                             .and_then(|b| self.pc_projector.project(b, partition_values))
                             .map(|batch| match &mut self.remain {
@@ -268,11 +279,12 @@ impl<F: FileOpener> FileStream<F> {
                         if result.is_err() {
                             self.state = FileStreamState::Error
                         }
-
+                        self.file_stream_metrics.time_scanning_total.start();
                         return Poll::Ready(Some(result));
                     }
                     None => {
-                        self.file_stream_metrics.time_scanning.stop();
+                        self.file_stream_metrics.time_scanning_until_data.stop();
+                        self.file_stream_metrics.time_scanning_total.stop();
                         self.state = FileStreamState::Idle;
                     }
                 },