Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/17 11:58:26 UTC

[arrow-datafusion] branch master updated: Detect invalid (unsupported) compression types when parsing (#4637)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c2f199adb Detect invalid (unsupported) compression types when parsing (#4637)
c2f199adb is described below

commit c2f199adbec8b650de5668c97675f8a757607df8
Author: Remzi Yang <59...@users.noreply.github.com>
AuthorDate: Sat Dec 17 19:58:20 2022 +0800

    Detect invalid (unsupported) compression types when parsing (#4637)
    
    * check compression type when parsing
    
    Signed-off-by: remzi <13...@gmail.com>
    
    * refactor
    
    Signed-off-by: remzi <13...@gmail.com>
    
    * test
    
    Signed-off-by: remzi <13...@gmail.com>
    
    Signed-off-by: remzi <13...@gmail.com>
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/common/src/parsers.rs                   | 50 ++++++++++++
 datafusion/core/src/catalog/listing_schema.rs      |  3 +-
 .../core/src/datasource/file_format/file_type.rs   | 95 ++++++++++++----------
 datafusion/core/src/datasource/listing/table.rs    |  2 +-
 .../core/src/datasource/listing_table_factory.rs   | 10 +--
 datafusion/expr/src/logical_plan/plan.rs           |  3 +-
 datafusion/proto/src/logical_plan.rs               |  4 +-
 datafusion/sql/src/parser.rs                       | 51 +++++++-----
 datafusion/sql/src/planner.rs                      |  6 +-
 9 files changed, 146 insertions(+), 78 deletions(-)

diff --git a/datafusion/common/src/parsers.rs b/datafusion/common/src/parsers.rs
index c6747007e..aced7c78a 100644
--- a/datafusion/common/src/parsers.rs
+++ b/datafusion/common/src/parsers.rs
@@ -19,11 +19,61 @@
 use sqlparser::parser::ParserError;
 
 use crate::{DataFusionError, Result, ScalarValue};
+use std::result;
 use std::str::FromStr;
 
 const SECONDS_PER_HOUR: f64 = 3_600_f64;
 const NANOS_PER_SECOND: f64 = 1_000_000_000_f64;
 
+/// Readable file compression type
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CompressionTypeVariant {
+    /// Gzip-ed file
+    GZIP,
+    /// Bzip2-ed file
+    BZIP2,
+    /// Xz-ed file (liblzma)
+    XZ,
+    /// Uncompressed file
+    UNCOMPRESSED,
+}
+
+impl FromStr for CompressionTypeVariant {
+    type Err = ParserError;
+
+    fn from_str(s: &str) -> result::Result<Self, ParserError> {
+        let s = s.to_uppercase();
+        match s.as_str() {
+            "GZIP" | "GZ" => Ok(Self::GZIP),
+            "BZIP2" | "BZ2" => Ok(Self::BZIP2),
+            "XZ" => Ok(Self::XZ),
+            "" => Ok(Self::UNCOMPRESSED),
+            _ => Err(ParserError::ParserError(format!(
+                "Unsupported file compression type {}",
+                s
+            ))),
+        }
+    }
+}
+
+impl ToString for CompressionTypeVariant {
+    fn to_string(&self) -> String {
+        match self {
+            Self::GZIP => "GZIP",
+            Self::BZIP2 => "BZIP2",
+            Self::XZ => "XZ",
+            Self::UNCOMPRESSED => "",
+        }
+        .to_string()
+    }
+}
+
+impl CompressionTypeVariant {
+    pub const fn is_compressed(&self) -> bool {
+        !matches!(self, &Self::UNCOMPRESSED)
+    }
+}
+
 #[derive(Clone, Copy)]
 #[repr(u16)]
 enum IntervalType {
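
The hunk above introduces the shared `CompressionTypeVariant` in `datafusion_common`, where both the SQL parser and the core crate can reach it. A minimal sketch of how its `FromStr` behaves, per the match arms above (standalone snippet, not part of the commit; the import path is taken from this diff):

    use std::str::FromStr;
    use datafusion_common::parsers::CompressionTypeVariant;

    fn main() {
        // Aliases and lowercase input are normalized by to_uppercase().
        assert_eq!(
            CompressionTypeVariant::from_str("gz").unwrap(),
            CompressionTypeVariant::GZIP
        );
        // The empty string parses to UNCOMPRESSED...
        assert!(!CompressionTypeVariant::from_str("").unwrap().is_compressed());
        // ...and anything unknown is now a ParserError instead of passing through.
        assert!(CompressionTypeVariant::from_str("ZZZ").is_err());
    }
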
diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index 5fc2f48ad..be2337839 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -20,6 +20,7 @@ use crate::catalog::schema::SchemaProvider;
 use crate::datasource::datasource::TableProviderFactory;
 use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
+use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{DFSchema, DataFusionError, OwnedTableReference};
 use datafusion_expr::CreateExternalTable;
 use futures::TryStreamExt;
@@ -136,7 +137,7 @@ impl ListingSchemaProvider {
                             table_partition_cols: vec![],
                             if_not_exists: false,
                             definition: None,
-                            file_compression_type: "".to_string(),
+                            file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
                             options: Default::default(),
                         },
                     )
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index 5a7b3c348..3aa802e45 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -31,6 +31,7 @@ use async_compression::tokio::bufread::{
 use bytes::Bytes;
 #[cfg(feature = "compression")]
 use bzip2::read::BzDecoder;
+use datafusion_common::parsers::CompressionTypeVariant;
 #[cfg(feature = "compression")]
 use flate2::read::GzDecoder;
 use futures::Stream;
@@ -41,6 +42,7 @@ use std::str::FromStr;
 use tokio_util::io::{ReaderStream, StreamReader};
 #[cfg(feature = "compression")]
 use xz2::read::XzDecoder;
+use CompressionTypeVariant::*;
 
 /// Define each `FileType`/`FileCompressionType`'s extension
 pub trait GetExt {
@@ -50,48 +52,59 @@ pub trait GetExt {
 
 /// Readable file compression type
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub enum FileCompressionType {
-    /// Gzip-ed file
-    GZIP,
-    /// Bzip2-ed file
-    BZIP2,
-    /// Xz-ed file (liblzma)
-    XZ,
-    /// Uncompressed file
-    UNCOMPRESSED,
+pub struct FileCompressionType {
+    variant: CompressionTypeVariant,
 }
 
 impl GetExt for FileCompressionType {
     fn get_ext(&self) -> String {
-        match self {
-            FileCompressionType::GZIP => ".gz".to_owned(),
-            FileCompressionType::BZIP2 => ".bz2".to_owned(),
-            FileCompressionType::XZ => ".xz".to_owned(),
-            FileCompressionType::UNCOMPRESSED => "".to_owned(),
+        match self.variant {
+            GZIP => ".gz".to_owned(),
+            BZIP2 => ".bz2".to_owned(),
+            XZ => ".xz".to_owned(),
+            UNCOMPRESSED => "".to_owned(),
         }
     }
 }
 
+impl From<CompressionTypeVariant> for FileCompressionType {
+    fn from(t: CompressionTypeVariant) -> Self {
+        Self { variant: t }
+    }
+}
+
 impl FromStr for FileCompressionType {
     type Err = DataFusionError;
 
     fn from_str(s: &str) -> Result<Self> {
-        let s = s.to_uppercase();
-        match s.as_str() {
-            "GZIP" | "GZ" => Ok(FileCompressionType::GZIP),
-            "BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2),
-            "XZ" => Ok(FileCompressionType::XZ),
-            "" => Ok(FileCompressionType::UNCOMPRESSED),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "Unknown FileCompressionType: {}",
-                s
-            ))),
-        }
+        let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
+            DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {}", s))
+        })?;
+        Ok(Self { variant })
     }
 }
 
 /// `FileCompressionType` implementation
 impl FileCompressionType {
+    /// Gzip-ed file
+    pub const GZIP: Self = Self { variant: GZIP };
+
+    /// Bzip2-ed file
+    pub const BZIP2: Self = Self { variant: BZIP2 };
+
+    /// Xz-ed file (liblzma)
+    pub const XZ: Self = Self { variant: XZ };
+
+    /// Uncompressed file
+    pub const UNCOMPRESSED: Self = Self {
+        variant: UNCOMPRESSED,
+    };
+
+    /// Whether the file is compressed or not
+    pub const fn is_compressed(&self) -> bool {
+        self.variant.is_compressed()
+    }
+
     /// Given a `Stream`, create a `Stream` whose data is decompressed with the `FileCompressionType`.
     pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
         &self,
@@ -111,31 +124,29 @@ impl FileCompressionType {
             None => Into::<DataFusionError>::into(e),
         };
 
-        Ok(match self {
+        Ok(match self.variant {
             #[cfg(feature = "compression")]
-            FileCompressionType::GZIP => Box::new(
+            GZIP => Box::new(
                 ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
                     .map_err(err_converter),
             ),
             #[cfg(feature = "compression")]
-            FileCompressionType::BZIP2 => Box::new(
+            BZIP2 => Box::new(
                 ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
                     .map_err(err_converter),
             ),
             #[cfg(feature = "compression")]
-            FileCompressionType::XZ => Box::new(
+            XZ => Box::new(
                 ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
                     .map_err(err_converter),
             ),
             #[cfg(not(feature = "compression"))]
-            FileCompressionType::GZIP
-            | FileCompressionType::BZIP2
-            | FileCompressionType::XZ => {
+            GZIP | BZIP2 | XZ => {
                 return Err(DataFusionError::NotImplemented(
                     "Compression feature is not enabled".to_owned(),
                 ))
             }
-            FileCompressionType::UNCOMPRESSED => Box::new(s),
+            UNCOMPRESSED => Box::new(s),
         })
     }
 
@@ -144,22 +155,20 @@ impl FileCompressionType {
         &self,
         r: T,
     ) -> Result<Box<dyn std::io::Read + Send>> {
-        Ok(match self {
+        Ok(match self.variant {
             #[cfg(feature = "compression")]
-            FileCompressionType::GZIP => Box::new(GzDecoder::new(r)),
+            GZIP => Box::new(GzDecoder::new(r)),
             #[cfg(feature = "compression")]
-            FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)),
+            BZIP2 => Box::new(BzDecoder::new(r)),
             #[cfg(feature = "compression")]
-            FileCompressionType::XZ => Box::new(XzDecoder::new(r)),
+            XZ => Box::new(XzDecoder::new(r)),
             #[cfg(not(feature = "compression"))]
-            FileCompressionType::GZIP
-            | FileCompressionType::BZIP2
-            | FileCompressionType::XZ => {
+            GZIP | BZIP2 | XZ => {
                 return Err(DataFusionError::NotImplemented(
                     "Compression feature is not enabled".to_owned(),
                 ))
             }
-            FileCompressionType::UNCOMPRESSED => Box::new(r),
+            UNCOMPRESSED => Box::new(r),
         })
     }
 }
@@ -213,8 +222,8 @@ impl FileType {
 
         match self {
             FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
-            FileType::PARQUET | FileType::AVRO => match c {
-                FileCompressionType::UNCOMPRESSED => Ok(ext),
+            FileType::PARQUET | FileType::AVRO => match c.variant {
+                UNCOMPRESSED => Ok(ext),
                 _ => Err(DataFusionError::Internal(
                     "FileCompressionType can be specified for CSV/JSON FileType.".into(),
                 )),
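
With the enum extracted, `FileCompressionType` becomes a newtype wrapper, and the associated constants (`GZIP`, `BZIP2`, `XZ`, `UNCOMPRESSED`) keep the old variant-style call sites compiling. Roughly how a caller sees it (sketch only; the `use` path assumes the public module layout matches the file path above):

    use datafusion::datasource::file_format::file_type::{FileCompressionType, GetExt};

    fn main() {
        // The associated consts stand in for the former enum variants.
        let c = FileCompressionType::GZIP;
        assert_eq!(c.get_ext(), ".gz");
        assert!(c.is_compressed());
        assert!(!FileCompressionType::UNCOMPRESSED.is_compressed());
    }
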
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 3ea62dbd0..34f88b86c 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -122,7 +122,7 @@ impl ListingTableConfig {
         let file_compression_type = FileCompressionType::from_str(splitted)
             .unwrap_or(FileCompressionType::UNCOMPRESSED);
 
-        if file_compression_type != FileCompressionType::UNCOMPRESSED {
+        if file_compression_type.is_compressed() {
             splitted = exts.next().unwrap_or("");
         }
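
The listing-table hunk only swaps the equality test for the new `is_compressed()` helper; the surrounding logic still peels a compression suffix off the path before inferring the file type. The idea in isolation, using plain `std` string handling rather than the real `ListingTableConfig` internals (illustrative only):

    fn main() {
        // Walk extensions right to left: "data.csv.gz" yields "gz", then "csv".
        let mut exts = "data.csv.gz".rsplit('.');
        let mut ext = exts.next().unwrap_or("");
        if matches!(ext, "gz" | "bz2" | "xz") {
            // A compression suffix was found; the real format is the next piece.
            ext = exts.next().unwrap_or("");
        }
        assert_eq!(ext, "csv");
    }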
 
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index ad6b820ba..b22bc0596 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -59,15 +59,7 @@ impl TableProviderFactory for ListingTableFactory {
         state: &SessionState,
         cmd: &CreateExternalTable,
     ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
-        let file_compression_type = FileCompressionType::from_str(
-            cmd.file_compression_type.as_str(),
-        )
-        .map_err(|_| {
-            DataFusionError::Execution(format!(
-                "Unknown FileCompressionType {}",
-                cmd.file_compression_type.as_str()
-            ))
-        })?;
+        let file_compression_type = FileCompressionType::from(cmd.file_compression_type);
         let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
             DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
         })?;
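
Because `CreateExternalTable::file_compression_type` is now the enum rather than a `String` (see the plan.rs hunk below), the factory no longer needs a fallible parse with its own error mapping; the infallible `From` impl added above suffices, since invalid input has already been rejected at parse time. In isolation (sketch; same path assumption as above):

    use datafusion::datasource::file_format::file_type::FileCompressionType;
    use datafusion_common::parsers::CompressionTypeVariant;

    fn main() {
        // No Result and no error branch: the variant was validated during parsing.
        let t = FileCompressionType::from(CompressionTypeVariant::BZIP2);
        assert!(t.is_compressed());
    }
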
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 9368b1ec4..14dfe7143 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -29,6 +29,7 @@ use crate::{
     build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown, TableSource,
 };
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
     plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
     ScalarValue,
@@ -1491,7 +1492,7 @@ pub struct CreateExternalTable {
     /// SQL used to create the table, if available
     pub definition: Option<String>,
     /// File compression type (GZIP, BZIP2, XZ)
-    pub file_compression_type: String,
+    pub file_compression_type: CompressionTypeVariant,
     /// Table(provider) specific options
     pub options: HashMap<String, String>,
 }
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index f2000d7a7..5f0bd35ae 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -38,6 +38,7 @@ use datafusion::{
     datasource::{provider_as_source, source_as_provider},
     prelude::SessionContext,
 };
+use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{context, DataFusionError, OwnedTableReference};
 use datafusion_expr::logical_plan::{builder::project, Prepare};
 use datafusion_expr::{
@@ -51,6 +52,7 @@ use datafusion_expr::{
 use prost::bytes::BufMut;
 use prost::Message;
 use std::fmt::Debug;
+use std::str::FromStr;
 use std::sync::Arc;
 
 fn byte_to_string(b: u8) -> Result<String, DataFusionError> {
@@ -607,7 +609,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         .table_partition_cols
                         .clone(),
                     if_not_exists: create_extern_table.if_not_exists,
-                    file_compression_type: create_extern_table.file_compression_type.to_string(),
+                    file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?,
                     definition,
                     options: create_extern_table.options.clone(),
                 }))
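
The protobuf wire format still carries the compression type as a string, so deserialization round-trips through `FromStr` and surfaces `DataFusionError::NotImplemented` for anything the enum cannot represent. The round trip on its own (illustrative snippet):

    use std::str::FromStr;
    use datafusion_common::parsers::CompressionTypeVariant;

    fn main() {
        // to_string() and from_str() are inverses for every supported variant...
        let v = CompressionTypeVariant::XZ;
        assert_eq!(CompressionTypeVariant::from_str(&v.to_string()).unwrap(), v);
        // ...with UNCOMPRESSED serializing as the empty string.
        assert_eq!(CompressionTypeVariant::UNCOMPRESSED.to_string(), "");
    }
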
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index d4af9f869..4fbae5083 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -19,6 +19,7 @@
 //!
 //! Declares a SQL parser based on sqlparser that handles custom formats that we need.
 
+use datafusion_common::parsers::CompressionTypeVariant;
 use sqlparser::{
     ast::{
         ColumnDef, ColumnOptionDef, ObjectName, Statement as SQLStatement,
@@ -28,7 +29,7 @@ use sqlparser::{
     parser::{Parser, ParserError},
     tokenizer::{Token, Tokenizer},
 };
-use std::collections::HashMap;
+use std::{collections::HashMap, str::FromStr};
 use std::{collections::VecDeque, fmt};
 
 // Use `Parser::expected` instead, if possible
@@ -42,10 +43,6 @@ fn parse_file_type(s: &str) -> Result<String, ParserError> {
     Ok(s.to_uppercase())
 }
 
-fn parse_file_compression_type(s: &str) -> Result<String, ParserError> {
-    Ok(s.to_uppercase())
-}
-
 /// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct CreateExternalTable {
@@ -66,7 +63,7 @@ pub struct CreateExternalTable {
     /// Option to not error if table already exists
     pub if_not_exists: bool,
     /// File compression type (GZIP, BZIP2, XZ)
-    pub file_compression_type: String,
+    pub file_compression_type: CompressionTypeVariant,
     /// Table(provider) specific options
     pub options: HashMap<String, String>,
 }
@@ -341,7 +338,7 @@ impl<'a> DFParser<'a> {
         let file_compression_type = if self.parse_has_file_compression_type() {
             self.parse_file_compression_type()?
         } else {
-            "".to_string()
+            CompressionTypeVariant::UNCOMPRESSED
         };
 
         let table_partition_cols = if self.parse_has_partition() {
@@ -383,9 +380,11 @@ impl<'a> DFParser<'a> {
     }
 
     /// Parses the file compression type
-    fn parse_file_compression_type(&mut self) -> Result<String, ParserError> {
+    fn parse_file_compression_type(
+        &mut self,
+    ) -> Result<CompressionTypeVariant, ParserError> {
         match self.parser.next_token() {
-            Token::Word(w) => parse_file_compression_type(&w.value),
+            Token::Word(w) => CompressionTypeVariant::from_str(&w.value),
             unexpected => self.expected("one of GZIP, BZIP2, XZ", unexpected),
         }
     }
@@ -451,6 +450,7 @@ impl<'a> DFParser<'a> {
 mod tests {
     use super::*;
     use sqlparser::ast::{DataType, Ident};
+    use CompressionTypeVariant::UNCOMPRESSED;
 
     fn expect_parse_ok(sql: &str, expected: Statement) -> Result<(), ParserError> {
         let statements = DFParser::parse_sql(sql)?;
@@ -510,7 +510,7 @@ mod tests {
             location: "foo.csv".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -527,7 +527,7 @@ mod tests {
             location: "foo.csv".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -544,7 +544,7 @@ mod tests {
             location: "foo.csv".into(),
             table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
             if_not_exists: false,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -564,7 +564,7 @@ mod tests {
                 location: "foo.csv".into(),
                 table_partition_cols: vec![],
                 if_not_exists: false,
-                file_compression_type: "".to_string(),
+                file_compression_type: UNCOMPRESSED,
                 options: HashMap::new(),
             });
             expect_parse_ok(sql, expected)?;
@@ -586,7 +586,9 @@ mod tests {
                 location: "foo.csv".into(),
                 table_partition_cols: vec![],
                 if_not_exists: false,
-                file_compression_type: file_compression_type.to_owned(),
+                file_compression_type: CompressionTypeVariant::from_str(
+                    file_compression_type,
+                )?,
                 options: HashMap::new(),
             });
             expect_parse_ok(sql, expected)?;
@@ -603,7 +605,7 @@ mod tests {
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -619,7 +621,7 @@ mod tests {
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -635,7 +637,7 @@ mod tests {
             location: "foo.avro".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -652,7 +654,7 @@ mod tests {
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: true,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -674,7 +676,7 @@ mod tests {
             location: "blahblah".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::from([("k1".into(), "v1".into())]),
         });
         expect_parse_ok(sql, expected)?;
@@ -691,7 +693,7 @@ mod tests {
             location: "blahblah".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
-            file_compression_type: "".to_string(),
+            file_compression_type: UNCOMPRESSED,
             options: HashMap::from([
                 ("k1".into(), "v1".into()),
                 ("k2".into(), "v2".into()),
@@ -724,4 +726,13 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn invalid_compression_type() {
+        let sql = "CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION TYPE ZZZ LOCATION 'blahblah'";
+        expect_parse_error(
+            sql,
+            "sql parser error: Unsupported file compression type ZZZ",
+        )
+    }
 }
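
Net effect at the SQL layer: an unsupported compression type now fails while the statement is parsed, as the new `invalid_compression_type` test asserts, rather than later during table creation. Using the same entry point as the tests (sketch, assuming the `datafusion_sql` crate name matches the directory above):

    use datafusion_sql::parser::DFParser;

    fn main() {
        let sql = "CREATE EXTERNAL TABLE t STORED AS CSV \
                   COMPRESSION TYPE ZZZ LOCATION 'foo.csv'";
        // Parsing fails up front with the ParserError introduced in this commit.
        let err = DFParser::parse_sql(sql).unwrap_err();
        assert!(err
            .to_string()
            .contains("Unsupported file compression type ZZZ"));
    }
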
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 7f12e9dec..da62ebf99 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -36,7 +36,7 @@ use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
 use sqlparser::ast::{TimezoneInfo, WildcardAdditionalOptions};
 use sqlparser::parser::ParserError::ParserError;
 
-use datafusion_common::parsers::parse_interval;
+use datafusion_common::parsers::{parse_interval, CompressionTypeVariant};
 use datafusion_common::ToDFSchema;
 use datafusion_common::{
     field_not_found, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
@@ -620,7 +620,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             ))?;
         }
 
-        if file_type != "CSV" && file_type != "JSON" && !file_compression_type.is_empty()
+        if file_type != "CSV"
+            && file_type != "JSON"
+            && file_compression_type != CompressionTypeVariant::UNCOMPRESSED
         {
             Err(DataFusionError::Plan(
                 "File compression type can be specified for CSV/JSON files.".into(),