Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/17 11:58:26 UTC
[arrow-datafusion] branch master updated: Detect invalid (unsupported) compression types when parsing (#4637)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c2f199adb Detect invalid (unsupported) compression types when parsing (#4637)
c2f199adb is described below
commit c2f199adbec8b650de5668c97675f8a757607df8
Author: Remzi Yang <59...@users.noreply.github.com>
AuthorDate: Sat Dec 17 19:58:20 2022 +0800
Detect invalid (unsupported) compression types when parsing (#4637)
* check compression type when parsing
Signed-off-by: remzi <13...@gmail.com>
* refactor
Signed-off-by: remzi <13...@gmail.com>
* test
Signed-off-by: remzi <13...@gmail.com>
Signed-off-by: remzi <13...@gmail.com>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/common/src/parsers.rs | 50 ++++++++++++
datafusion/core/src/catalog/listing_schema.rs | 3 +-
.../core/src/datasource/file_format/file_type.rs | 95 ++++++++++++----------
datafusion/core/src/datasource/listing/table.rs | 2 +-
.../core/src/datasource/listing_table_factory.rs | 10 +--
datafusion/expr/src/logical_plan/plan.rs | 3 +-
datafusion/proto/src/logical_plan.rs | 4 +-
datafusion/sql/src/parser.rs | 51 +++++++-----
datafusion/sql/src/planner.rs | 6 +-
9 files changed, 146 insertions(+), 78 deletions(-)
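In short, an unsupported COMPRESSION TYPE now fails while the statement is being parsed instead of surfacing later during table creation. A hedged end-to-end sketch of the user-visible effect (not part of the commit; assumes a tokio runtime and the datafusion crate's SessionContext API):

    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() {
        let ctx = SessionContext::new();
        // The bogus compression type is rejected while the DDL is parsed,
        // so the error comes from the SQL parser rather than a later stage.
        let result = ctx
            .sql("CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION TYPE ZZZ LOCATION 'data.csv'")
            .await;
        assert!(result.is_err()); // "Unsupported file compression type ZZZ"
    }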
diff --git a/datafusion/common/src/parsers.rs b/datafusion/common/src/parsers.rs
index c6747007e..aced7c78a 100644
--- a/datafusion/common/src/parsers.rs
+++ b/datafusion/common/src/parsers.rs
@@ -19,11 +19,61 @@
use sqlparser::parser::ParserError;
use crate::{DataFusionError, Result, ScalarValue};
+use std::result;
use std::str::FromStr;
const SECONDS_PER_HOUR: f64 = 3_600_f64;
const NANOS_PER_SECOND: f64 = 1_000_000_000_f64;
+/// Readable file compression type
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CompressionTypeVariant {
+ /// Gzip-ed file
+ GZIP,
+ /// Bzip2-ed file
+ BZIP2,
+ /// Xz-ed file (liblzma)
+ XZ,
+ /// Uncompressed file
+ UNCOMPRESSED,
+}
+
+impl FromStr for CompressionTypeVariant {
+ type Err = ParserError;
+
+ fn from_str(s: &str) -> result::Result<Self, ParserError> {
+ let s = s.to_uppercase();
+ match s.as_str() {
+ "GZIP" | "GZ" => Ok(Self::GZIP),
+ "BZIP2" | "BZ2" => Ok(Self::BZIP2),
+ "XZ" => Ok(Self::XZ),
+ "" => Ok(Self::UNCOMPRESSED),
+ _ => Err(ParserError::ParserError(format!(
+ "Unsupported file compression type {}",
+ s
+ ))),
+ }
+ }
+}
+
+impl ToString for CompressionTypeVariant {
+ fn to_string(&self) -> String {
+ match self {
+ Self::GZIP => "GZIP",
+ Self::BZIP2 => "BZIP2",
+ Self::XZ => "XZ",
+ Self::UNCOMPRESSED => "",
+ }
+ .to_string()
+ }
+}
+
+impl CompressionTypeVariant {
+ pub const fn is_compressed(&self) -> bool {
+ !matches!(self, &Self::UNCOMPRESSED)
+ }
+}
+
#[derive(Clone, Copy)]
#[repr(u16)]
enum IntervalType {
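A minimal, illustrative sketch of the new shared enum, using only the API added in the hunk above:

    use datafusion_common::parsers::CompressionTypeVariant;
    use std::str::FromStr;

    fn main() {
        // Matching is case-insensitive and accepts the "GZ"/"BZ2" aliases.
        assert_eq!(CompressionTypeVariant::from_str("gz").unwrap(), CompressionTypeVariant::GZIP);
        // The empty string keeps its old meaning of "no compression".
        assert_eq!(CompressionTypeVariant::from_str("").unwrap(), CompressionTypeVariant::UNCOMPRESSED);
        // Anything else is rejected with a ParserError at parse time.
        assert!(CompressionTypeVariant::from_str("ZZZ").is_err());
        // is_compressed() is false only for UNCOMPRESSED.
        assert!(CompressionTypeVariant::GZIP.is_compressed());
        assert!(!CompressionTypeVariant::UNCOMPRESSED.is_compressed());
    }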
diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index 5fc2f48ad..be2337839 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -20,6 +20,7 @@ use crate::catalog::schema::SchemaProvider;
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
+use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{DFSchema, DataFusionError, OwnedTableReference};
use datafusion_expr::CreateExternalTable;
use futures::TryStreamExt;
@@ -136,7 +137,7 @@ impl ListingSchemaProvider {
table_partition_cols: vec![],
if_not_exists: false,
definition: None,
- file_compression_type: "".to_string(),
+ file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
options: Default::default(),
},
)
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index 5a7b3c348..3aa802e45 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -31,6 +31,7 @@ use async_compression::tokio::bufread::{
use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::BzDecoder;
+use datafusion_common::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use flate2::read::GzDecoder;
use futures::Stream;
@@ -41,6 +42,7 @@ use std::str::FromStr;
use tokio_util::io::{ReaderStream, StreamReader};
#[cfg(feature = "compression")]
use xz2::read::XzDecoder;
+use CompressionTypeVariant::*;
/// Define each `FileType`/`FileCompressionType`'s extension
pub trait GetExt {
@@ -50,48 +52,59 @@ pub trait GetExt {
/// Readable file compression type
#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum FileCompressionType {
- /// Gzip-ed file
- GZIP,
- /// Bzip2-ed file
- BZIP2,
- /// Xz-ed file (liblzma)
- XZ,
- /// Uncompressed file
- UNCOMPRESSED,
+pub struct FileCompressionType {
+ variant: CompressionTypeVariant,
}
impl GetExt for FileCompressionType {
fn get_ext(&self) -> String {
- match self {
- FileCompressionType::GZIP => ".gz".to_owned(),
- FileCompressionType::BZIP2 => ".bz2".to_owned(),
- FileCompressionType::XZ => ".xz".to_owned(),
- FileCompressionType::UNCOMPRESSED => "".to_owned(),
+ match self.variant {
+ GZIP => ".gz".to_owned(),
+ BZIP2 => ".bz2".to_owned(),
+ XZ => ".xz".to_owned(),
+ UNCOMPRESSED => "".to_owned(),
}
}
}
+impl From<CompressionTypeVariant> for FileCompressionType {
+ fn from(t: CompressionTypeVariant) -> Self {
+ Self { variant: t }
+ }
+}
+
impl FromStr for FileCompressionType {
type Err = DataFusionError;
fn from_str(s: &str) -> Result<Self> {
- let s = s.to_uppercase();
- match s.as_str() {
- "GZIP" | "GZ" => Ok(FileCompressionType::GZIP),
- "BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2),
- "XZ" => Ok(FileCompressionType::XZ),
- "" => Ok(FileCompressionType::UNCOMPRESSED),
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unknown FileCompressionType: {}",
- s
- ))),
- }
+ let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
+ DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {}", s))
+ })?;
+ Ok(Self { variant })
}
}
/// `FileCompressionType` implementation
impl FileCompressionType {
+ /// Gzip-ed file
+ pub const GZIP: Self = Self { variant: GZIP };
+
+ /// Bzip2-ed file
+ pub const BZIP2: Self = Self { variant: BZIP2 };
+
+ /// Xz-ed file (liblzma)
+ pub const XZ: Self = Self { variant: XZ };
+
+ /// Uncompressed file
+ pub const UNCOMPRESSED: Self = Self {
+ variant: UNCOMPRESSED,
+ };
+
+ /// Whether the file is compressed
+ pub const fn is_compressed(&self) -> bool {
+ self.variant.is_compressed()
+ }
+
/// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
&self,
@@ -111,31 +124,29 @@ impl FileCompressionType {
None => Into::<DataFusionError>::into(e),
};
- Ok(match self {
+ Ok(match self.variant {
#[cfg(feature = "compression")]
- FileCompressionType::GZIP => Box::new(
+ GZIP => Box::new(
ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
- FileCompressionType::BZIP2 => Box::new(
+ BZIP2 => Box::new(
ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
- FileCompressionType::XZ => Box::new(
+ XZ => Box::new(
ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(not(feature = "compression"))]
- FileCompressionType::GZIP
- | FileCompressionType::BZIP2
- | FileCompressionType::XZ => {
+ GZIP | BZIP2 | XZ => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
- FileCompressionType::UNCOMPRESSED => Box::new(s),
+ UNCOMPRESSED => Box::new(s),
})
}
@@ -144,22 +155,20 @@ impl FileCompressionType {
&self,
r: T,
) -> Result<Box<dyn std::io::Read + Send>> {
- Ok(match self {
+ Ok(match self.variant {
#[cfg(feature = "compression")]
- FileCompressionType::GZIP => Box::new(GzDecoder::new(r)),
+ GZIP => Box::new(GzDecoder::new(r)),
#[cfg(feature = "compression")]
- FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)),
+ BZIP2 => Box::new(BzDecoder::new(r)),
#[cfg(feature = "compression")]
- FileCompressionType::XZ => Box::new(XzDecoder::new(r)),
+ XZ => Box::new(XzDecoder::new(r)),
#[cfg(not(feature = "compression"))]
- FileCompressionType::GZIP
- | FileCompressionType::BZIP2
- | FileCompressionType::XZ => {
+ GZIP | BZIP2 | XZ => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
- FileCompressionType::UNCOMPRESSED => Box::new(r),
+ UNCOMPRESSED => Box::new(r),
})
}
}
@@ -213,8 +222,8 @@ impl FileType {
match self {
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
- FileType::PARQUET | FileType::AVRO => match c {
- FileCompressionType::UNCOMPRESSED => Ok(ext),
+ FileType::PARQUET | FileType::AVRO => match c.variant {
+ UNCOMPRESSED => Ok(ext),
_ => Err(DataFusionError::Internal(
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
)),
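A short usage sketch for the reworked FileCompressionType. The associated constants and the From impl come from the hunks above; the get_ext_with_compression method name on FileType is assumed here, since the last hunk only shows its body:

    use datafusion::datasource::file_format::file_type::{FileCompressionType, FileType, GetExt};
    use datafusion_common::parsers::CompressionTypeVariant;

    fn main() {
        // The associated constants stand in for the old enum variants.
        assert_eq!(FileCompressionType::GZIP.get_ext(), ".gz");
        assert!(FileCompressionType::GZIP.is_compressed());

        // A CompressionTypeVariant carried by the logical plan converts directly.
        let from_plan: FileCompressionType = CompressionTypeVariant::XZ.into();
        assert_eq!(from_plan, FileCompressionType::XZ);

        // Parquet/Avro still reject an explicit compression type (last hunk above);
        // the method name is an assumption, not shown in full in this diff.
        assert!(FileType::PARQUET
            .get_ext_with_compression(FileCompressionType::BZIP2)
            .is_err());
    }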
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 3ea62dbd0..34f88b86c 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -122,7 +122,7 @@ impl ListingTableConfig {
let file_compression_type = FileCompressionType::from_str(splitted)
.unwrap_or(FileCompressionType::UNCOMPRESSED);
- if file_compression_type != FileCompressionType::UNCOMPRESSED {
+ if file_compression_type.is_compressed() {
splitted = exts.next().unwrap_or("");
}
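The effect of the is_compressed() change on extension inference, as a stand-alone sketch. The infer() helper is hypothetical and only mirrors the logic of the hunk above:

    use datafusion::datasource::file_format::file_type::FileCompressionType;
    use std::str::FromStr;

    // Peel a trailing compression suffix (if any) before reading the format
    // extension, the same way ListingTableConfig does in the hunk above.
    fn infer(file_name: &str) -> (FileCompressionType, String) {
        let mut exts = file_name.rsplit('.');
        let mut splitted = exts.next().unwrap_or("");
        let file_compression_type = FileCompressionType::from_str(splitted)
            .unwrap_or(FileCompressionType::UNCOMPRESSED);
        if file_compression_type.is_compressed() {
            splitted = exts.next().unwrap_or("");
        }
        (file_compression_type, splitted.to_string())
    }

    fn main() {
        assert_eq!(infer("data.csv.gz").1, "csv");
        assert_eq!(infer("data.csv").1, "csv");
    }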
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index ad6b820ba..b22bc0596 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -59,15 +59,7 @@ impl TableProviderFactory for ListingTableFactory {
state: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
- let file_compression_type = FileCompressionType::from_str(
- cmd.file_compression_type.as_str(),
- )
- .map_err(|_| {
- DataFusionError::Execution(format!(
- "Unknown FileCompressionType {}",
- cmd.file_compression_type.as_str()
- ))
- })?;
+ let file_compression_type = FileCompressionType::from(cmd.file_compression_type);
let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
})?;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 9368b1ec4..14dfe7143 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -29,6 +29,7 @@ use crate::{
build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown, TableSource,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
ScalarValue,
@@ -1491,7 +1492,7 @@ pub struct CreateExternalTable {
/// SQL used to create the table, if available
pub definition: Option<String>,
/// File compression type (GZIP, BZIP2, XZ)
- pub file_compression_type: String,
+ pub file_compression_type: CompressionTypeVariant,
/// Table(provider) specific options
pub options: HashMap<String, String>,
}
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index f2000d7a7..5f0bd35ae 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -38,6 +38,7 @@ use datafusion::{
datasource::{provider_as_source, source_as_provider},
prelude::SessionContext,
};
+use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{context, DataFusionError, OwnedTableReference};
use datafusion_expr::logical_plan::{builder::project, Prepare};
use datafusion_expr::{
@@ -51,6 +52,7 @@ use datafusion_expr::{
use prost::bytes::BufMut;
use prost::Message;
use std::fmt::Debug;
+use std::str::FromStr;
use std::sync::Arc;
fn byte_to_string(b: u8) -> Result<String, DataFusionError> {
@@ -607,7 +609,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.table_partition_cols
.clone(),
if_not_exists: create_extern_table.if_not_exists,
- file_compression_type: create_extern_table.file_compression_type.to_string(),
+ file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?,
definition,
options: create_extern_table.options.clone(),
}))
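The protobuf message still stores the compression type as a plain string, so serialization relies on the ToString/FromStr pair round-tripping cleanly, including the empty-string spelling of UNCOMPRESSED. A small illustrative check:

    use datafusion_common::parsers::CompressionTypeVariant;
    use std::str::FromStr;

    fn main() {
        for variant in [
            CompressionTypeVariant::GZIP,
            CompressionTypeVariant::BZIP2,
            CompressionTypeVariant::XZ,
            CompressionTypeVariant::UNCOMPRESSED,
        ] {
            // to_string() produces what gets written to the proto field;
            // from_str() must recover the same variant on deserialization.
            let encoded = variant.to_string();
            assert_eq!(CompressionTypeVariant::from_str(&encoded).unwrap(), variant);
        }
    }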
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index d4af9f869..4fbae5083 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -19,6 +19,7 @@
//!
//! Declares a SQL parser based on sqlparser that handles custom formats that we need.
+use datafusion_common::parsers::CompressionTypeVariant;
use sqlparser::{
ast::{
ColumnDef, ColumnOptionDef, ObjectName, Statement as SQLStatement,
@@ -28,7 +29,7 @@ use sqlparser::{
parser::{Parser, ParserError},
tokenizer::{Token, Tokenizer},
};
-use std::collections::HashMap;
+use std::{collections::HashMap, str::FromStr};
use std::{collections::VecDeque, fmt};
// Use `Parser::expected` instead, if possible
@@ -42,10 +43,6 @@ fn parse_file_type(s: &str) -> Result<String, ParserError> {
Ok(s.to_uppercase())
}
-fn parse_file_compression_type(s: &str) -> Result<String, ParserError> {
- Ok(s.to_uppercase())
-}
-
/// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateExternalTable {
@@ -66,7 +63,7 @@ pub struct CreateExternalTable {
/// Option to not error if table already exists
pub if_not_exists: bool,
/// File compression type (GZIP, BZIP2, XZ)
- pub file_compression_type: String,
+ pub file_compression_type: CompressionTypeVariant,
/// Table(provider) specific options
pub options: HashMap<String, String>,
}
@@ -341,7 +338,7 @@ impl<'a> DFParser<'a> {
let file_compression_type = if self.parse_has_file_compression_type() {
self.parse_file_compression_type()?
} else {
- "".to_string()
+ CompressionTypeVariant::UNCOMPRESSED
};
let table_partition_cols = if self.parse_has_partition() {
@@ -383,9 +380,11 @@ impl<'a> DFParser<'a> {
}
/// Parses the set of
- fn parse_file_compression_type(&mut self) -> Result<String, ParserError> {
+ fn parse_file_compression_type(
+ &mut self,
+ ) -> Result<CompressionTypeVariant, ParserError> {
match self.parser.next_token() {
- Token::Word(w) => parse_file_compression_type(&w.value),
+ Token::Word(w) => CompressionTypeVariant::from_str(&w.value),
unexpected => self.expected("one of GZIP, BZIP2, XZ", unexpected),
}
}
@@ -451,6 +450,7 @@ impl<'a> DFParser<'a> {
mod tests {
use super::*;
use sqlparser::ast::{DataType, Ident};
+ use CompressionTypeVariant::UNCOMPRESSED;
fn expect_parse_ok(sql: &str, expected: Statement) -> Result<(), ParserError> {
let statements = DFParser::parse_sql(sql)?;
@@ -510,7 +510,7 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -527,7 +527,7 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -544,7 +544,7 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -564,7 +564,7 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -586,7 +586,9 @@ mod tests {
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: file_compression_type.to_owned(),
+ file_compression_type: CompressionTypeVariant::from_str(
+ file_compression_type,
+ )?,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -603,7 +605,7 @@ mod tests {
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -619,7 +621,7 @@ mod tests {
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -635,7 +637,7 @@ mod tests {
location: "foo.avro".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -652,7 +654,7 @@ mod tests {
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: true,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -674,7 +676,7 @@ mod tests {
location: "blahblah".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::from([("k1".into(), "v1".into())]),
});
expect_parse_ok(sql, expected)?;
@@ -691,7 +693,7 @@ mod tests {
location: "blahblah".into(),
table_partition_cols: vec![],
if_not_exists: false,
- file_compression_type: "".to_string(),
+ file_compression_type: UNCOMPRESSED,
options: HashMap::from([
("k1".into(), "v1".into()),
("k2".into(), "v2".into()),
@@ -724,4 +726,13 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn invalid_compression_type() {
+ let sql = "CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION TYPE ZZZ LOCATION 'blahblah'";
+ expect_parse_error(
+ sql,
+ "sql parser error: Unsupported file compression type ZZZ",
+ )
+ }
}
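At the SQL layer, the new test above exercises the failure path; a matching sketch of what DFParser now reports (the datafusion_sql crate path is assumed):

    use datafusion_sql::parser::DFParser;

    fn main() {
        let sql =
            "CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION TYPE ZZZ LOCATION 'blahblah'";
        // Parsing fails immediately; an unknown compression type never
        // reaches planning or execution.
        let err = DFParser::parse_sql(sql).unwrap_err();
        assert!(err.to_string().contains("Unsupported file compression type ZZZ"));
    }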
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 7f12e9dec..da62ebf99 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -36,7 +36,7 @@ use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
use sqlparser::ast::{TimezoneInfo, WildcardAdditionalOptions};
use sqlparser::parser::ParserError::ParserError;
-use datafusion_common::parsers::parse_interval;
+use datafusion_common::parsers::{parse_interval, CompressionTypeVariant};
use datafusion_common::ToDFSchema;
use datafusion_common::{
field_not_found, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
@@ -620,7 +620,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
))?;
}
- if file_type != "CSV" && file_type != "JSON" && !file_compression_type.is_empty()
+ if file_type != "CSV"
+ && file_type != "JSON"
+ && file_compression_type != CompressionTypeVariant::UNCOMPRESSED
{
Err(DataFusionError::Plan(
"File compression type can be specified for CSV/JSON files.".into(),