You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/05 19:22:07 UTC
[arrow-datafusion] branch master updated: Table provider error propagation (#2438)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5927bfcee Table provider error propagation (#2438)
5927bfcee is described below
commit 5927bfceeba3a4eab7988289c674d925cc82ac05
Author: Jeremy Dyer <jd...@gmail.com>
AuthorDate: Thu May 5 15:22:02 2022 -0400
Table provider error propagation (#2438)
* Change return type of get_table_provider from Option<T> to Result<T>
* cargo fmt changes
* Update another error location
* Update datafusion/core/src/execution/context.rs
Co-authored-by: Andy Grove <an...@gmail.com>
* Update datafusion/core/src/execution/context.rs
Co-authored-by: Andy Grove <an...@gmail.com>
* Update error messages now that errors are propagated
* linter updates
* Remove commented out code that was left by mistake
* Cargo fmt
Co-authored-by: Andy Grove <an...@gmail.com>
---
datafusion/core/src/execution/context.rs | 14 +++++--
datafusion/core/src/sql/planner.rs | 49 ++++++++++++-------------
datafusion/core/tests/sql/errors.rs | 6 +--
datafusion/core/tests/sql/information_schema.rs | 15 +++++---
4 files changed, 47 insertions(+), 37 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 0b8189b56..e2a38c795 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1240,7 +1240,6 @@ impl SessionState {
table_ref: impl Into<TableReference<'a>>,
) -> Result<Arc<dyn SchemaProvider>> {
let resolved_ref = self.resolve_table_ref(table_ref);
-
self.catalog_list
.catalog(resolved_ref.catalog)
.ok_or_else(|| {
@@ -1366,10 +1365,17 @@ impl SessionState {
}
impl ContextProvider for SessionState {
- fn get_table_provider(&self, name: TableReference) -> Option<Arc<dyn TableProvider>> {
+ fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableProvider>> {
let resolved_ref = self.resolve_table_ref(name);
- let schema = self.schema_for_ref(resolved_ref).ok()?;
- schema.table(resolved_ref.table)
+ match self.schema_for_ref(resolved_ref) {
+ Ok(schema) => schema.table(resolved_ref.table).ok_or_else(|| {
+ DataFusionError::Plan(format!(
+ "'{}.{}.{}' not found",
+ resolved_ref.catalog, resolved_ref.schema, resolved_ref.table
+ ))
+ }),
+ Err(e) => Err(e),
+ }
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 425c685f9..ccf23511d 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -76,7 +76,7 @@ use crate::logical_plan::plan::{Analyze, Explain};
/// functions referenced in SQL statements
pub trait ContextProvider {
/// Getter for a datasource
- fn get_table_provider(&self, name: TableReference) -> Option<Arc<dyn TableProvider>>;
+ fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableProvider>>;
/// Getter for a UDF description
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
/// Getter for a UDAF description
@@ -692,7 +692,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
),
_ => Ok(cte_plan.clone()),
},
- (_, Some(provider)) => {
+ (_, Ok(provider)) => {
let scan =
LogicalPlanBuilder::scan(&table_name, provider, None);
let scan = match table_alias.as_ref() {
@@ -701,10 +701,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
};
scan?.build()
}
- (None, None) => Err(DataFusionError::Plan(format!(
- "Table or CTE with name '{}' not found",
- sql_object_name
- ))),
+ (None, Err(e)) => Err(e),
}?,
alias,
)
@@ -2272,11 +2269,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let table_name = normalize_sql_object_name(sql_table_name);
let table_ref: TableReference = table_name.as_str().into();
- if self.schema_provider.get_table_provider(table_ref).is_none() {
- return Err(DataFusionError::Plan(format!(
- "Unknown relation for SHOW COLUMNS: {}",
- sql_table_name
- )));
+ if let Err(e) = self.schema_provider.get_table_provider(table_ref) {
+ return Err(e);
}
// Figure out the where clause
@@ -2314,7 +2308,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let tables_reference = TableReference::Partial { schema, table };
self.schema_provider
.get_table_provider(tables_reference)
- .is_some()
+ .is_ok()
}
fn sql_array_literal(
@@ -4265,25 +4259,25 @@ mod tests {
fn get_table_provider(
&self,
name: TableReference,
- ) -> Option<Arc<dyn TableProvider>> {
+ ) -> Result<Arc<dyn TableProvider>> {
let schema = match name.table() {
- "test" => Some(Schema::new(vec![
+ "test" => Ok(Schema::new(vec![
Field::new("t_date32", DataType::Date32, false),
Field::new("t_date64", DataType::Date64, false),
])),
- "j1" => Some(Schema::new(vec![
+ "j1" => Ok(Schema::new(vec![
Field::new("j1_id", DataType::Int32, false),
Field::new("j1_string", DataType::Utf8, false),
])),
- "j2" => Some(Schema::new(vec![
+ "j2" => Ok(Schema::new(vec![
Field::new("j2_id", DataType::Int32, false),
Field::new("j2_string", DataType::Utf8, false),
])),
- "j3" => Some(Schema::new(vec![
+ "j3" => Ok(Schema::new(vec![
Field::new("j3_id", DataType::Int32, false),
Field::new("j3_string", DataType::Utf8, false),
])),
- "person" => Some(Schema::new(vec![
+ "person" => Ok(Schema::new(vec![
Field::new("id", DataType::UInt32, false),
Field::new("first_name", DataType::Utf8, false),
Field::new("last_name", DataType::Utf8, false),
@@ -4297,7 +4291,7 @@ mod tests {
),
Field::new("😀", DataType::Int32, false),
])),
- "orders" => Some(Schema::new(vec![
+ "orders" => Ok(Schema::new(vec![
Field::new("order_id", DataType::UInt32, false),
Field::new("customer_id", DataType::UInt32, false),
Field::new("o_item_id", DataType::Utf8, false),
@@ -4305,12 +4299,12 @@ mod tests {
Field::new("price", DataType::Float64, false),
Field::new("delivered", DataType::Boolean, false),
])),
- "lineitem" => Some(Schema::new(vec![
+ "lineitem" => Ok(Schema::new(vec![
Field::new("l_item_id", DataType::UInt32, false),
Field::new("l_description", DataType::Utf8, false),
Field::new("price", DataType::Float64, false),
])),
- "aggregate_test_100" => Some(Schema::new(vec![
+ "aggregate_test_100" => Ok(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
@@ -4325,11 +4319,16 @@ mod tests {
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
])),
- _ => None,
+ _ => Err(DataFusionError::Plan(format!(
+ "No table named: {} found",
+ name.table()
+ ))),
};
- schema.map(|s| -> Arc<dyn TableProvider> {
- Arc::new(EmptyTable::new(Arc::new(s)))
- })
+
+ match schema {
+ Ok(t) => Ok(Arc::new(EmptyTable::new(Arc::new(t)))),
+ Err(e) => Err(e),
+ }
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
diff --git a/datafusion/core/tests/sql/errors.rs b/datafusion/core/tests/sql/errors.rs
index 7680c3e8f..6a9591213 100644
--- a/datafusion/core/tests/sql/errors.rs
+++ b/datafusion/core/tests/sql/errors.rs
@@ -79,7 +79,7 @@ async fn query_cte_incorrect() -> Result<()> {
assert!(plan.is_err());
assert_eq!(
format!("{}", plan.unwrap_err()),
- "Error during planning: Table or CTE with name \'t\' not found"
+ "Error during planning: 'datafusion.public.t' not found"
);
// forward referencing
@@ -88,7 +88,7 @@ async fn query_cte_incorrect() -> Result<()> {
assert!(plan.is_err());
assert_eq!(
format!("{}", plan.unwrap_err()),
- "Error during planning: Table or CTE with name \'u\' not found"
+ "Error during planning: 'datafusion.public.u' not found"
);
// wrapping should hide u
@@ -97,7 +97,7 @@ async fn query_cte_incorrect() -> Result<()> {
assert!(plan.is_err());
assert_eq!(
format!("{}", plan.unwrap_err()),
- "Error during planning: Table or CTE with name \'u\' not found"
+ "Error during planning: 'datafusion.public.u' not found"
);
Ok(())
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index d3de4d699..a7b6bdb45 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -36,7 +36,8 @@ async fn information_schema_tables_not_exist_by_default() {
.unwrap_err();
assert_eq!(
err.to_string(),
- "Error during planning: Table or CTE with name 'information_schema.tables' not found"
+ // Error propagates from SessionState::schema_for_ref
+ "Error during planning: failed to resolve schema: information_schema"
);
}
@@ -313,7 +314,8 @@ async fn information_schema_show_columns() {
.unwrap_err();
assert_eq!(
err.to_string(),
- "Error during planning: Unknown relation for SHOW COLUMNS: \"T\""
+ // Error propagates from SessionState::get_table_provider
+ "Error during planning: 'datafusion.public.T' not found"
);
}
@@ -375,7 +377,8 @@ async fn information_schema_show_table_table_names() {
.unwrap_err();
assert_eq!(
err.to_string(),
- "Error during planning: Unknown relation for SHOW COLUMNS: t2"
+ // Error propagates from SessionState::get_table_provider
+ "Error during planning: 'datafusion.public.t2' not found"
);
let err = plan_and_collect(&ctx, "SHOW columns from datafusion.public.t2")
@@ -383,7 +386,8 @@ async fn information_schema_show_table_table_names() {
.unwrap_err();
assert_eq!(
err.to_string(),
- "Error during planning: Unknown relation for SHOW COLUMNS: datafusion.public.t2"
+ // Error propagates from SessionState::get_table_provider
+ "Error during planning: 'datafusion.public.t2' not found"
);
}
@@ -407,7 +411,8 @@ async fn information_schema_columns_not_exist_by_default() {
.unwrap_err();
assert_eq!(
err.to_string(),
- "Error during planning: Table or CTE with name 'information_schema.columns' not found"
+ // Error propagates from SessionState::schema_for_ref
+ "Error during planning: failed to resolve schema: information_schema"
);
}