Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/19 17:19:07 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request #908: Improve SQLMetric APIs, port existing metrics

alamb opened a new pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908


   # Which issue does this PR close?
   
   Closes https://github.com/apache/arrow-datafusion/issues/679. See https://github.com/apache/arrow-datafusion/pull/901 for an earlier version with feedback from @tustvold and @andygrove.
   
   # Rationale for this change
   See the description in https://github.com/apache/arrow-datafusion/issues/679#issue-937015511 for the full rationale; the TL;DR version is:
   1. Better align the metric data model with industry best practice to ease integration with other metric systems (e.g. Prometheus, InfluxDB)
   2. Ability to get per-partition metrics
   3. Ability to get current metric values *during* execution without allocation
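Reading metric values during execution requires a counter that can be observed while an operator is still updating it. A minimal sketch of one way to do that, assuming a counter backed by an `Arc<AtomicUsize>` (the `SharedCount` type is hypothetical, not DataFusion's API): every clone shares the same atomic, so an observer can read the current value at any time without locking, and once the counter exists, incrementing or reading it does not allocate.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical counter handle. Clones share one atomic, so the
/// operator that increments it and any observer see the same value.
#[derive(Debug, Clone, Default)]
pub struct SharedCount {
    value: Arc<AtomicUsize>,
}

impl SharedCount {
    /// Allocates the shared atomic once, at creation time.
    pub fn new() -> Self {
        Self::default()
    }

    /// Add `n`; lock-free and allocation-free.
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Read the current value, even while another thread is still adding.
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}
```

A caller would hand one clone to the operator and keep another for monitoring. `Relaxed` ordering is sufficient here because the counter does not guard any other data.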
   
   
   # What changes are included in this PR?
   1. Update the `SQLMetric` API to live in its own module, have labels, know about partitions, and allow for real-time inspection
   2. Rename `SQLMetric` --> `Metric`
   3. Update uses of metrics in DataFusion and Ballista to the new API
   4. Add functionality to aggregate (sum) metrics via a predicate and via partition (as requested by @Dandandan in https://github.com/apache/arrow-datafusion/issues/679#issuecomment-874341354 and @andygrove in https://github.com/apache/arrow-datafusion/issues/679#issuecomment-874187741)
   5. Rename metrics to snake case (`output_rows`) rather than camel 🐫 case (`outputRows`) to follow Rust naming conventions (see the note from @andygrove on why the names were camelCase to begin with: https://github.com/apache/arrow-datafusion/pull/901#issuecomment-901362155)
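A metric that has a snake_case name, an optional partition, and extra labels can be rendered in a Prometheus-like `name{label=value, ...}=value` form, which is what the `Display` impl for `Metric` in `metrics/mod.rs` (quoted later in this thread) does. The sketch below mirrors that logic with simplified, hypothetical types (`Label`, `LabeledMetric`, a plain `usize` value) rather than the real DataFusion ones.

```rust
use std::fmt;

/// Hypothetical label: an arbitrary `name=value` pair attached to a metric.
#[derive(Debug, Clone)]
pub struct Label {
    name: String,
    value: String,
}

impl Label {
    pub fn new(name: impl Into<String>, value: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }
}

impl fmt::Display for Label {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}={}", self.name, self.value)
    }
}

/// Hypothetical metric with a snake_case name, a plain `usize` value,
/// an optional partition, and any number of extra labels.
pub struct LabeledMetric {
    pub name: &'static str,
    pub value: usize,
    pub partition: Option<usize>,
    pub labels: Vec<Label>,
}

impl fmt::Display for LabeledMetric {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.name)?;

        // The partition, if any, is rendered as the first label.
        let labels: Vec<String> = self
            .partition
            .iter()
            .map(|p| Label::new("partition", p.to_string()))
            .chain(self.labels.iter().cloned())
            .map(|label| label.to_string())
            .collect();

        // Only emit `{...}` when there is at least one label.
        if !labels.is_empty() {
            write!(f, "{{{}}}", labels.join(", "))?;
        }

        write!(f, "={}", self.value)
    }
}
```

For example, a partition-1 `output_rows` metric of 13 with a `file=a.csv` label renders as `output_rows{partition=1, file=a.csv}=13`, while a metric with no partition and no labels renders as just `output_rows=42`.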
   
   
   # Are there any user-facing changes?
   YES! 
   
   The `SQLMetric` / `Metric` API is now totally different, so any code that creates or uses `SQLMetric` will have to be updated. The updates are fairly mechanical, as you can see in this PR.
   
   # Notes
   In keeping with Rust's tradition of static typing, I also switched to more strongly typed metric values, both to avoid mistakes such as adding a "time" to a counter value and to allow other counter-specific operations.
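A sketch of the strong-typing idea, using hypothetical newtypes (`RowCount`, `ElapsedTime`, `Value`) rather than the actual types in this PR. Because `RowCount::add` only accepts a row count and `ElapsedTime` only accepts a `Duration`, adding a time to a counter is a compile error. An enum wrapping both can still be merged generically, with mismatched kinds rejected at runtime.

```rust
use std::time::Duration;

/// Hypothetical counter newtype (e.g. rows produced).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct RowCount(usize);

/// Hypothetical elapsed-time newtype, stored in nanoseconds.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ElapsedTime(u128);

impl RowCount {
    pub fn add(&mut self, rows: usize) {
        self.0 += rows;
    }
    pub fn value(&self) -> usize {
        self.0
    }
}

impl ElapsedTime {
    pub fn add_duration(&mut self, d: Duration) {
        self.0 += d.as_nanos();
    }
    pub fn as_nanos(&self) -> u128 {
        self.0
    }
}

// The newtypes make the classic mistake unrepresentable:
//     let mut rows = RowCount::default();
//     rows.add(Duration::from_millis(5)); // error[E0308]: mismatched types

/// Hypothetical tagged union over the typed values, so heterogeneous
/// metrics can live in one collection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Value {
    OutputRows(RowCount),
    CpuTime(ElapsedTime),
}

impl Value {
    /// Merge `other` into `self`. Only values of the same kind combine;
    /// anything else is a programming error, so it panics.
    pub fn add(&mut self, other: &Value) {
        match (self, other) {
            (Value::OutputRows(a), Value::OutputRows(b)) => a.add(b.value()),
            (Value::CpuTime(a), Value::CpuTime(b)) => a.0 += b.0,
            (a, b) => panic!("cannot add {:?} to {:?}", b, a),
        }
    }
}
```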
   
   # Open Questions:
   The current SQL counters use "camel case" for the counter names (e.g. `numRows`) rather than the Rust standard "snake case" (e.g. `num_rows`). I kept the same naming convention in this PR, but I wonder if we want to make them more Rust standard snake case given we are messing with them all anyways.
   
   # Not included in this PR:
   1. Ensure that all operators have reasonable metrics (I plan to do this in a follow-on PR for #866, using this API)
   2. Support for a global "operator id" as described by @andygrove in https://github.com/apache/arrow-datafusion/issues/679#issuecomment-874187741
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #908: Improve SQLMetric APIs, port existing metrics

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908#issuecomment-904877449


   Late to the party, this is really great work, thanks @alamb !





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #908: Improve SQLMetric APIs, port existing metrics

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908#discussion_r694849912



##########
File path: datafusion/tests/sql.rs
##########
@@ -2172,6 +2172,8 @@ async fn csv_explain_analyze() {
     let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
     let formatted = normalize_for_explain(&formatted);
 
+    println!("ANALYZE EXPLAIN:\n{}", formatted);

Review comment:
       This is really more of a development tool. I plan to remove it in a follow-on PR.







[GitHub] [arrow-datafusion] alamb commented on pull request #908: Improve SQLMetric APIs, port existing metrics

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908#issuecomment-904496312


   Thanks @andygrove -- I plan to merge this later today unless anyone objects or would like more time to review.





[GitHub] [arrow-datafusion] returnString commented on a change in pull request #908: Improve SQLMetric APIs, port existing metrics

Posted by GitBox <gi...@apache.org>.
returnString commented on a change in pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908#discussion_r694843031



##########
File path: datafusion/tests/sql.rs
##########
@@ -2172,6 +2172,8 @@ async fn csv_explain_analyze() {
     let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
     let formatted = normalize_for_explain(&formatted);
 
+    println!("ANALYZE EXPLAIN:\n{}", formatted);

Review comment:
       Stray debug logging?







[GitHub] [arrow-datafusion] alamb commented on pull request #908: Improve SQLMetric APIs, port existing metrics

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908#issuecomment-903964893


   Here is an example of what is needed to use the new API in IOx: https://github.com/influxdata/influxdb_iox/pull/2385





[GitHub] [arrow-datafusion] alamb merged pull request #908: Improve SQLMetric APIs, port existing metrics

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908


   





[GitHub] [arrow-datafusion] alamb commented on pull request #908: Improve SQLMetric APIs, port existing metrics

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908#issuecomment-904730265


   For anyone else who is updating code that uses the existing `SQLMetric` API, here is an example PR that updates IOx https://github.com/influxdata/influxdb_iox/pull/2385 in case those patterns are helpful





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #908: Improve SQLMetric APIs, port existing metrics

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908#discussion_r694167112



##########
File path: datafusion/src/physical_plan/metrics/mod.rs
##########
@@ -0,0 +1,528 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+mod builder;
+mod value;
+
+use std::{
+    borrow::Cow,
+    fmt::{Debug, Display},
+    sync::{Arc, Mutex},
+};
+
+use hashbrown::HashMap;
+
+// public exports
+pub use builder::MetricBuilder;
+pub use value::{Count, MetricValue, ScopedTimerGuard, Time};
+
+/// Something that tracks a value of interest (metric) of a DataFusion
+/// [`ExecutionPlan`] execution.
+///
+/// Typically [`Metric`]s are not created directly, but instead
+/// are created using [`MetricBuilder`] or methods on
+/// [`ExecutionPlanMetricsSet`].
+///
+/// ```
+///  use datafusion::physical_plan::metrics::*;
+///
+///  let metrics = ExecutionPlanMetricsSet::new();
+///  assert!(metrics.clone_inner().output_rows().is_none());
+///
+///  // Create a counter to increment using the MetricBuilder
+///  let partition = 1;
+///  let output_rows = MetricBuilder::new(&metrics)
+///      .output_rows(partition);
+///
+///  // Counter can be incremented
+///  output_rows.add(13);
+///
+///  // The value can be retrieved directly:
+///  assert_eq!(output_rows.value(), 13);
+///
+///  // As well as from the metrics set
+///  assert_eq!(metrics.clone_inner().output_rows(), Some(13));
+/// ```
+
+#[derive(Debug)]
+pub struct Metric {
+    /// The value of the metric
+    value: MetricValue,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// The partition of the operator's output to which this metric
+    /// applies. `None` means all partitions.
+    partition: Option<usize>,
+}
+
+impl Display for Metric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.value.name())?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| Label::new("partition", partition.to_string()))
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        write!(f, "={}", self.value)
+    }
+}
+
+impl Metric {
+    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
+    /// rather than this function directly.
+    pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
+        Self {
+            value,
+            labels: vec![],
+            partition,
+        }
+    }
+
+    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
+    /// rather than this function directly.
+    pub fn new_with_labels(
+        value: MetricValue,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value,
+            labels,
+            partition,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+
+    /// return a reference to the value of this metric
+    pub fn value(&self) -> &MetricValue {
+        &self.value
+    }
+
+    /// return a mutable reference to the value of this metric
+    pub fn value_mut(&mut self) -> &mut MetricValue {
+        &mut self.value
+    }
+
+    /// return a reference to the partition
+    pub fn partition(&self) -> &Option<usize> {
+        &self.partition
+    }
+}
+
+/// A snapshot of the metrics for a particular operator (`dyn
+/// ExecutionPlan`).
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<Metric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<Metric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Returns an iterator across all metrics
+    pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
+        self.metrics.iter()
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
+            .map(|v| v.as_usize())
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.value(), MetricValue::CPUTime(_)))
+            .map(|v| v.as_usize())
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
+    where
+        F: FnMut(&Metric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        let mut accum = match iter.peek() {
+            None => {
+                return None;
+            }
+            Some(metric) => metric.value().new_empty(),
+        };
+
+        iter.for_each(|metric| accum.add(metric.value()));
+
+        Some(accum)
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had the same name and partition=`Some(..)` have been
+    /// aggregated together. The resulting `MetricsSet` has all
+    /// metrics with `partition=None`
+    pub fn aggregate_by_partition(&self) -> Self {

Review comment:
       Here is the code that can aggregate metrics by partition (and `sum` above is also useful)
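Stripped of `Arc`, labels, and typed values, the two operations discussed here reduce to a filtered sum and a group-by-name. A minimal sketch with hypothetical `SimpleMetric` / `SimpleMetricsSet` types (not DataFusion's): `sum` returns `None` when nothing matches, as `MetricsSet::sum` above does, while `aggregate_by_partition` here groups only by metric name and ignores labels, which may differ from the real implementation.

```rust
use std::collections::BTreeMap;

/// Hypothetical simplified metric: a name, an optional partition, a count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimpleMetric {
    pub name: &'static str,
    pub partition: Option<usize>,
    pub value: usize,
}

#[derive(Debug, Default, Clone)]
pub struct SimpleMetricsSet {
    pub metrics: Vec<SimpleMetric>,
}

impl SimpleMetricsSet {
    /// Sum the values of all metrics for which `f` returns true.
    /// Returns `None` (not `Some(0)`) if no metric matches.
    pub fn sum<F>(&self, mut f: F) -> Option<usize>
    where
        F: FnMut(&SimpleMetric) -> bool,
    {
        let mut iter = self.metrics.iter().filter(|m| f(*m)).peekable();
        iter.peek()?;
        Some(iter.map(|m| m.value).sum())
    }

    /// Collapse all metrics sharing a name into one metric with
    /// `partition: None` whose value is the sum across partitions.
    pub fn aggregate_by_partition(&self) -> Self {
        let mut totals: BTreeMap<&'static str, usize> = BTreeMap::new();
        for m in &self.metrics {
            *totals.entry(m.name).or_insert(0) += m.value;
        }
        let metrics = totals
            .into_iter()
            .map(|(name, value)| SimpleMetric { name, partition: None, value })
            .collect();
        Self { metrics }
    }
}
```

Using a `BTreeMap` keeps the output order deterministic (sorted by name), which makes results easy to compare in tests; a fuller version would likely need to include labels in the grouping key.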



