You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/12/09 11:44:30 UTC

[arrow] branch master updated: ARROW-10781:[Rust] [DataFusion] add the 'Statistics' interface in data source

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 38cfdc3  ARROW-10781:[Rust] [DataFusion] add the 'Statistics' interface in data source
38cfdc3 is described below

commit 38cfdc3c95c0066b7b6a9b7fb5a47494decf7805
Author: shawnding(丁晓坤) <sh...@tencent.com>
AuthorDate: Wed Dec 9 06:43:22 2020 -0500

    ARROW-10781:[Rust] [DataFusion] add the 'Statistics' interface in data source
    
    Add the 'Statistics' interface to data sources (`TableProvider::statistics`)
    
    Closes #8866 from XiaokunDing/statistics_interface
    
    Authored-by: shawnding(丁晓坤) <sh...@tencent.com>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/datafusion/src/datasource/csv.rs        |  7 +++++++
 rust/datafusion/src/datasource/datasource.rs | 14 ++++++++++++++
 rust/datafusion/src/datasource/memory.rs     |  7 +++++++
 rust/datafusion/src/datasource/parquet.rs    |  7 +++++++
 rust/datafusion/tests/dataframe.rs           |  5 +++++
 5 files changed, 40 insertions(+)

diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index 351eaff..c2b9b57 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -38,6 +38,7 @@ use std::any::Any;
 use std::string::String;
 use std::sync::Arc;
 
+use crate::datasource::datasource::Statistics;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::csv::CsvExec;
@@ -52,6 +53,7 @@ pub struct CsvFile {
     has_header: bool,
     delimiter: u8,
     file_extension: String,
+    statistics: Option<Statistics>,
 }
 
 impl CsvFile {
@@ -75,6 +77,7 @@ impl CsvFile {
             has_header: options.has_header,
             delimiter: options.delimiter,
             file_extension: String::from(options.file_extension),
+            statistics: None,
         })
     }
 }
@@ -104,4 +107,8 @@ impl TableProvider for CsvFile {
             batch_size,
         )?))
     }
+
+    fn statistics(&self) -> Option<Statistics> {
+        self.statistics.clone() // NOTE(review): the constructor in this patch stores None
+    }
 }
diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index e7371ea..5882336 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -24,6 +24,16 @@ use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
 use crate::physical_plan::ExecutionPlan;
 
+/// Estimated statistics about a table, as reported by a data source.
+/// These are estimates only and must not be relied on for exact computation.
+#[derive(Clone)] // NOTE(review): consider also deriving `Debug`; public types usually should.
+pub struct Statistics {
+    /// Estimated number of rows in the table.
+    pub num_rows: usize,
+    /// Estimated total size of the table's rows, in bytes.
+    pub total_byte_size: usize,
+}
+
 /// Source table
 pub trait TableProvider {
     /// Returns the table provider as [`Any`](std::any::Any) so that it can be
@@ -39,4 +49,8 @@ pub trait TableProvider {
         projection: &Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Returns estimated [`Statistics`] for this table, if available.
+    /// Returns `None` when the data source cannot provide statistics.
+    fn statistics(&self) -> Option<Statistics>; // NOTE(review): consider a default `None` body
 }
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 8fa140b..a62b79c 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 
+use crate::datasource::datasource::Statistics;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::common;
@@ -35,6 +36,7 @@ use crate::physical_plan::ExecutionPlan;
 pub struct MemTable {
     schema: SchemaRef,
     batches: Vec<Vec<RecordBatch>>,
+    statistics: Option<Statistics>,
 }
 
 impl MemTable {
@@ -48,6 +50,7 @@ impl MemTable {
             Ok(Self {
                 schema,
                 batches: partitions,
+                statistics: None,
             })
         } else {
             Err(DataFusionError::Plan(
@@ -132,6 +135,10 @@ impl TableProvider for MemTable {
             projection.clone(),
         )?))
     }
+
+    fn statistics(&self) -> Option<Statistics> {
+        self.statistics.clone() // NOTE(review): the constructor in this patch stores None
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index be65e63..6d7ec02 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::*;
 
+use crate::datasource::datasource::Statistics;
 use crate::datasource::TableProvider;
 use crate::error::Result;
 use crate::physical_plan::parquet::ParquetExec;
@@ -32,6 +33,7 @@ use crate::physical_plan::ExecutionPlan;
 pub struct ParquetTable {
     path: String,
     schema: SchemaRef,
+    statistics: Option<Statistics>,
 }
 
 impl ParquetTable {
@@ -42,6 +44,7 @@ impl ParquetTable {
         Ok(Self {
             path: path.to_string(),
             schema,
+            statistics: None,
         })
     }
 }
@@ -69,6 +72,10 @@ impl TableProvider for ParquetTable {
             batch_size,
         )?))
     }
+
+    fn statistics(&self) -> Option<Statistics> {
+        self.statistics.clone() // NOTE(review): the constructor in this patch stores None
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/datafusion/tests/dataframe.rs b/rust/datafusion/tests/dataframe.rs
index d17deff..6bf6c23 100644
--- a/rust/datafusion/tests/dataframe.rs
+++ b/rust/datafusion/tests/dataframe.rs
@@ -20,6 +20,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
+use datafusion::datasource::datasource::Statistics;
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 
@@ -145,6 +146,10 @@ impl TableProvider for CustomTableProvider {
             projection: projection.clone(),
         }))
     }
+
+    fn statistics(&self) -> Option<Statistics> {
+        None // this test provider does not report statistics
+    }
 }
 
 #[tokio::test]