You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/28 10:55:13 UTC
[arrow-datafusion] branch master updated: use Into&lt;String&gt; as
argument type wherever applicable (#615)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e86f8e9 use Into<String> as argument type wherever applicable (#615)
e86f8e9 is described below
commit e86f8e92a9adb4bd92170b191db6bfaff83cba38
Author: QP Hou <qp...@scribd.com>
AuthorDate: Mon Jun 28 03:53:00 2021 -0700
use Into<String> as argument type wherever applicable (#615)
* use Into<String> as argument type wherever applicable
* switch from Into<String> to AsRef<str> for write_csv and write_parquet
---
datafusion/src/datasource/csv.rs | 7 +++---
datafusion/src/datasource/parquet.rs | 7 +++---
datafusion/src/execution/context.rs | 17 ++++++++-----
datafusion/src/logical_plan/builder.rs | 28 ++++++++++++----------
datafusion/src/logical_plan/expr.rs | 6 ++---
datafusion/src/optimizer/filter_push_down.rs | 12 +++++-----
datafusion/src/optimizer/projection_push_down.rs | 4 ++--
datafusion/src/optimizer/utils.rs | 2 +-
datafusion/src/physical_plan/aggregates.rs | 4 ++--
.../src/physical_plan/expressions/average.rs | 8 +++++--
datafusion/src/physical_plan/expressions/count.rs | 8 +++++--
.../src/physical_plan/expressions/min_max.rs | 16 +++++++++----
.../src/physical_plan/expressions/nth_value.rs | 12 +++++-----
.../src/physical_plan/expressions/row_number.rs | 4 ++--
datafusion/src/physical_plan/expressions/sum.rs | 8 +++++--
datafusion/src/physical_plan/planner.rs | 15 ++++++------
datafusion/src/physical_plan/udaf.rs | 4 ++--
17 files changed, 97 insertions(+), 65 deletions(-)
diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index 906a1ce..987c4fd 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -59,11 +59,12 @@ pub struct CsvFile {
impl CsvFile {
/// Attempt to initialize a new `CsvFile` from a file path
- pub fn try_new(path: &str, options: CsvReadOptions) -> Result<Self> {
+ pub fn try_new(path: impl Into<String>, options: CsvReadOptions) -> Result<Self> {
+ let path = path.into();
let schema = Arc::new(match options.schema {
Some(s) => s.clone(),
None => {
- let filenames = common::build_file_list(path, options.file_extension)?;
+ let filenames = common::build_file_list(&path, options.file_extension)?;
if filenames.is_empty() {
return Err(DataFusionError::Plan(format!(
"No files found at {path} with file extension {file_extension}",
@@ -76,7 +77,7 @@ impl CsvFile {
});
Ok(Self {
- source: Source::Path(path.to_string()),
+ source: Source::Path(path),
schema,
has_header: options.has_header,
delimiter: options.delimiter,
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index abfb81d..fd14741 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -42,11 +42,12 @@ pub struct ParquetTable {
impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
- pub fn try_new(path: &str, max_concurrency: usize) -> Result<Self> {
- let parquet_exec = ParquetExec::try_from_path(path, None, None, 0, 1, None)?;
+ pub fn try_new(path: impl Into<String>, max_concurrency: usize) -> Result<Self> {
+ let path = path.into();
+ let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
let schema = parquet_exec.schema();
Ok(Self {
- path: path.to_string(),
+ path,
schema,
statistics: parquet_exec.statistics().to_owned(),
max_concurrency,
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 17625c9..318ea59 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -270,7 +270,7 @@ impl ExecutionContext {
/// Creates a DataFrame for reading a CSV data source.
pub fn read_csv(
&mut self,
- filename: &str,
+ filename: impl Into<String>,
options: CsvReadOptions,
) -> Result<Arc<dyn DataFrame>> {
Ok(Arc::new(DataFrameImpl::new(
@@ -280,7 +280,10 @@ impl ExecutionContext {
}
/// Creates a DataFrame for reading a Parquet data source.
- pub fn read_parquet(&mut self, filename: &str) -> Result<Arc<dyn DataFrame>> {
+ pub fn read_parquet(
+ &mut self,
+ filename: impl Into<String>,
+ ) -> Result<Arc<dyn DataFrame>> {
Ok(Arc::new(DataFrameImpl::new(
self.state.clone(),
&LogicalPlanBuilder::scan_parquet(
@@ -474,10 +477,11 @@ impl ExecutionContext {
pub async fn write_csv(
&self,
plan: Arc<dyn ExecutionPlan>,
- path: String,
+ path: impl AsRef<str>,
) -> Result<()> {
+ let path = path.as_ref();
// create directory to contain the CSV files (one per partition)
- let fs_path = Path::new(&path);
+ let fs_path = Path::new(path);
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
@@ -511,11 +515,12 @@ impl ExecutionContext {
pub async fn write_parquet(
&self,
plan: Arc<dyn ExecutionPlan>,
- path: String,
+ path: impl AsRef<str>,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
+ let path = path.as_ref();
// create directory to contain the Parquet files (one per partition)
- let fs_path = Path::new(&path);
+ let fs_path = Path::new(path);
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 17fe663..1a53e21 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -115,19 +115,20 @@ impl LogicalPlanBuilder {
/// Scan a CSV data source
pub fn scan_csv(
- path: &str,
+ path: impl Into<String>,
options: CsvReadOptions,
projection: Option<Vec<usize>>,
) -> Result<Self> {
- Self::scan_csv_with_name(path, options, projection, path)
+ let path = path.into();
+ Self::scan_csv_with_name(path.clone(), options, projection, path)
}
/// Scan a CSV data source and register it with a given table name
pub fn scan_csv_with_name(
- path: &str,
+ path: impl Into<String>,
options: CsvReadOptions,
projection: Option<Vec<usize>>,
- table_name: &str,
+ table_name: impl Into<String>,
) -> Result<Self> {
let provider = Arc::new(CsvFile::try_new(path, options)?);
Self::scan(table_name, provider, projection)
@@ -135,19 +136,20 @@ impl LogicalPlanBuilder {
/// Scan a Parquet data source
pub fn scan_parquet(
- path: &str,
+ path: impl Into<String>,
projection: Option<Vec<usize>>,
max_concurrency: usize,
) -> Result<Self> {
- Self::scan_parquet_with_name(path, projection, max_concurrency, path)
+ let path = path.into();
+ Self::scan_parquet_with_name(path.clone(), projection, max_concurrency, path)
}
/// Scan a Parquet data source and register it with a given table name
pub fn scan_parquet_with_name(
- path: &str,
+ path: impl Into<String>,
projection: Option<Vec<usize>>,
max_concurrency: usize,
- table_name: &str,
+ table_name: impl Into<String>,
) -> Result<Self> {
let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
Self::scan(table_name, provider, projection)
@@ -166,10 +168,12 @@ impl LogicalPlanBuilder {
/// Convert a table provider into a builder with a TableScan
pub fn scan(
- table_name: &str,
+ table_name: impl Into<String>,
provider: Arc<dyn TableProvider>,
projection: Option<Vec<usize>>,
) -> Result<Self> {
+ let table_name = table_name.into();
+
if table_name.is_empty() {
return Err(DataFusionError::Plan(
"table_name cannot be empty".to_string(),
@@ -184,17 +188,17 @@ impl LogicalPlanBuilder {
DFSchema::new(
p.iter()
.map(|i| {
- DFField::from_qualified(table_name, schema.field(*i).clone())
+ DFField::from_qualified(&table_name, schema.field(*i).clone())
})
.collect(),
)
})
.unwrap_or_else(|| {
- DFSchema::try_from_qualified_schema(table_name, &schema)
+ DFSchema::try_from_qualified_schema(&table_name, &schema)
})?;
let table_scan = LogicalPlan::TableScan {
- table_name: table_name.to_string(),
+ table_name,
source: provider,
projected_schema: Arc::new(projected_schema),
projection,
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index d20b1f6..1bf3b65 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -44,10 +44,10 @@ pub struct Column {
impl Column {
/// Create Column from unqualified name.
- pub fn from_name(name: String) -> Self {
+ pub fn from_name(name: impl Into<String>) -> Self {
Self {
relation: None,
- name,
+ name: name.into(),
}
}
@@ -131,7 +131,7 @@ impl fmt::Display for Column {
/// ```
/// # use datafusion::logical_plan::*;
/// let expr = col("c1");
-/// assert_eq!(expr, Expr::Column(Column::from_name("c1".to_string())));
+/// assert_eq!(expr, Expr::Column(Column::from_name("c1")));
/// ```
///
/// ## Create the expression `c1 + c2` to add columns "c1" and "c2" together
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 7b1ff32..c1d81fe 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -890,8 +890,8 @@ mod tests {
.join(
&right,
JoinType::Inner,
- vec![Column::from_name("a".to_string())],
- vec![Column::from_name("a".to_string())],
+ vec![Column::from_name("a")],
+ vec![Column::from_name("a")],
)?
.filter(col("a").lt_eq(lit(1i64)))?
.build()?;
@@ -933,8 +933,8 @@ mod tests {
.join(
&right,
JoinType::Inner,
- vec![Column::from_name("a".to_string())],
- vec![Column::from_name("a".to_string())],
+ vec![Column::from_name("a")],
+ vec![Column::from_name("a")],
)?
// "b" and "c" are not shared by either side: they are only available together after the join
.filter(col("c").lt_eq(col("b")))?
@@ -972,8 +972,8 @@ mod tests {
.join(
&right,
JoinType::Inner,
- vec![Column::from_name("a".to_string())],
- vec![Column::from_name("a".to_string())],
+ vec![Column::from_name("a")],
+ vec![Column::from_name("a")],
)?
.filter(col("b").lt_eq(lit(1i64)))?
.build()?;
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 4bf2b6e..3c8f1ee 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -241,7 +241,7 @@ fn optimize_plan(
{
window_expr.iter().try_for_each(|expr| {
let name = &expr.name(schema)?;
- let column = Column::from_name(name.to_string());
+ let column = Column::from_name(name);
if required_columns.contains(&column) {
new_window_expr.push(expr.clone());
new_required_columns.insert(column);
@@ -286,7 +286,7 @@ fn optimize_plan(
let mut new_aggr_expr = Vec::new();
aggr_expr.iter().try_for_each(|expr| {
let name = &expr.name(schema)?;
- let column = Column::from_name(name.to_string());
+ let column = Column::from_name(name);
if required_columns.contains(&column) {
new_aggr_expr.push(expr.clone());
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 394308f..ae3e196 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -516,7 +516,7 @@ mod tests {
&mut accum,
)?;
assert_eq!(1, accum.len());
- assert!(accum.contains(&Column::from_name("a".to_string())));
+ assert!(accum.contains(&Column::from_name("a")));
Ok(())
}
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index 897c78f..c297a95 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -110,9 +110,9 @@ pub fn create_aggregate_expr(
distinct: bool,
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
- name: String,
+ name: impl Into<String>,
) -> Result<Arc<dyn AggregateExpr>> {
- // coerce
+ let name = name.into();
let arg = coerce(args, input_schema, &signature(fun))?;
if arg.is_empty() {
return Err(DataFusionError::Plan(format!(
diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs
index 6a63320..2e21819 100644
--- a/datafusion/src/physical_plan/expressions/average.rs
+++ b/datafusion/src/physical_plan/expressions/average.rs
@@ -64,9 +64,13 @@ pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
impl Avg {
/// Create a new AVG aggregate function
- pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ data_type: DataType,
+ ) -> Self {
Self {
- name,
+ name: name.into(),
expr,
data_type,
nullable: true,
diff --git a/datafusion/src/physical_plan/expressions/count.rs b/datafusion/src/physical_plan/expressions/count.rs
index 4a3fbe4..30c44f1 100644
--- a/datafusion/src/physical_plan/expressions/count.rs
+++ b/datafusion/src/physical_plan/expressions/count.rs
@@ -44,9 +44,13 @@ pub struct Count {
impl Count {
/// Create a new COUNT aggregate function.
- pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ data_type: DataType,
+ ) -> Self {
Self {
- name,
+ name: name.into(),
expr,
data_type,
nullable: true,
diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs
index 680e739..46e41f4 100644
--- a/datafusion/src/physical_plan/expressions/min_max.rs
+++ b/datafusion/src/physical_plan/expressions/min_max.rs
@@ -49,9 +49,13 @@ pub struct Max {
impl Max {
/// Create a new MAX aggregate function
- pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ data_type: DataType,
+ ) -> Self {
Self {
- name,
+ name: name.into(),
expr,
data_type,
nullable: true,
@@ -352,9 +356,13 @@ pub struct Min {
impl Min {
/// Create a new MIN aggregate function
- pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ data_type: DataType,
+ ) -> Self {
Self {
- name,
+ name: name.into(),
expr,
data_type,
nullable: true,
diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs
index 577c19b..b548f91 100644
--- a/datafusion/src/physical_plan/expressions/nth_value.rs
+++ b/datafusion/src/physical_plan/expressions/nth_value.rs
@@ -45,12 +45,12 @@ pub struct NthValue {
impl NthValue {
/// Create a new FIRST_VALUE window aggregate function
pub fn first_value(
- name: String,
+ name: impl Into<String>,
expr: Arc<dyn PhysicalExpr>,
data_type: DataType,
) -> Self {
Self {
- name,
+ name: name.into(),
expr,
data_type,
kind: NthValueKind::First,
@@ -59,12 +59,12 @@ impl NthValue {
/// Create a new LAST_VALUE window aggregate function
pub fn last_value(
- name: String,
+ name: impl Into<String>,
expr: Arc<dyn PhysicalExpr>,
data_type: DataType,
) -> Self {
Self {
- name,
+ name: name.into(),
expr,
data_type,
kind: NthValueKind::Last,
@@ -73,7 +73,7 @@ impl NthValue {
/// Create a new NTH_VALUE window aggregate function
pub fn nth_value(
- name: String,
+ name: impl Into<String>,
expr: Arc<dyn PhysicalExpr>,
data_type: DataType,
n: u32,
@@ -83,7 +83,7 @@ impl NthValue {
"nth_value expect n to be > 0".to_owned(),
)),
_ => Ok(Self {
- name,
+ name: name.into(),
expr,
data_type,
kind: NthValueKind::Nth(n),
diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs
index 0444ee9..6b488cc 100644
--- a/datafusion/src/physical_plan/expressions/row_number.rs
+++ b/datafusion/src/physical_plan/expressions/row_number.rs
@@ -32,8 +32,8 @@ pub struct RowNumber {
impl RowNumber {
/// Create a new ROW_NUMBER function
- pub fn new(name: String) -> Self {
- Self { name }
+ pub fn new(name: impl Into<String>) -> Self {
+ Self { name: name.into() }
}
}
diff --git a/datafusion/src/physical_plan/expressions/sum.rs b/datafusion/src/physical_plan/expressions/sum.rs
index 7bbbf99..c3f57e3 100644
--- a/datafusion/src/physical_plan/expressions/sum.rs
+++ b/datafusion/src/physical_plan/expressions/sum.rs
@@ -65,9 +65,13 @@ pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
impl Sum {
/// Create a new SUM aggregate function
- pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ data_type: DataType,
+ ) -> Self {
Self {
- name,
+ name: name.into(),
expr,
data_type,
nullable: true,
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index d590042..c3bb9a8 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -1018,11 +1018,12 @@ impl DefaultPhysicalPlanner {
pub fn create_window_expr_with_name(
&self,
e: &Expr,
- name: String,
+ name: impl Into<String>,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn WindowExpr>> {
+ let name = name.into();
match e {
Expr::WindowFunction {
fun,
@@ -1124,7 +1125,7 @@ impl DefaultPhysicalPlanner {
pub fn create_aggregate_expr_with_name(
&self,
e: &Expr,
- name: String,
+ name: impl Into<String>,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
@@ -1263,7 +1264,7 @@ mod tests {
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
- let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+ let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
// filter clause needs the type coercion rule applied
.filter(col("c7").lt(lit(5_u8)))?
.project(vec![col("c1"), col("c2")])?
@@ -1308,7 +1309,7 @@ mod tests {
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
- let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+ let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
.filter(col("c7").lt(col("c12")))?
.build()?;
@@ -1449,7 +1450,7 @@ mod tests {
Expr::Literal(ScalarValue::Boolean(Some(true))),
Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
];
- let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+ let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
// filter clause needs the type coercion rule applied
.filter(col("c12").lt(lit(0.05)))?
.project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
@@ -1476,7 +1477,7 @@ mod tests {
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
- let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+ let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.build()?;
@@ -1499,7 +1500,7 @@ mod tests {
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
- let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+ let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.build()?;
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index f7515d3..c6d65ad 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -105,7 +105,7 @@ pub fn create_aggregate_expr(
fun: &AggregateUDF,
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
- name: String,
+ name: impl Into<String>,
) -> Result<Arc<dyn AggregateExpr>> {
// coerce
let args = coerce(args, input_schema, &fun.signature)?;
@@ -119,7 +119,7 @@ pub fn create_aggregate_expr(
fun: fun.clone(),
args: args.clone(),
data_type: (fun.return_type)(&arg_types)?.as_ref().clone(),
- name,
+ name: name.into(),
}))
}