Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/20 17:23:10 UTC

[GitHub] [arrow-datafusion] Igosuki opened a new pull request #910: Avro Table Provider

Igosuki opened a new pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910


   # Which issue does this PR close?
   
   Closes #903.
   
   # Rationale for this change
   Enables loading Avro data files through DataFusion.
   
   # What changes are included in this PR?
   Avro is added as a table provider and a supported file format. 
   Avro schemas can be translated into arrow schemas.
   
   # Are there any user-facing changes?
   Yes: one can now call register_avro in DataFusion and use 'STORED AS AVRO' in SQL.
   
   # N.B.: 
   - Need to add tests in avro_to_arrow/arrow_array_reader.rs 
   
   # Missing :
   - Writing back to avro
   
   These additions introduce some duplication between modules, so I should probably do some refactoring.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-920080604


   Thanks for merging, hope to contribute some more. 





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r708037099



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -570,6 +570,7 @@ pub trait Accumulator: Send + Sync + Debug {
 pub mod aggregates;
 pub mod analyze;
 pub mod array_expressions;
+pub mod avro;

Review comment:
       @alamb The physical plan is used everywhere in the codebase, so feature gating inside the avro submodule of the physical plan seemed to be the best thing to do.
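
The gating approach described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: it assumes a Cargo feature named `avro` (the diff only shows that the `avro-rs` dependency is `optional = true`), and the module bodies and the `supported_formats` helper are hypothetical stand-ins.

```rust
// Sketch only: the feature name `avro` and every function below are
// assumptions made for illustration, not taken from the pull request.
pub mod physical_plan {
    /// Always compiled, whatever features are enabled.
    pub mod csv {
        pub fn format_name() -> &'static str {
            "csv"
        }
    }

    /// Compiled only when the crate is built with `--features avro`, so
    /// builds without the feature never need the optional dependency.
    #[cfg(feature = "avro")]
    pub mod avro {
        pub fn format_name() -> &'static str {
            "avro"
        }
    }
}

/// Lists the file formats available in this particular build.
#[allow(unused_mut)]
pub fn supported_formats() -> Vec<&'static str> {
    let mut formats = vec![physical_plan::csv::format_name()];
    // This statement is removed entirely when the feature is disabled.
    #[cfg(feature = "avro")]
    formats.push(physical_plan::avro::format_name());
    formats
}
```

Because the `pub mod` declaration itself stays in `physical_plan/mod.rs`, the rest of the crate keeps a stable module path, and only the contents depend on the feature.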







[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-909936222


   Rebased again after new conflict





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696541461



##########
File path: datafusion/src/avro_to_arrow/reader.rs
##########
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::record_batch::RecordBatch;
+use crate::avro_to_arrow::arrow_array_reader::AvroArrowArrayReader;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use avro_rs::Reader as AvroReader;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    /// Optional schema for the Avro file
+    ///
+    /// If the schema is not supplied, the reader will try to infer the schema
+    /// based on the Avro structure.
+    schema: Option<SchemaRef>,
+    /// Batch size (number of records to load each time)
+    ///
+    /// The default batch size when using the `ReaderBuilder` is 1024 records
+    batch_size: usize,
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            batch_size: 1024,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+    /// Create a new builder for configuring Avro parsing options.
+    ///
+    /// To convert a builder into a reader, call `Reader::from_builder`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder, inferring the schema with the first 100 records
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().infer_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to infer the schema of the file
+    pub fn infer_schema(mut self) -> Self {
+        // remove any schema that is set
+        self.schema = None;
+        self
+    }
+
+    /// Set the batch size (number of records to load at one time)
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Set the reader's column projection
+    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    /// Create a new `Reader` from the `ReaderBuilder`
+    pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>>
+    where
+        R: Read + Seek,
+    {
+        let mut source = source;
+
+        // check if schema should be inferred
+        let schema = match self.schema {
+            Some(schema) => schema,
+            None => Arc::new(infer_avro_schema_from_reader(&mut source)?),
+        };
+        source.seek(SeekFrom::Start(0))?;
+        Reader::try_new(source, schema, self.batch_size, self.projection)
+    }
+}
+
+/// Avro file record  reader
+pub struct Reader<'a, R: Read> {
+    array_reader: AvroArrowArrayReader<'a, R>,
+    schema: SchemaRef,
+    batch_size: usize,
+}
+
+impl<'a, R: Read> Reader<'a, R> {
+    /// Create a new Avro Reader from any value that implements the `Read` trait.
+    ///
+    /// If reading a `File`, you can customise the Reader, such as to enable schema
+    /// inference, use `ReaderBuilder`.
+    pub fn try_new(
+        reader: R,
+        schema: SchemaRef,
+        batch_size: usize,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        Ok(Self {
+            array_reader: AvroArrowArrayReader::try_new(
+                AvroReader::new(reader)?,
+                schema.clone(),
+                projection,
+            )?,
+            schema,
+            batch_size,
+        })
+    }
+
+    /// Returns the schema of the reader, useful for getting the schema without reading
+    /// record batches
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
+        self.array_reader.next_batch(self.batch_size)
+    }
+}
+
+impl<'a, R: Read> Iterator for Reader<'a, R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next().transpose()
+    }
+}
+
+/// Infer Avro schema given a reader
+pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
+    let avro_reader = avro_rs::Reader::new(reader)?;
+    let schema = avro_reader.writer_schema();
+    super::to_arrow_schema(schema)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::*;
+    use crate::arrow::datatypes::{DataType, Field};
+    use arrow::datatypes::TimeUnit;
+    use std::fs::File;
+
+    fn build_reader(name: &str) -> Reader<File> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/{}", testdata, name);
+        let builder = ReaderBuilder::new().infer_schema().with_batch_size(64);
+        builder.build(File::open(filename).unwrap()).unwrap()
+    }
+
+    fn get_col<'a, T: 'static>(
+        batch: &'a RecordBatch,
+        col: (usize, &Field),
+    ) -> Option<&'a T> {
+        batch.column(col.0).as_any().downcast_ref::<T>()
+    }
+
+    #[test]
+    fn test_avro_basic() {
+        let mut reader = build_reader("alltypes_dictionary.avro");
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(2, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let id = schema.column_with_name("id").unwrap();
+        assert_eq!(0, id.0);
+        assert_eq!(&DataType::Int32, id.1.data_type());
+        let col = get_col::<Int32Array>(&batch, id).unwrap();

Review comment:
       No, but you're right: with the current RecordBatch API being what it is, checking the output is likely better.







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r697963678



##########
File path: datafusion/Cargo.toml
##########
@@ -69,6 +70,8 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
+avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+num-traits = "0.2"

Review comment:
       ~~I am not seeing `num_traits` being used in the code?~~
   
   NVM, I see it's used in the avro reader code :)







[GitHub] [arrow-datafusion] alamb commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-905002675


   Thanks @Igosuki  -- I didn't have a chance to review today -- I am hoping to get it tomorrow.
   
   cc @houqp





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-908379864


   rebased





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696536060



##########
File path: datafusion/src/avro_to_arrow/reader.rs
##########
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::record_batch::RecordBatch;
+use crate::avro_to_arrow::arrow_array_reader::AvroArrowArrayReader;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use avro_rs::Reader as AvroReader;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    /// Optional schema for the Avro file
+    ///
+    /// If the schema is not supplied, the reader will try to infer the schema
+    /// based on the Avro structure.
+    schema: Option<SchemaRef>,
+    /// Batch size (number of records to load each time)
+    ///
+    /// The default batch size when using the `ReaderBuilder` is 1024 records
+    batch_size: usize,
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            batch_size: 1024,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+    /// Create a new builder for configuring Avro parsing options.
+    ///
+    /// To convert a builder into a reader, call `Reader::from_builder`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder, inferring the schema with the first 100 records
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().infer_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to infer the schema of the file

Review comment:
       It's the same docstring as the one in the arrow-rs crate for CSV and JSON: if no schema is provided, it is taken from the records. Here, however, every file is expected to carry its schema in the file header metadata, and the module isn't (yet) able to grow a schema from streaming Avro records, though that is a possibility for the future.
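
To make the point about header metadata concrete, here is a rough, standard-library-only sketch of how the writer schema can be pulled out of an Avro object container file header. It is not the PR's implementation (the diff calls `avro_rs::Reader::new` and `writer_schema()` instead); the function names `read_long`, `read_bytes`, `read_header_metadata`, and `writer_schema_json` are hypothetical. It relies on the container-file layout from the Avro specification: the magic bytes `Obj\x01`, then a metadata map whose `avro.schema` entry holds the schema as JSON.

```rust
use std::collections::HashMap;
use std::io::{self, Read};

fn invalid(msg: &'static str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

/// Reads one Avro `long`: a variable-length, zigzag-encoded integer.
fn read_long<R: Read>(r: &mut R) -> io::Result<i64> {
    let mut value: u64 = 0;
    let mut shift = 0u32;
    loop {
        let mut byte = [0u8; 1];
        r.read_exact(&mut byte)?;
        value |= u64::from(byte[0] & 0x7f) << shift;
        if byte[0] & 0x80 == 0 {
            break;
        }
        shift += 7;
        if shift > 63 {
            return Err(invalid("varint too long"));
        }
    }
    // Undo the zigzag encoding.
    Ok(((value >> 1) as i64) ^ -((value & 1) as i64))
}

/// Reads a length-prefixed byte string (the encoding of `string` and `bytes`).
fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let len = read_long(r)?;
    if len < 0 {
        return Err(invalid("negative length"));
    }
    let mut buf = vec![0u8; len as usize];
    r.read_exact(&mut buf)?;
    Ok(buf)
}

/// Parses the header up to the end of the metadata map.
pub fn read_header_metadata<R: Read>(r: &mut R) -> io::Result<HashMap<String, Vec<u8>>> {
    let mut magic = [0u8; 4];
    r.read_exact(&mut magic)?;
    if &magic != b"Obj\x01" {
        return Err(invalid("not an Avro object container file"));
    }
    let mut meta = HashMap::new();
    loop {
        let mut count = read_long(r)?;
        if count == 0 {
            break; // a zero count terminates the map
        }
        if count < 0 {
            // A negative count is followed by the block's size in bytes.
            count = count.checked_neg().ok_or_else(|| invalid("bad block count"))?;
            let _block_size = read_long(r)?;
        }
        for _ in 0..count {
            let key = String::from_utf8(read_bytes(r)?)
                .map_err(|_| invalid("metadata key is not UTF-8"))?;
            meta.insert(key, read_bytes(r)?);
        }
    }
    Ok(meta)
}

/// Returns the writer schema stored under `avro.schema`, as a JSON string.
pub fn writer_schema_json<R: Read>(r: &mut R) -> io::Result<String> {
    let meta = read_header_metadata(r)?;
    let raw = meta.get("avro.schema").ok_or_else(|| invalid("missing avro.schema"))?;
    String::from_utf8(raw.clone()).map_err(|_| invalid("schema is not UTF-8"))
}
```

Since the schema sits at the very start of the file, inference only needs to read the header; that is also why the builder in the diff seeks back to the start of the source before constructing the reader.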







[GitHub] [arrow-datafusion] alamb commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917616688


   
   The test failure https://github.com/apache/arrow-datafusion/pull/910/checks?check_run_id=3579109774 is
   
   ```
   thread 'tests::run_q12' has overflowed its stack
   ```
   
   @NGA-TRAN  saw something similar on https://github.com/apache/arrow-datafusion/issues/419 🤔  





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696540252



##########
File path: datafusion/src/datasource/avro.rs
##########
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Line-delimited Avro data source
+//!
+//! This data source allows Line-delimited Avro records or files to be used as input for queries.
+//!
+
+use std::{
+    any::Any,
+    io::{Read, Seek},
+    sync::{Arc, Mutex},
+};
+
+use arrow::datatypes::SchemaRef;
+
+use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
+use crate::{
+    datasource::{Source, TableProvider},
+    error::{DataFusionError, Result},
+    physical_plan::{common, ExecutionPlan},
+};
+
+use super::datasource::Statistics;
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents a  line-delimited Avro file with a provided schema
+pub struct AvroFile {
+    source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
+    schema: SchemaRef,
+    file_extension: String,
+    statistics: Statistics,
+}
+
+impl AvroFile {
+    /// Attempt to initialize a `AvroFile` from a path. The schema can be inferred automatically.
+    pub fn try_new(path: &str, options: AvroReadOptions) -> Result<Self> {
+        let schema = if let Some(schema) = options.schema {
+            schema
+        } else {
+            let filenames =
+                common::build_checked_file_list(path, options.file_extension)?;
+            Arc::new(AvroExec::try_infer_schema(&filenames)?)
+        };
+
+        Ok(Self {
+            source: Source::Path(path.to_string()),
+            schema,
+            file_extension: options.file_extension.to_string(),
+            statistics: Statistics::default(),
+        })
+    }
+
+    /// Attempt to initialize a `AvroFile` from a reader. The schema MUST be provided in options
+    pub fn try_new_from_reader<R: Read + Seek + Send + Sync + 'static>(
+        reader: R,
+        options: AvroReadOptions,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "Schema must be provided to CsvRead".to_string(),
+                ));
+            }
+        };
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            statistics: Statistics::default(),
+            file_extension: String::new(),
+        })
+    }
+
+    /// Attempt to initialize an AvroFile from a reader impls Seek. The schema can be inferred automatically.
+    pub fn try_new_from_reader_infer_schema<R: Read + Seek + Send + Sync + 'static>(
+        mut reader: R,
+        options: AvroReadOptions,
+    ) -> Result<Self> {
+        let schema = {
+            if let Some(schema) = options.schema {
+                schema
+            } else {
+                Arc::new(crate::avro_to_arrow::infer_avro_schema_from_reader(
+                    &mut reader,
+                )?)
+            }
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            statistics: Statistics::default(),
+            file_extension: String::new(),
+        })
+    }
+
+    /// Get the path for Avro file(s) represented by this AvroFile instance
+    pub fn path(&self) -> &str {
+        match &self.source {
+            Source::Reader(_) => "",
+            Source::Path(path) => path,
+        }
+    }
+
+    /// Get the file extension for the Avro file(s) represented by this AvroFile instance
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+}
+
+impl TableProvider for AvroFile {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+        _filters: &[crate::logical_plan::Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let opts = AvroReadOptions {
+            schema: Some(self.schema.clone()),
+            file_extension: self.file_extension.as_str(),
+        };
+        let batch_size = limit
+            .map(|l| std::cmp::min(l, batch_size))
+            .unwrap_or(batch_size);
+
+        let exec = match &self.source {
+            Source::Reader(maybe_reader) => {
+                if let Some(rdr) = maybe_reader.lock().unwrap().take() {
+                    AvroExec::try_new_from_reader(
+                        rdr,
+                        opts,
+                        projection.clone(),
+                        batch_size,
+                        limit,
+                    )?
+                } else {
+                    return Err(DataFusionError::Execution(
+                        "You can only read once if the data comes from a reader"
+                            .to_string(),
+                    ));
+                }
+            }
+            Source::Path(p) => {
+                AvroExec::try_from_path(p, opts, projection.clone(), batch_size, limit)?
+            }
+        };
+        Ok(Arc::new(exec))
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.statistics.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        TimestampMicrosecondArray,
+    };
+    use arrow::record_batch::RecordBatch;
+    use futures::StreamExt;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn read_small_batches() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = None;
+        let exec = table.scan(&projection, 2, &[], None)?;
+        let stream = exec.execute(0).await?;
+
+        let _ = stream
+            .map(|batch| {
+                let batch = batch.unwrap();
+                assert_eq!(11, batch.num_columns());
+                assert_eq!(2, batch.num_rows());
+            })
+            .fold(0, |acc, _| async move { acc + 1i32 })
+            .await;
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+
+        let x: Vec<String> = table
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect();
+        let y = x.join("\n");
+        assert_eq!(
+            "id: Int32\n\
+             bool_col: Boolean\n\
+             tinyint_col: Int32\n\
+             smallint_col: Int32\n\
+             int_col: Int32\n\
+             bigint_col: Int64\n\
+             float_col: Float32\n\
+             double_col: Float64\n\
+             date_string_col: Binary\n\
+             string_col: Binary\n\
+             timestamp_col: Timestamp(Microsecond, None)",
+            y
+        );
+
+        let projection = None;
+        let batch = get_first_batch(table, &projection).await?;
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(8, batch.num_rows());

Review comment:
       Thanks for pointing that out







[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-919390155


   It seems adding `--release` to the workspace tests is a workaround https://github.com/apache/arrow-datafusion/pull/1003/checks?check_run_id=3601789569
   
   (compile time obviously increased by quite a bit)
   
   ```
   running 44 tests
   test tests::ballista_round_trip::q1 ... ok
   test tests::ballista_round_trip::q10 ... ok
   test tests::ballista_round_trip::q13 ... ok
   test tests::ballista_round_trip::q12 ... ok
   test tests::ballista_round_trip::q5 ... ok
   test tests::ballista_round_trip::q3 ... ok
   test tests::ballista_round_trip::q6 ... ok
   test tests::ballista_round_trip::q7 ... ok
   test tests::ballista_round_trip::q8 ... ok
   test tests::q1 ... ok
   test tests::ballista_round_trip::q9 ... ok
   test tests::q11 ... ok
   test tests::q12 ... ok
   test tests::q10 ... ok
   test tests::q14 ... ok
   test tests::q13 ... ok
   test tests::q15 ... ok
   test tests::q16 ... ok
   test tests::q17 ... ok
   test tests::q18 ... ok
   test tests::q19 ... ok
   test tests::q2 ... ok
   test tests::q20 ... ok
   test tests::q21 ... ok
   test tests::q22 ... ok
   test tests::q3 ... ok
   test tests::q4 ... ok
   test tests::q5 ... ok
   test tests::q6 ... ok
   test tests::q7 ... ok
   test tests::q8 ... ok
   test tests::q9 ... ok
   test tests::run_q1 ... ok
   test tests::run_q10 ... ok
   test tests::run_q12 ... ok
   test tests::run_q13 ... ok
   test tests::run_q14 ... ok
   test tests::run_q19 ... ok
   test tests::run_q3 ... ok
   test tests::run_q6 ... ok
   test tests::run_q5 ... ok
   test tests::run_q7 ... ok
   test tests::run_q8 ... ok
   test tests::run_q9 ... ok
   
   test result: ok. 44 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.05s
   
        Running unittests (/github/home/target/release/deps/ballista-65bbceb93df8237f)
   ```
   
   However, those tests now fail somewhere else (extract), because the arrow test data is not available here:
   
   ```
   failures:
   
   ---- avro_to_arrow::arrow_array_reader::test::test_avro_iterator stdout ----
   thread 'avro_to_arrow::arrow_array_reader::test::test_avro_iterator' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 2, kind: NotFound, message: "No such file or directory" }', datafusion/src/avro_to_arrow/arrow_array_reader.rs:984:44
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   
   ---- avro_to_arrow::arrow_array_reader::test::test_avro_read_list stdout ----
   thread 'avro_to_arrow::arrow_array_reader::test::test_avro_read_list' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 2, kind: NotFound, message: "No such file or directory" }', datafusion/src/avro_to_arrow/arrow_array_reader.rs:984:44
   ``` 
   
    





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696552135



##########
File path: datafusion/src/avro_to_arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1093 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{

Review comment:
       Basically, at the time of inclusion in Apache, Parquet was to be the columnar storage format while Avro was the messaging format.







[GitHub] [arrow-datafusion] alamb commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-904124923


   I plan to start reviewing this PR tomorrow (FYI @houqp  in case you want to check it out too). Thank you @Igosuki 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r697963678



##########
File path: datafusion/Cargo.toml
##########
@@ -69,6 +70,8 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
+avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+num-traits = "0.2"

Review comment:
       I am not seeing `num_traits` being used in the code?







[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701265046



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -570,6 +570,7 @@ pub trait Accumulator: Send + Sync + Debug {
 pub mod aggregates;
 pub mod analyze;
 pub mod array_expressions;
+pub mod avro;

Review comment:
       That implies adding the avro feature flag in a number of other places, such as execution/context.rs. Is that what you are implicitly asking for?







[GitHub] [arrow-datafusion] Dandandan commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-919390155


   It seems adding `--release` to the workspace tests is working https://github.com/apache/arrow-datafusion/pull/1003/checks?check_run_id=3601789569
   
   (compile time obviously increased by quite a bit)
   
   ```
   running 44 tests
   test tests::ballista_round_trip::q1 ... ok
   test tests::ballista_round_trip::q10 ... ok
   test tests::ballista_round_trip::q13 ... ok
   test tests::ballista_round_trip::q12 ... ok
   test tests::ballista_round_trip::q5 ... ok
   test tests::ballista_round_trip::q3 ... ok
   test tests::ballista_round_trip::q6 ... ok
   test tests::ballista_round_trip::q7 ... ok
   test tests::ballista_round_trip::q8 ... ok
   test tests::q1 ... ok
   test tests::ballista_round_trip::q9 ... ok
   test tests::q11 ... ok
   test tests::q12 ... ok
   test tests::q10 ... ok
   test tests::q14 ... ok
   test tests::q13 ... ok
   test tests::q15 ... ok
   test tests::q16 ... ok
   test tests::q17 ... ok
   test tests::q18 ... ok
   test tests::q19 ... ok
   test tests::q2 ... ok
   test tests::q20 ... ok
   test tests::q21 ... ok
   test tests::q22 ... ok
   test tests::q3 ... ok
   test tests::q4 ... ok
   test tests::q5 ... ok
   test tests::q6 ... ok
   test tests::q7 ... ok
   test tests::q8 ... ok
   test tests::q9 ... ok
   test tests::run_q1 ... ok
   test tests::run_q10 ... ok
   test tests::run_q12 ... ok
   test tests::run_q13 ... ok
   test tests::run_q14 ... ok
   test tests::run_q19 ... ok
   test tests::run_q3 ... ok
   test tests::run_q6 ... ok
   test tests::run_q5 ... ok
   test tests::run_q7 ... ok
   test tests::run_q8 ... ok
   test tests::run_q9 ... ok
   
   test result: ok. 44 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.05s
   
        Running unittests (/github/home/target/release/deps/ballista-65bbceb93df8237f)
   ```
   
   However, those tests now fail somewhere else (extract), because the arrow test data is not available here:
   
   ```
   failures:
   
   ---- avro_to_arrow::arrow_array_reader::test::test_avro_iterator stdout ----
   thread 'avro_to_arrow::arrow_array_reader::test::test_avro_iterator' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 2, kind: NotFound, message: "No such file or directory" }', datafusion/src/avro_to_arrow/arrow_array_reader.rs:984:44
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   
   ---- avro_to_arrow::arrow_array_reader::test::test_avro_read_list stdout ----
   thread 'avro_to_arrow::arrow_array_reader::test::test_avro_read_list' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 2, kind: NotFound, message: "No such file or directory" }', datafusion/src/avro_to_arrow/arrow_array_reader.rs:984:44
   ``` 
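
The `NotFound` panics above come from tests that unwrap a file open on data living in the `testing` submodule. As a minimal sketch (the helper name `test_data_dir` is hypothetical and not part of DataFusion's API; the environment variable and fallback path are whatever the caller passes in), a test helper could resolve the data directory up front and report which path is missing instead of surfacing a bare OS error:

```rust
use std::env;
use std::path::PathBuf;

/// Hypothetical helper (not DataFusion's API): resolve the test-data
/// directory from an environment variable, falling back to a default path.
/// Returns an error message naming the path when the directory is missing.
pub fn test_data_dir(env_var: &str, default: &str) -> Result<PathBuf, String> {
    // Prefer the environment variable when it is set and non-empty.
    let candidate = match env::var(env_var) {
        Ok(value) if !value.trim().is_empty() => PathBuf::from(value.trim()),
        _ => PathBuf::from(default),
    };
    if candidate.is_dir() {
        Ok(candidate)
    } else {
        Err(format!(
            "test data directory `{}` not found; set {} or run `git submodule update --init`",
            candidate.display(),
            env_var
        ))
    }
}
```

A test could then call something like `test_data_dir("ARROW_TEST_DATA", "../testing/data")` (both values are assumptions here) and panic with the returned message, which makes a missing submodule checkout obvious in CI logs.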
   
    





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917634636


   I re-ran the tests against a version with updated dependencies for everything and that wasn't it.
   @Dandandan Tests pass when running with the same options as the default bench profile for dev and test
   





[GitHub] [arrow-datafusion] Igosuki edited a comment on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki edited a comment on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917635206


   `cargo test` runs benches on the test profile; running `cargo test --features=avro --release` passes, in the same way as described in my previous comment.





[GitHub] [arrow-datafusion] nevi-me commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r706651319



##########
File path: .gitmodules
##########
@@ -3,4 +3,5 @@
 	url = https://github.com/apache/parquet-testing.git
 [submodule "testing"]
 	path = testing
-	url = https://github.com/apache/arrow-testing
+	url = https://github.com/Igosuki/arrow-testing.git

Review comment:
       This should be good to change back now that the PR is merged.







[GitHub] [arrow-datafusion] Dandandan commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917637560


   Cargo test uses a dev profile by default.
   
   I think the combination of the test runner, the Tokio runtime, and the current design of DataFusion, which recurses into the execution plan and expressions, increases the usage of stack space.
   Running the tests without optimizations increases stack usage further, since a debug build does almost nothing to shrink stack frames.
   
   Some improvements could be made, such as using an explicit stack on the heap (e.g. for the `evaluate` function) and redesigning the execution model of DataFusion to limit the use of the call stack.
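
To illustrate the "explicit stack on the heap" idea, here is a minimal sketch. The `Expr` and `Task` types are hypothetical and far simpler than DataFusion's physical expressions; the point is only that pending work lives in a `Vec` rather than in nested call frames, so expression depth no longer translates into call-stack depth:

```rust
/// Hypothetical expression tree (not DataFusion's `PhysicalExpr`).
pub enum Expr {
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
}

/// Work items kept on a heap-allocated `Vec` instead of the call stack.
enum Task<'a> {
    Visit(&'a Expr),
    ApplyAdd,
    ApplyMul,
}

/// Evaluates `root` iteratively, without recursive calls.
pub fn evaluate(root: &Expr) -> i64 {
    let mut tasks = vec![Task::Visit(root)];
    let mut values: Vec<i64> = Vec::new();
    while let Some(task) = tasks.pop() {
        match task {
            Task::Visit(Expr::Literal(v)) => values.push(*v),
            Task::Visit(Expr::Add(l, r)) => {
                // Pushed in reverse: the left operand is visited first,
                // then the right, then the operator is applied.
                tasks.push(Task::ApplyAdd);
                tasks.push(Task::Visit(&**r));
                tasks.push(Task::Visit(&**l));
            }
            Task::Visit(Expr::Mul(l, r)) => {
                tasks.push(Task::ApplyMul);
                tasks.push(Task::Visit(&**r));
                tasks.push(Task::Visit(&**l));
            }
            Task::ApplyAdd => {
                let r = values.pop().expect("right operand");
                let l = values.pop().expect("left operand");
                values.push(l + r);
            }
            Task::ApplyMul => {
                let r = values.pop().expect("right operand");
                let l = values.pop().expect("left operand");
                values.push(l * r);
            }
        }
    }
    values.pop().expect("expression produced a value")
}
```

The trade-off is extra bookkeeping (two vectors and a task enum) in exchange for stack usage that stays constant regardless of how deeply the expression is nested.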





[GitHub] [arrow-datafusion] alamb commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917090793


   > If no decision is made here, we could defer the clippy changes until they land on stable
   
   We have included the naming-related clippy change on master as part of https://github.com/apache/arrow-datafusion/pull/986, so after rebasing this PR these changes should no longer be present.
   
   @nevi-me  I think all your comments have been addressed. Are you satisfied with this PR now?
   
   





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917606658


   Rebased, submodule updated and fixed sql.rs tests for avro





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696603576



##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned Avro files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(

Review comment:
       That feature works with directories, but how do you register the same file twice, since you only register the table once with a filename?







[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-914120564


   Fixed the tests after changing the external props handling. I was away for a few days and just came back to this.
   The latest commit fixes external props, as well as the physical plan tests when the avro flag isn't enabled.





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-909936222


   Rebased again after new conflict





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-904935851


   I've added nested field access for pg/generic dialect for things like ```select array[0][0]``` in https://github.com/Igosuki/arrow-datafusion/tree/map_access and https://github.com/Igosuki/sqlparser-rs/ based on this branch, which I'm now using in production (although I need to patch ballista to accept these).
   





[GitHub] [arrow-datafusion] alamb commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-907770121


   (And welcome back @nevi-me ! )





[GitHub] [arrow-datafusion] nevi-me commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r700593118



##########
File path: datafusion/src/avro_to_arrow/mod.rs
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]

Review comment:
       It should be possible to move this feature flag to `datafusion/src/lib.rs` so you don't include it for each module here
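
As a rough sketch of that suggestion (the module body and the `is_enabled` function are hypothetical, and the `avro` feature is assumed to be declared under `[features]` in `Cargo.toml`), gating the module once at its declaration site means the items inside need no `#[cfg]` attributes of their own:

```rust
// In a real crate these declarations would sit in `src/lib.rs`.

/// Compiled only when the `avro` feature is enabled; nothing inside
/// the module needs its own `#[cfg(feature = "avro")]`.
#[cfg(feature = "avro")]
pub mod avro_to_arrow {
    /// Hypothetical marker function for illustration.
    pub fn is_enabled() -> bool {
        true
    }
}

/// Hypothetical stub used when the feature is off, so callers can still
/// refer to `avro_to_arrow::is_enabled` without their own `#[cfg]`.
#[cfg(not(feature = "avro"))]
pub mod avro_to_arrow {
    pub fn is_enabled() -> bool {
        false
    }
}
```

Whether to provide a stub module or simply omit the module when the feature is off is a design choice; the stub keeps call sites free of `#[cfg]` at the cost of a second definition to maintain.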

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {

Review comment:
       This is cool 👍 

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -17,6 +17,7 @@
 
 //! DataFusion data sources
 
+pub mod avro;

Review comment:
       Yes, then the feature flags in the `avro` module can be removed

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))

Review comment:
       You could pass `None` as the props, and then only set the metadata if there are actual properties.
   
   We can adjust this when moving this module to arrow-rs though
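
A minimal sketch of what that could look like, using a hypothetical stand-in `Field` type rather than Arrow's real one (the names `field_with_props` and `field` are illustrative only): metadata is attached only when a non-empty property map is supplied, and the convenience wrapper passes `None` instead of an empty default map.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for an Arrow field; only the parts needed here.
#[derive(Debug, Default, PartialEq)]
pub struct Field {
    pub name: String,
    pub metadata: Option<BTreeMap<String, String>>,
}

/// Builds a field, setting metadata only when there are actual properties.
pub fn field_with_props(
    name: &str,
    props: Option<&BTreeMap<String, String>>,
) -> Field {
    Field {
        name: name.to_string(),
        // `None` and an empty map both result in no metadata at all.
        metadata: props.filter(|p| !p.is_empty()).cloned(),
    }
}

/// Convenience wrapper that passes `None` rather than an empty map.
pub fn field(name: &str) -> Field {
    field_with_props(name, None)
}
```

This keeps fields without properties free of an empty metadata map, which can matter when schemas are compared for equality.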

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,

Review comment:
       This should ideally be a binary field (`DataType::FixedSizeBinary`), because otherwise you lose the type information (that you're dealing with a UUID).
   
   Extension arrays will also be helpful in future for this. UUID is common enough across different data sources (SQL, Parquet) that we might want to preserve its properties.
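
   To illustrate the size argument, here is a rough sketch that turns the textual form of a UUID into the 16 raw bytes a fixed-size binary column of width 16 would hold. `uuid_to_bytes` is a hypothetical helper written for this example, not part of the PR or of arrow, and it is deliberately lenient about where hyphens appear.

```rust
/// Parse a hyphenated UUID string into its 16 raw bytes.
/// Hyphens are skipped wherever they appear; returns `None` unless exactly
/// 32 hexadecimal digits remain.
fn uuid_to_bytes(s: &str) -> Option<[u8; 16]> {
    let hex: Vec<u8> = s.bytes().filter(|b| *b != b'-').collect();
    if hex.len() != 32 {
        return None;
    }
    let mut out = [0u8; 16];
    for (i, pair) in hex.chunks(2).enumerate() {
        // Each pair of hex digits becomes one byte.
        let hi = (pair[0] as char).to_digit(16)?;
        let lo = (pair[1] as char).to_digit(16)?;
        out[i] = (hi * 16 + lo) as u8;
    }
    Some(out)
}

fn main() {
    let text = "123e4567-e89b-12d3-a456-426614174000";
    let bytes = uuid_to_bytes(text).expect("valid UUID");
    // The textual form takes 36 bytes; the binary form takes 16.
    println!("{} text bytes -> {} binary bytes", text.len(), bytes.len());
}
```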

##########
File path: datafusion/src/datasource/avro.rs
##########
@@ -0,0 +1,434 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Line-delimited Avro data source
+//!
+//! This data source allows Line-delimited Avro records or files to be used as input for queries.
+//!
+
+use std::{
+    any::Any,
+    io::{Read, Seek},
+    sync::{Arc, Mutex},
+};
+
+use arrow::datatypes::SchemaRef;
+
+use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
+use crate::{
+    datasource::{Source, TableProvider},
+    error::{DataFusionError, Result},
+    physical_plan::{common, ExecutionPlan},
+};
+
+use super::datasource::Statistics;
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents a  line-delimited Avro file with a provided schema

Review comment:
       nit: is it line-delimited? I'd expect that from text sources, not binary ones.
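
   For context: an Avro object container file is a binary format that starts with the four bytes `Obj` followed by 0x01, and its data blocks are separated by a sync marker rather than by newlines. A small sketch of checking that header; `looks_like_avro_container` is a hypothetical helper, not something this PR defines.

```rust
/// The four magic bytes that start an Avro object container file:
/// ASCII "Obj" followed by the byte 0x01.
const AVRO_MAGIC: [u8; 4] = [b'O', b'b', b'j', 1];

/// Returns true when `data` begins with the Avro container magic bytes.
fn looks_like_avro_container(data: &[u8]) -> bool {
    data.starts_with(&AVRO_MAGIC)
}

fn main() {
    let avro_like = b"Obj\x01...header and blocks follow...";
    let json_lines = b"{\"a\": 1}\n{\"a\": 2}\n";
    println!("avro-like: {}", looks_like_avro_container(avro_like));
    println!("json lines: {}", looks_like_avro_container(json_lines));
}
```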

##########
File path: datafusion/src/avro_to_arrow/mod.rs
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]
+mod arrow_array_reader;
+#[cfg(feature = "avro")]
+mod reader;
+#[cfg(feature = "avro")]
+mod schema;
+
+use crate::arrow::datatypes::Schema;
+use crate::error::Result;
+#[cfg(feature = "avro")]
+pub use reader::{Reader, ReaderBuilder};
+use std::io::{Read, Seek};
+
+#[cfg(feature = "avro")]
+/// Infer Avro schema given a reader
+pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {

Review comment:
       nit: is it inference, or reading the schema from the file/input?

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -78,7 +78,7 @@ impl NthValue {
     }
 
     /// Create a new NTH_VALUE window aggregate function
-    pub fn nth_value(
+    pub fn value(

Review comment:
       If no decision is made here, we could defer the clippy changes until they land on stable

##########
File path: datafusion/src/physical_plan/datetime_expressions.rs
##########
@@ -236,14 +236,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();
 
     Ok(match array {
-        ColumnarValue::Scalar(scalar) => {

Review comment:
       Is this change related to the Avro addition?

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,
+        AvroSchema::Date => DataType::Date32,
+        AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+        AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+        AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
+        AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
+        AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+    };
+
+    let data_type = field_type.clone();
+    let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+    let mut field = Field::new(name, field_type, nullable);
+    field.set_metadata(props.cloned());
+    Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+    match dt {
+        DataType::Null => "null",
+        DataType::Boolean => "bit",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::UInt8 => "uint1",
+        DataType::UInt16 => "uint2",
+        DataType::UInt32 => "uint4",
+        DataType::UInt64 => "uint8",
+        DataType::Float16 => "float2",
+        DataType::Float32 => "float4",
+        DataType::Float64 => "float8",
+        DataType::Date32 => "dateday",
+        DataType::Date64 => "datemilli",
+        DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+            TimeUnit::Second => "timesec",
+            TimeUnit::Millisecond => "timemilli",
+            TimeUnit::Microsecond => "timemicro",
+            TimeUnit::Nanosecond => "timenano",
+        },
+        DataType::Timestamp(tu, tz) => {
+            if tz.is_some() {
+                match tu {
+                    TimeUnit::Second => "timestampsectz",
+                    TimeUnit::Millisecond => "timestampmillitz",
+                    TimeUnit::Microsecond => "timestampmicrotz",
+                    TimeUnit::Nanosecond => "timestampnanotz",
+                }
+            } else {
+                match tu {
+                    TimeUnit::Second => "timestampsec",
+                    TimeUnit::Millisecond => "timestampmilli",
+                    TimeUnit::Microsecond => "timestampmicro",
+                    TimeUnit::Nanosecond => "timestampnano",
+                }
+            }
+        }
+        DataType::Duration(_) => "duration",
+        DataType::Interval(unit) => match unit {
+            IntervalUnit::YearMonth => "intervalyear",
+            IntervalUnit::DayTime => "intervalmonth",
+        },
+        DataType::Binary => "varbinary",
+        DataType::FixedSizeBinary(_) => "fixedsizebinary",
+        DataType::LargeBinary => "largevarbinary",
+        DataType::Utf8 => "varchar",
+        DataType::LargeUtf8 => "largevarchar",
+        DataType::List(_) => "list",
+        DataType::FixedSizeList(_, _) => "fixed_size_list",
+        DataType::LargeList(_) => "largelist",
+        DataType::Struct(_) => "struct",
+        DataType::Union(_) => "union",
+        DataType::Dictionary(_, _) => "map",
+        DataType::Decimal(_, _) => "decimal",
+    }
+}
+
+fn index_type(len: usize) -> DataType {
+    if len <= usize::from(u8::MAX) {
+        DataType::Int8
+    } else if len <= usize::from(u16::MAX) {
+        DataType::Int16
+    } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+        DataType::Int32
+    } else {
+        DataType::Int64
+    }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+    let mut props = BTreeMap::new();
+    match &schema {
+        AvroSchema::Record {
+            doc: Some(ref doc), ..
+        }
+        | AvroSchema::Enum {
+            doc: Some(ref doc), ..
+        } => {
+            props.insert("doc".to_string(), doc.clone());

Review comment:
       I haven't read through the Avro spec yet, so this might be a silly question.
   
   Is this embedding the Avro schema in the Arrow schema? If so, 👍: this would become useful in future if we pass a record batch to an Avro writer, which could use this schema to preserve some metadata about the originally read Avro source.
   
   If the above is true, a future enhancement could be using a well-known prefix, like `AVRO::doc`
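
   A sketch of what such a prefix could look like, using plain `BTreeMap`s as in `external_props`. The `AVRO::` prefix comes from the suggestion above; `prefix_props` and `avro_props` are hypothetical helpers for illustration only.

```rust
use std::collections::BTreeMap;

/// Prefix proposed in the review comment for Avro-originated metadata keys.
const AVRO_PREFIX: &str = "AVRO::";

/// Namespace every key so that Avro properties cannot clash with other metadata.
fn prefix_props(props: &BTreeMap<String, String>) -> BTreeMap<String, String> {
    props
        .iter()
        .map(|(k, v)| (format!("{}{}", AVRO_PREFIX, k), v.clone()))
        .collect()
}

/// Recover the original Avro properties, ignoring keys without the prefix.
fn avro_props(metadata: &BTreeMap<String, String>) -> BTreeMap<String, String> {
    metadata
        .iter()
        .filter_map(|(k, v)| {
            k.strip_prefix(AVRO_PREFIX)
                .map(|rest| (rest.to_string(), v.clone()))
        })
        .collect()
}

fn main() {
    let mut props = BTreeMap::new();
    props.insert("doc".to_string(), "user id".to_string());

    let metadata = prefix_props(&props);
    println!("{:?}", metadata);
    println!("{:?}", avro_props(&metadata));
}
```

   A future Avro writer could then pick out only the prefixed keys when rebuilding the Avro schema.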

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -570,6 +570,7 @@ pub trait Accumulator: Send + Sync + Debug {
 pub mod aggregates;
 pub mod analyze;
 pub mod array_expressions;
+pub mod avro;

Review comment:
       This should also be feature-flagged so that the flags inside its file can be removed. There's no use compiling the avro module if the flag is disabled.
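
   Roughly, gating at the declaration looks like the sketch below. It assumes the crate declares an `avro` feature, as the `#[cfg(feature = "avro")]` attributes elsewhere in this PR imply; `describe` and `avro_status` are hypothetical names used only to make the example runnable.

```rust
// The whole module is skipped by the compiler unless the `avro` feature is on,
// so nothing inside it needs its own `#[cfg]` attribute.
#[cfg(feature = "avro")]
pub mod avro {
    pub fn describe() -> &'static str {
        "avro support compiled in"
    }
}

// Callers outside the module pick an implementation at compile time.
#[cfg(feature = "avro")]
pub fn avro_status() -> &'static str {
    avro::describe()
}

#[cfg(not(feature = "avro"))]
pub fn avro_status() -> &'static str {
    "avro support not compiled"
}

fn main() {
    println!("{}", avro_status());
}
```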

##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,
+        AvroSchema::Date => DataType::Date32,
+        AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+        AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+        AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
+        AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
+        AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+    };
+
+    let data_type = field_type.clone();
+    let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+    let mut field = Field::new(name, field_type, nullable);
+    field.set_metadata(props.cloned());
+    Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+    match dt {
+        DataType::Null => "null",
+        DataType::Boolean => "bit",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::UInt8 => "uint1",
+        DataType::UInt16 => "uint2",
+        DataType::UInt32 => "uint4",
+        DataType::UInt64 => "uint8",
+        DataType::Float16 => "float2",
+        DataType::Float32 => "float4",
+        DataType::Float64 => "float8",
+        DataType::Date32 => "dateday",
+        DataType::Date64 => "datemilli",
+        DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+            TimeUnit::Second => "timesec",
+            TimeUnit::Millisecond => "timemilli",
+            TimeUnit::Microsecond => "timemicro",
+            TimeUnit::Nanosecond => "timenano",
+        },
+        DataType::Timestamp(tu, tz) => {
+            if tz.is_some() {
+                match tu {
+                    TimeUnit::Second => "timestampsectz",
+                    TimeUnit::Millisecond => "timestampmillitz",
+                    TimeUnit::Microsecond => "timestampmicrotz",
+                    TimeUnit::Nanosecond => "timestampnanotz",
+                }
+            } else {
+                match tu {
+                    TimeUnit::Second => "timestampsec",
+                    TimeUnit::Millisecond => "timestampmilli",
+                    TimeUnit::Microsecond => "timestampmicro",
+                    TimeUnit::Nanosecond => "timestampnano",
+                }
+            }
+        }
+        DataType::Duration(_) => "duration",
+        DataType::Interval(unit) => match unit {
+            IntervalUnit::YearMonth => "intervalyear",
+            IntervalUnit::DayTime => "intervalmonth",
+        },
+        DataType::Binary => "varbinary",
+        DataType::FixedSizeBinary(_) => "fixedsizebinary",
+        DataType::LargeBinary => "largevarbinary",
+        DataType::Utf8 => "varchar",
+        DataType::LargeUtf8 => "largevarchar",
+        DataType::List(_) => "list",
+        DataType::FixedSizeList(_, _) => "fixed_size_list",
+        DataType::LargeList(_) => "largelist",
+        DataType::Struct(_) => "struct",
+        DataType::Union(_) => "union",
+        DataType::Dictionary(_, _) => "map",
+        DataType::Decimal(_, _) => "decimal",
+    }
+}
+
+fn index_type(len: usize) -> DataType {
+    if len <= usize::from(u8::MAX) {
+        DataType::Int8
+    } else if len <= usize::from(u16::MAX) {
+        DataType::Int16
+    } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+        DataType::Int32
+    } else {
+        DataType::Int64
+    }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+    let mut props = BTreeMap::new();
+    match &schema {
+        AvroSchema::Record {
+            doc: Some(ref doc), ..
+        }
+        | AvroSchema::Enum {
+            doc: Some(ref doc), ..
+        } => {
+            props.insert("doc".to_string(), doc.clone());
+        }
+        _ => {}
+    }
+    match &schema {
+        AvroSchema::Record {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        }
+        | AvroSchema::Enum {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        }
+        | AvroSchema::Fixed {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        } => {
+            let aliases: Vec<String> = aliases
+                .iter()
+                .map(|alias| aliased(alias, namespace.as_deref(), None))
+                .collect();
+            props.insert("aliases".to_string(), format!("[{}]", aliases.join(",")));
+        }
+        _ => {}
+    }
+    props
+}
+
+#[allow(dead_code)]
+fn get_metadata(

Review comment:
       This doesn't look like it's used anywhere. Is this intentional?







[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701258430



##########
File path: datafusion/src/avro_to_arrow/mod.rs
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]
+mod arrow_array_reader;
+#[cfg(feature = "avro")]
+mod reader;
+#[cfg(feature = "avro")]
+mod schema;
+
+use crate::arrow::datatypes::Schema;
+use crate::error::Result;
+#[cfg(feature = "avro")]
+pub use reader::{Reader, ReaderBuilder};
+use std::io::{Read, Seek};
+
+#[cfg(feature = "avro")]
+/// Infer Avro schema given a reader
+pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {

Review comment:
       https://github.com/Igosuki/arrow-datafusion/commit/e05ccfcd6783b6e524f9635160767eafd77ca95f







[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696528746



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -78,7 +78,7 @@ impl NthValue {
     }
 
     /// Create a new NTH_VALUE window aggregate function
-    pub fn nth_value(
+    pub fn value(

Review comment:
       It breaks on the latest nightly because of new clippy rules, specifically the one that flags a method with the same name as its struct or trait. I tried to exclude the clippy rule, but that doesn't work on stable because the lint doesn't exist there, and there are no conditional attributes for nightly/stable.
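
For context, the rule described here appears to be clippy's `self_named_constructors` lint (an assumption; the comment does not name it). A minimal sketch of the rename, using a simplified stand-in for `NthValue` whose fields are hypothetical and not the real DataFusion type:

```rust
/// Simplified stand-in for DataFusion's `NthValue`; the fields are
/// hypothetical and only illustrate the naming issue.
#[derive(Debug, PartialEq)]
pub struct NthValue {
    name: String,
    n: u32,
}

impl NthValue {
    // Before the rename this constructor was called `nth_value`, i.e. the
    // snake_case form of the type name, which newer clippy versions flag.
    // Calling it `value` sidesteps the lint on every toolchain, so no
    // `#[allow(...)]` attribute is needed.
    pub fn value(name: impl Into<String>, n: u32) -> Result<Self, String> {
        if n == 0 {
            // Assumption for this sketch: positions are 1-based.
            return Err("n must be greater than 0".to_string());
        }
        Ok(Self { name: name.into(), n })
    }

    /// Position requested from the window frame.
    pub fn n(&self) -> u32 {
        self.n
    }
}
```

Renaming trades a slightly less descriptive constructor name for a build that behaves the same on stable and nightly.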







[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-906425173


   I think I handled all your comments, let me know if there's more I can do





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r695977278



##########
File path: .gitmodules
##########
@@ -3,4 +3,5 @@
 	url = https://github.com/apache/parquet-testing.git
 [submodule "testing"]
 	path = testing
-	url = https://github.com/apache/arrow-testing
+	url = https://github.com/Igosuki/arrow-testing.git

Review comment:
       I think that, prior to merging this PR, we should get these changes merged into apache/arrow-testing.

##########
File path: datafusion-examples/examples/avro_sql.rs
##########
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       👍 
   
   ```shell
   Opening avro: /Users/alamb/Software/arrow-datafusion/datafusion/../testing/data/avro/alltypes_plain.avro
   +---------+------------+-------------------------------+
   | int_col | double_col | CAST(date_string_col AS Utf8) |
   +---------+------------+-------------------------------+
   | 1       | 10.1       | 03/01/09                      |
   | 1       | 10.1       | 04/01/09                      |
   | 1       | 10.1       | 02/01/09                      |
   +---------+------------+-------------------------------+
   ```

##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -244,6 +245,13 @@ pub enum Expr {
     IsNull(Box<Expr>),
     /// arithmetic negation of an expression, the operand must be of a signed numeric data type
     Negative(Box<Expr>),
+    /// Returns the field of a [`StructArray`] by name
+    GetField {

Review comment:
       This appears to be some part of https://github.com/apache/arrow-datafusion/pull/628 -- I think it would be cleaner if we revived that PR to get the GetField functionality separately rather than including it in a single large PR

##########
File path: datafusion/src/sql/parser.rs
##########
@@ -390,10 +393,21 @@ mod tests {
         });
         expect_parse_ok(sql, expected)?;
 
+        // positive case: it is ok for parquet files not to have columns specified

Review comment:
       ```suggestion
           // positive case: it is ok for avro files not to have columns specified
   ```

##########
File path: ballista/rust/core/src/serde/logical_plan/to_proto.rs
##########
@@ -1403,6 +1424,9 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
             BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256),
             BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384),
             BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512),
+            BuiltinScalarFunction::ToTimestampMillis => {

Review comment:
       the addition of `ToTimestampMillis` seems unrelated to this PR, but is a good addition

##########
File path: datafusion/src/avro_to_arrow/reader.rs
##########
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::record_batch::RecordBatch;
+use crate::avro_to_arrow::arrow_array_reader::AvroArrowArrayReader;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use avro_rs::Reader as AvroReader;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    /// Optional schema for the Avro file
+    ///
+    /// If the schema is not supplied, the reader will try to infer the schema
+    /// based on the Avro structure.
+    schema: Option<SchemaRef>,
+    /// Batch size (number of records to load each time)
+    ///
+    /// The default batch size when using the `ReaderBuilder` is 1024 records
+    batch_size: usize,
+    /// Optional projection for which columns to load (column names)
+    projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            batch_size: 1024,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+    /// Create a new builder for configuring Avro parsing options.
+    ///
+    /// To convert a builder into a reader, call `ReaderBuilder::build`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder, inferring the schema with the first 100 records
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().infer_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to infer the schema of the file

Review comment:
       This doc comment seems to be the opposite of what the function does -- the function seems to set the schema and turn off schema inference.
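
A minimal sketch of the semantics the doc comments are meant to describe, in which an explicit schema disables inference and `infer_schema` re-enables it. The types are simplified stand-ins: the schema is a `String` rather than an Arrow `SchemaRef`, and `resolve_schema` is a hypothetical helper that is not part of the PR.

```rust
/// Hypothetical, simplified builder used only to illustrate the semantics.
#[derive(Debug, Default)]
pub struct ReaderBuilder {
    schema: Option<String>,
}

impl ReaderBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    /// Use an explicit schema; this turns schema inference off.
    pub fn with_schema(mut self, schema: &str) -> Self {
        self.schema = Some(schema.to_string());
        self
    }

    /// Infer the schema from the file; this clears any explicit schema.
    pub fn infer_schema(mut self) -> Self {
        self.schema = None;
        self
    }

    /// Resolve the schema at build time: explicit if set, otherwise inferred.
    /// `infer` stands in for reading the writer schema from the Avro header.
    pub fn resolve_schema(self, infer: impl FnOnce() -> String) -> String {
        match self.schema {
            Some(schema) => schema,
            None => infer(),
        }
    }
}
```

With this shape, each doc comment can state both what the method sets and what it switches off, which avoids the mismatch flagged in the review.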

##########
File path: datafusion/src/avro_to_arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1093 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{

Review comment:
       I can't say I am too familiar with the Avro data model, so I don't really feel qualified to review the logic in this file.
   
   However, the code seems easy to read and is reasonably well tested, so I think it would be fine to merge it as is and then adjust / improve it as this module gets more use.

##########
File path: datafusion/src/datasource/avro.rs
##########
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro data source
+//!
+//! This data source allows Avro files to be used as input for queries.
+//!
+
+use std::{
+    any::Any,
+    io::{Read, Seek},
+    sync::{Arc, Mutex},
+};
+
+use arrow::datatypes::SchemaRef;
+
+use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
+use crate::{
+    datasource::{Source, TableProvider},
+    error::{DataFusionError, Result},
+    physical_plan::{common, ExecutionPlan},
+};
+
+use super::datasource::Statistics;
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents an Avro file with a provided schema
+pub struct AvroFile {
+    source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
+    schema: SchemaRef,
+    file_extension: String,
+    statistics: Statistics,
+}
+
+impl AvroFile {
+    /// Attempt to initialize a `AvroFile` from a path. The schema can be inferred automatically.
+    pub fn try_new(path: &str, options: AvroReadOptions) -> Result<Self> {
+        let schema = if let Some(schema) = options.schema {
+            schema
+        } else {
+            let filenames =
+                common::build_checked_file_list(path, options.file_extension)?;
+            Arc::new(AvroExec::try_infer_schema(&filenames)?)
+        };
+
+        Ok(Self {
+            source: Source::Path(path.to_string()),
+            schema,
+            file_extension: options.file_extension.to_string(),
+            statistics: Statistics::default(),
+        })
+    }
+
+    /// Attempt to initialize a `AvroFile` from a reader. The schema MUST be provided in options
+    pub fn try_new_from_reader<R: Read + Seek + Send + Sync + 'static>(
+        reader: R,
+        options: AvroReadOptions,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "Schema must be provided when reading Avro from a reader".to_string(),
+                ));
+            }
+        };
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            statistics: Statistics::default(),
+            file_extension: String::new(),
+        })
+    }
+
+    /// Attempt to initialize an `AvroFile` from a reader that implements `Seek`. The schema can be inferred automatically.
+    pub fn try_new_from_reader_infer_schema<R: Read + Seek + Send + Sync + 'static>(
+        mut reader: R,
+        options: AvroReadOptions,
+    ) -> Result<Self> {
+        let schema = {
+            if let Some(schema) = options.schema {
+                schema
+            } else {
+                Arc::new(crate::avro_to_arrow::infer_avro_schema_from_reader(
+                    &mut reader,
+                )?)
+            }
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            statistics: Statistics::default(),
+            file_extension: String::new(),
+        })
+    }
+
+    /// Get the path for Avro file(s) represented by this AvroFile instance
+    pub fn path(&self) -> &str {
+        match &self.source {
+            Source::Reader(_) => "",
+            Source::Path(path) => path,
+        }
+    }
+
+    /// Get the file extension for the Avro file(s) represented by this AvroFile instance
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+}
+
+impl TableProvider for AvroFile {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+        _filters: &[crate::logical_plan::Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let opts = AvroReadOptions {
+            schema: Some(self.schema.clone()),
+            file_extension: self.file_extension.as_str(),
+        };
+        let batch_size = limit
+            .map(|l| std::cmp::min(l, batch_size))
+            .unwrap_or(batch_size);
+
+        let exec = match &self.source {
+            Source::Reader(maybe_reader) => {
+                if let Some(rdr) = maybe_reader.lock().unwrap().take() {
+                    AvroExec::try_new_from_reader(
+                        rdr,
+                        opts,
+                        projection.clone(),
+                        batch_size,
+                        limit,
+                    )?
+                } else {
+                    return Err(DataFusionError::Execution(
+                        "You can only read once if the data comes from a reader"
+                            .to_string(),
+                    ));
+                }
+            }
+            Source::Path(p) => {
+                AvroExec::try_from_path(p, opts, projection.clone(), batch_size, limit)?
+            }
+        };
+        Ok(Arc::new(exec))
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.statistics.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        TimestampMicrosecondArray,
+    };
+    use arrow::record_batch::RecordBatch;
+    use futures::StreamExt;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn read_small_batches() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = None;
+        let exec = table.scan(&projection, 2, &[], None)?;
+        let stream = exec.execute(0).await?;
+
+        let _ = stream
+            .map(|batch| {
+                let batch = batch.unwrap();
+                assert_eq!(11, batch.num_columns());
+                assert_eq!(2, batch.num_rows());
+            })
+            .fold(0, |acc, _| async move { acc + 1i32 })
+            .await;
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+
+        let x: Vec<String> = table
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect();
+        let y = x.join("\n");
+        assert_eq!(
+            "id: Int32\n\
+             bool_col: Boolean\n\
+             tinyint_col: Int32\n\
+             smallint_col: Int32\n\
+             int_col: Int32\n\
+             bigint_col: Int64\n\
+             float_col: Float32\n\
+             double_col: Float64\n\
+             date_string_col: Binary\n\
+             string_col: Binary\n\
+             timestamp_col: Timestamp(Microsecond, None)",
+            y
+        );
+
+        let projection = None;
+        let batch = get_first_batch(table, &projection).await?;
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(8, batch.num_rows());

Review comment:
       It is a matter of preference, but I personally prefer using the `assert_batches_eq!` macro for this kind of test (it is easy to update by copy/pasting test output). Here is an example:
   
   https://github.com/apache/arrow-datafusion/blob/60fa315dd82b040045d1d0797639bd0c17dd40ae/datafusion/tests/sql.rs#L163-L180
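
The idea behind that style of assertion can be sketched with the standard library alone: render the result as lines of an ASCII table and compare them with an expected list of lines, so a failing test prints output that can be pasted back into the test. `format_table` and `assert_lines_eq` below are hypothetical helpers for illustration, not DataFusion's API.

```rust
/// Hypothetical helper: render rows as an ASCII table, one `String` per line.
pub fn format_table(header: &[&str], rows: &[Vec<String>]) -> Vec<String> {
    // Render one row of cells, left-aligned and padded to the column widths.
    fn render(cells: &[&str], widths: &[usize]) -> String {
        let padded: Vec<String> = cells
            .iter()
            .zip(widths)
            .map(|(cell, width)| format!(" {:<w$} ", cell, w = *width))
            .collect();
        format!("|{}|", padded.join("|"))
    }

    // Column width = widest cell in that column, header included.
    let widths: Vec<usize> = header
        .iter()
        .enumerate()
        .map(|(i, h)| rows.iter().map(|r| r[i].len()).fold(h.len(), usize::max))
        .collect();

    let dashes: Vec<String> = widths.iter().map(|w| "-".repeat(*w + 2)).collect();
    let border = format!("+{}+", dashes.join("+"));

    let mut lines = vec![border.clone(), render(header, &widths), border.clone()];
    for row in rows {
        let cells: Vec<&str> = row.iter().map(String::as_str).collect();
        lines.push(render(&cells, &widths));
    }
    lines.push(border);
    lines
}

/// Hypothetical helper: compare expected and actual lines, printing both in
/// full on mismatch so the actual output is easy to copy into the test.
pub fn assert_lines_eq(expected: &[&str], actual: &[String]) {
    let expected: Vec<String> = expected.iter().map(|s| s.to_string()).collect();
    assert_eq!(
        expected.as_slice(),
        actual,
        "\n\nexpected:\n{:#?}\nactual:\n{:#?}\n",
        expected,
        actual
    );
}
```

Compared with separate assertions on column and row counts, a single table comparison also checks the values themselves.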
   

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -78,7 +78,7 @@ impl NthValue {
     }
 
     /// Create a new NTH_VALUE window aggregate function
-    pub fn nth_value(
+    pub fn value(

Review comment:
       Why this change? It seems unrelated to the rest of the PR, and it is not an obvious improvement to me.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -140,6 +141,7 @@ impl DFSchema {
                 return Ok(i);
             }
         }
+        println!("{}", name);

Review comment:
       Perhaps a stray debugging leftover?

##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned Avro files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+            Ok(Arc::new(Self {
+                source: Source::PartitionedFiles {
+                    filenames: filenames.clone(),
+                    path: path.clone(),
+                },
+                schema: self.schema.clone(),
+                projection: self.projection.clone(),
+                projected_schema: self.projected_schema.clone(),
+                batch_size: self.batch_size,
+                limit: self.limit,
+                file_extension: self.file_extension.clone(),
+            }))
+        } else {
+            Err(DataFusionError::Internal(
+                "AvroExec with reader source cannot be used with `with_new_children`"
+                    .to_string(),
+            ))
+        }
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        let mut builder = crate::avro_to_arrow::ReaderBuilder::new()
+            .with_schema(self.schema.clone())
+            .with_batch_size(self.batch_size);
+        if let Some(proj) = &self.projection {
+            builder = builder.with_projection(
+                proj.iter()
+                    .map(|col_idx| self.schema.field(*col_idx).name())
+                    .cloned()
+                    .collect(),
+            );
+        }
+        match &self.source {
+            Source::PartitionedFiles { filenames, .. } => {
+                let file = File::open(&filenames[partition])?;
+
+                Ok(Box::pin(AvroStream::new(builder.build(file)?, self.limit)))
+            }
+            Source::Reader(rdr) => {
+                if partition != 0 {
+                    Err(DataFusionError::Internal(
+                        "Only partition 0 is valid when CSV comes from a reader"

Review comment:
       ```suggestion
                           "Only partition 0 is valid when AVRO comes from a reader"
   ```

##########
File path: datafusion/src/avro_to_arrow/reader.rs
##########
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::record_batch::RecordBatch;
+use crate::avro_to_arrow::arrow_array_reader::AvroArrowArrayReader;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use avro_rs::Reader as AvroReader;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    /// Optional schema for the Avro file
+    ///
+    /// If the schema is not supplied, the reader will try to infer the schema
+    /// based on the Avro structure.
+    schema: Option<SchemaRef>,
+    /// Batch size (number of records to load each time)
+    ///
+    /// The default batch size when using the `ReaderBuilder` is 1024 records
+    batch_size: usize,
+    /// Optional projection for which columns to load (column names)
+    projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            batch_size: 1024,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+    /// Create a new builder for configuring Avro parsing options.
+    ///
+    /// To convert a builder into a reader, call `ReaderBuilder::build`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder, inferring the schema with the first 100 records
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().infer_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to infer the schema of the file
+    pub fn infer_schema(mut self) -> Self {
+        // remove any schema that is set
+        self.schema = None;
+        self
+    }
+
+    /// Set the batch size (number of records to load at one time)
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Set the reader's column projection
+    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    /// Create a new `Reader` from the `ReaderBuilder`
+    pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>>
+    where
+        R: Read + Seek,
+    {
+        let mut source = source;
+
+        // check if schema should be inferred
+        let schema = match self.schema {
+            Some(schema) => schema,
+            None => Arc::new(infer_avro_schema_from_reader(&mut source)?),
+        };
+        source.seek(SeekFrom::Start(0))?;
+        Reader::try_new(source, schema, self.batch_size, self.projection)
+    }
+}
+
+/// Avro file record  reader
+pub struct Reader<'a, R: Read> {
+    array_reader: AvroArrowArrayReader<'a, R>,
+    schema: SchemaRef,
+    batch_size: usize,
+}
+
+impl<'a, R: Read> Reader<'a, R> {
+    /// Create a new Avro Reader from any value that implements the `Read` trait.
+    ///
+    /// If reading a `File`, you can customise the Reader, such as to enable schema
+    /// inference, use `ReaderBuilder`.
+    pub fn try_new(
+        reader: R,
+        schema: SchemaRef,
+        batch_size: usize,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        Ok(Self {
+            array_reader: AvroArrowArrayReader::try_new(
+                AvroReader::new(reader)?,
+                schema.clone(),
+                projection,
+            )?,
+            schema,
+            batch_size,
+        })
+    }
+
+    /// Returns the schema of the reader, useful for getting the schema without reading
+    /// record batches
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
+        self.array_reader.next_batch(self.batch_size)
+    }
+}
+
+impl<'a, R: Read> Iterator for Reader<'a, R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next().transpose()
+    }
+}
+
+/// Infer Avro schema given a reader
+pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
+    let avro_reader = avro_rs::Reader::new(reader)?;
+    let schema = avro_reader.writer_schema();
+    super::to_arrow_schema(schema)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::*;
+    use crate::arrow::datatypes::{DataType, Field};
+    use arrow::datatypes::TimeUnit;
+    use std::fs::File;
+
+    fn build_reader(name: &str) -> Reader<File> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/{}", testdata, name);
+        let builder = ReaderBuilder::new().infer_schema().with_batch_size(64);
+        builder.build(File::open(filename).unwrap()).unwrap()
+    }
+
+    fn get_col<'a, T: 'static>(
+        batch: &'a RecordBatch,
+        col: (usize, &Field),
+    ) -> Option<&'a T> {
+        batch.column(col.0).as_any().downcast_ref::<T>()
+    }
+
+    #[test]
+    fn test_avro_basic() {
+        let mut reader = build_reader("alltypes_dictionary.avro");
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(2, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let id = schema.column_with_name("id").unwrap();
+        assert_eq!(0, id.0);
+        assert_eq!(&DataType::Int32, id.1.data_type());
+        let col = get_col::<Int32Array>(&batch, id).unwrap();

Review comment:
       I personally prefer the `assert_batches_eq!` approach for comparing the output of record batches, but this works too
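
For illustration, the idea behind that style of test is to render the whole batch as text and compare it line by line against an expected table, instead of downcasting and checking each column. The block below is only a std-only sketch of that idea: `format_table` and `assert_table_eq` are hypothetical helpers, not DataFusion's macro, and the column names and values are made up rather than taken from `alltypes_dictionary.avro`.

```rust
/// Pad each cell to its column width and join the cells with `|`.
fn render_row(cells: &[&str], widths: &[usize]) -> String {
    let padded: Vec<String> = cells
        .iter()
        .zip(widths)
        .map(|(cell, width)| format!(" {:<w$} ", cell, w = *width))
        .collect();
    format!("|{}|", padded.join("|"))
}

/// Render a header and string rows as a bordered text table, one line per entry.
/// Assumes every row has exactly as many cells as the header.
fn format_table(header: &[&str], rows: &[Vec<String>]) -> Vec<String> {
    // Each column is as wide as its longest cell, header included.
    let mut widths: Vec<usize> = header.iter().map(|h| h.len()).collect();
    for row in rows {
        for (i, cell) in row.iter().enumerate() {
            widths[i] = widths[i].max(cell.len());
        }
    }
    let border = format!(
        "+{}+",
        widths
            .iter()
            .map(|w| "-".repeat(w + 2))
            .collect::<Vec<_>>()
            .join("+")
    );
    let mut lines = vec![border.clone(), render_row(header, &widths), border.clone()];
    for row in rows {
        let cells: Vec<&str> = row.iter().map(String::as_str).collect();
        lines.push(render_row(&cells, &widths));
    }
    lines.push(border);
    lines
}

/// Compare the rendered table with the expected lines and print both on failure.
fn assert_table_eq(expected: &[&str], header: &[&str], rows: &[Vec<String>]) {
    let actual = format_table(header, rows);
    assert_eq!(
        expected,
        actual.as_slice(),
        "\n\nexpected:\n{}\n\nactual:\n{}\n",
        expected.join("\n"),
        actual.join("\n")
    );
}
```

A failing comparison then prints both tables in full, which tends to be easier to read in CI logs than a single mismatched cell.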

##########
File path: datafusion/src/avro_to_arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1093 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{
+    make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef,
+    BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait,
+    PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
+    StringDictionaryBuilder,
+};
+use crate::arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::datatypes::{
+    ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
+    Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
+    Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
+    UInt8Type,
+};
+use crate::arrow::error::ArrowError;
+use crate::arrow::record_batch::RecordBatch;
+use crate::arrow::util::bit_util;
+use crate::error::{DataFusionError, Result};
+use arrow::array::{BinaryArray, GenericListArray};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError::SchemaError;
+use arrow::error::Result as ArrowResult;
+use avro_rs::schema::Schema as AvroSchema;
+use avro_rs::schema::SchemaKind;
+use avro_rs::types::Value;
+use avro_rs::{AvroResult, Reader as AvroReader};
+use num_traits::NumCast;
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+type RecordSlice<'a> = &'a [Vec<(String, Value)>];
+
+pub struct AvroArrowArrayReader<'a, R: Read> {
+    reader: AvroReader<'a, R>,
+    schema: SchemaRef,
+    projection: Option<Vec<String>>,
+    schema_lookup: HashMap<String, usize>,
+}
+
+impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
+    pub fn try_new(
+        reader: AvroReader<'a, R>,
+        schema: SchemaRef,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        let writer_schema = reader.writer_schema().clone();
+        let schema_lookup = Self::schema_lookup(writer_schema)?;
+        Ok(Self {
+            reader,
+            schema,
+            projection,
+            schema_lookup,
+        })
+    }
+
+    pub fn schema_lookup(schema: AvroSchema) -> Result<HashMap<String, usize>> {
+        match schema {
+            AvroSchema::Record {
+                lookup: ref schema_lookup,
+                ..
+            } => Ok(schema_lookup.clone()),
+            _ => Err(DataFusionError::ArrowError(SchemaError(
+                "expected avro schema to be a record".to_string(),
+            ))),
+        }
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult<Option<RecordBatch>> {
+        let mut rows = Vec::with_capacity(batch_size);
+        for value in self.reader.by_ref().take(batch_size) {
+            let v = value.map_err(|e| {
+                ArrowError::ParseError(format!("Failed to parse avro value: {:?}", e))
+            })?;
+            match v {
+                Value::Record(v) => {
+                    rows.push(v);
+                }
+                other => {
+                    return Err(ArrowError::ParseError(format!(
+                        "Row needs to be of type object, got: {:?}",
+                        other
+                    )))
+                }
+            }
+        }
+        if rows.is_empty() {
+            // reached end of file
+            return Ok(None);
+        }
+        let rows = &rows[..];
+        let projection = self.projection.clone().unwrap_or_else(Vec::new);
+        let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
+        let projected_fields: Vec<Field> = if projection.is_empty() {
+            self.schema.fields().to_vec()
+        } else {
+            projection
+                .iter()
+                .map(|name| self.schema.column_with_name(name))
+                .flatten()
+                .map(|(_, field)| field.clone())
+                .collect()
+        };
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+    }
+
+    fn build_boolean_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef> {
+        let mut builder = BooleanBuilder::new(rows.len());
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                if let Some(boolean) = resolve_boolean(&value) {
+                    builder.append_value(boolean)?
+                } else {
+                    builder.append_null()?;
+                }
+            } else {
+                builder.append_null()?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_primitive_array<T: ArrowPrimitiveType + Resolver>(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef>
+    where
+        T: ArrowNumericType,
+        T::Native: num_traits::cast::NumCast,
+    {
+        Ok(Arc::new(
+            rows.iter()
+                .map(|row| {
+                    self.field_lookup(col_name, row)
+                        .and_then(|value| resolve_item::<T>(&value))
+                })
+                .collect::<PrimitiveArray<T>>(),
+        ))
+    }
+
+    #[inline(always)]
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_string_dictionary_builder<T>(
+        &self,
+        row_len: usize,
+    ) -> ArrowResult<StringDictionaryBuilder<T>>
+    where
+        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let key_builder = PrimitiveBuilder::<T>::new(row_len);
+        let values_builder = StringBuilder::new(row_len * 5);
+        Ok(StringDictionaryBuilder::new(key_builder, values_builder))
+    }
+
+    fn build_wrapped_list_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+        key_type: &DataType,
+    ) -> ArrowResult<ArrayRef> {
+        match *key_type {
+            DataType::Int8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int8Type>(&dtype, col_name, rows)
+            }
+            DataType::Int16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int16Type>(&dtype, col_name, rows)
+            }
+            DataType::Int32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int32Type>(&dtype, col_name, rows)
+            }
+            DataType::Int64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int64Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt8Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
+            }
+            ref e => Err(SchemaError(format!(
+                "Data type is currently not supported for dictionaries in list : {:?}",
+                e
+            ))),
+        }
+    }
+
+    #[inline(always)]
+    fn list_array_string_array_builder<D>(
+        &self,
+        data_type: &DataType,
+        col_name: &str,
+        rows: RecordSlice,
+    ) -> ArrowResult<ArrayRef>
+    where
+        D: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let mut builder: Box<dyn ArrayBuilder> = match data_type {
+            DataType::Utf8 => {
+                let values_builder = StringBuilder::new(rows.len() * 5);
+                Box::new(ListBuilder::new(values_builder))
+            }
+            DataType::Dictionary(_, _) => {
+                let values_builder =
+                    self.build_string_dictionary_builder::<D>(rows.len() * 5)?;
+                Box::new(ListBuilder::new(values_builder))
+            }
+            e => {
+                return Err(SchemaError(format!(
+                    "Nested list data builder type is not supported: {:?}",
+                    e
+                )))
+            }
+        };
+
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                // value can be an array or a scalar
+                let vals: Vec<Option<String>> = if let Value::String(v) = value {
+                    vec![Some(v.to_string())]
+                } else if let Value::Array(n) = value {
+                    n.into_iter()
+                        .map(|v| {
+                            resolve_string(&v)
+                            // else if matches!(

Review comment:
       perhaps this should return an error?
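
One possible reading of this suggestion, sketched with std only: a non-string, non-null element inside the list is reported as an error instead of being turned into a null. The `Value` enum below is a hypothetical stand-in for a few variants of `avro_rs::types::Value`, and `resolve_string_strict` and `resolve_string_list` are illustrative names that do not appear in the PR.

```rust
/// Hypothetical stand-in for a few variants of `avro_rs::types::Value`.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    String(String),
    Long(i64),
    Array(Vec<Value>),
}

/// Strict resolution: nulls stay nulls, strings pass through,
/// and any other variant is reported as an error.
fn resolve_string_strict(value: &Value) -> Result<Option<String>, String> {
    match value {
        Value::Null => Ok(None),
        Value::String(s) => Ok(Some(s.clone())),
        other => Err(format!("expected a string or null, got: {:?}", other)),
    }
}

/// Accept either a scalar string or an array of strings.
/// Collecting into `Result` stops at the first offending element.
fn resolve_string_list(value: &Value) -> Result<Vec<Option<String>>, String> {
    match value {
        Value::String(s) => Ok(vec![Some(s.clone())]),
        Value::Array(items) => items.iter().map(resolve_string_strict).collect(),
        other => Err(format!("expected a string or an array, got: {:?}", other)),
    }
}
```

The trade-off is that a single malformed row fails the whole batch, whereas the lenient version keeps reading and leaves a null in its place.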

##########
File path: datafusion/Cargo.toml
##########
@@ -69,6 +69,8 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
+avro-rs = { version = "0.13", features = ["snappy"] }

Review comment:
       As the datafusion codebase already has a non-trivial number of dependencies I would personally prefer we did not add additional required ones.
   
   What would you think about making
   
   ```suggestion
   avro-rs = { version = "0.13", features = ["snappy"], optional=true }
   ```
   
   And then document it as a crate `feature` -- e.g. like `regex_expressions` (but not a default feature)?
   
   I think most of the rest of the code in this PR could be left the same and only the part that interacts with `avro-rs` could be `#[cfg(...)]` out
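
A minimal sketch of what that gating could look like on the Rust side, assuming the optional dependency is exposed through a Cargo feature hypothetically named `avro`. The function `read_avro_schema` and its placeholder body are illustrative only and are not part of this PR; a real implementation would call into `avro-rs` in the gated branch.

```rust
/// Compiled only when the (hypothetical) `avro` feature is enabled.
#[cfg(feature = "avro")]
pub fn read_avro_schema(path: &str) -> Result<String, String> {
    // Placeholder body: the real code would open the file with avro-rs here.
    Ok(format!("schema read from {}", path))
}

/// Fallback compiled when the feature is disabled, so callers still build
/// and get a clear runtime error instead of a missing symbol.
#[cfg(not(feature = "avro"))]
pub fn read_avro_schema(path: &str) -> Result<String, String> {
    Err(format!(
        "cannot read {}: built without the `avro` feature",
        path
    ))
}

/// Report at runtime whether Avro support was compiled in.
pub fn avro_enabled() -> bool {
    cfg!(feature = "avro")
}
```

Keeping a fallback with the same signature means the table provider and SQL layers need no `#[cfg]` of their own; only the code that touches `avro-rs` is conditional.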

##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned CSV files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(

Review comment:
       This code seems to support creating a table backed by multiple avro files (as is supported by the csv and parquet readers), but I don't see a test for that functionality anywhere.
   
   Maybe you could have a test in sql.rs that referred to the same test files twice and ensured we got back two copies of the data

##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned CSV files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+            Ok(Arc::new(Self {
+                source: Source::PartitionedFiles {
+                    filenames: filenames.clone(),
+                    path: path.clone(),
+                },
+                schema: self.schema.clone(),
+                projection: self.projection.clone(),
+                projected_schema: self.projected_schema.clone(),
+                batch_size: self.batch_size,
+                limit: self.limit,
+                file_extension: self.file_extension.clone(),
+            }))
+        } else {
+            Err(DataFusionError::Internal(
+                "AvroExec with reader source cannot be used with `with_new_children`"
+                    .to_string(),
+            ))
+        }
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        let mut builder = crate::avro_to_arrow::ReaderBuilder::new()
+            .with_schema(self.schema.clone())
+            .with_batch_size(self.batch_size);
+        if let Some(proj) = &self.projection {
+            builder = builder.with_projection(
+                proj.iter()
+                    .map(|col_idx| self.schema.field(*col_idx).name())
+                    .cloned()
+                    .collect(),
+            );
+        }
+        match &self.source {
+            Source::PartitionedFiles { filenames, .. } => {
+                let file = File::open(&filenames[partition])?;
+
+                Ok(Box::pin(AvroStream::new(builder.build(file)?, self.limit)))
+            }
+            Source::Reader(rdr) => {
+                if partition != 0 {
+                    Err(DataFusionError::Internal(
+                        "Only partition 0 is valid when CSV comes from a reader"
+                            .to_string(),
+                    ))
+                } else if let Some(rdr) = rdr.lock().unwrap().take() {
+                    Ok(Box::pin(AvroStream::new(builder.build(rdr)?, self.limit)))
+                } else {
+                    Err(DataFusionError::Execution(
+                        "Error reading CSV: Data can only be read a single time when the source is a reader"

Review comment:
       ```suggestion
                           "Error reading AVRO: Data can only be read a single time when the source is a reader"
       ```







[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917612781


   Test failure seems unrelated to PR, not sure what to do here





[GitHub] [arrow-datafusion] alamb commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-907770080


   > I have a question on the avro support, don't know if it's been asked/addressed yet. Would it be better to move the reader to arrow-rs, so that a writer could also be implemented there?
   
   @nevi-me I think having a reader in arrow-rs is definitely something to consider (I can see other projects wanting to read Avro files into Arrow arrays). Given this PR is already fairly large, I suggest we get it in as is, and if there is interest in moving the Avro reader to arrow-rs, we do it as a follow-on PR. I filed https://github.com/apache/arrow-rs/issues/727 to track this idea.





[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917637560


   Cargo test uses the test profile by default, which builds in debug mode.
   
   I think the combination of the test runner, the Tokio runtime, and DataFusion's current design of recursing into the execution plan and expressions increases the use of stack space.
   Running the tests without optimizations increases stack usage further, as almost no optimizations are applied in that case.
   
   Some improvements could be made, such as using an explicit stack on the heap (e.g. for the `evaluate` function) and redesigning DataFusion's execution model to limit use of the call stack.
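
The explicit-stack idea can be sketched roughly as follows. This is only an illustration under stated assumptions: `Expr`, `Task`, and `evaluate_iterative` are hypothetical names for a toy integer expression tree, not DataFusion's `PhysicalExpr` or its `evaluate` method. Pending work lives in a heap-allocated `Vec`, so the depth of the expression no longer translates into call-stack depth.

```rust
/// Toy expression tree (hypothetical; not DataFusion's `PhysicalExpr`).
enum Expr {
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
}

/// Work items stored in a heap-allocated `Vec` instead of call-stack frames.
enum Task<'a> {
    /// Evaluate this sub-expression and push its value.
    Visit(&'a Expr),
    /// Pop two values, push their sum.
    ApplyAdd,
    /// Pop two values, push their product.
    ApplyMul,
}

/// Evaluates `root` without recursion, using two explicit stacks.
fn evaluate_iterative(root: &Expr) -> i64 {
    let mut tasks = vec![Task::Visit(root)];
    let mut values: Vec<i64> = Vec::new();

    while let Some(task) = tasks.pop() {
        match task {
            Task::Visit(expr) => match expr {
                Expr::Literal(v) => values.push(*v),
                Expr::Add(l, r) => {
                    // Schedule the operator first so it runs after both operands.
                    tasks.push(Task::ApplyAdd);
                    tasks.push(Task::Visit(&**r));
                    tasks.push(Task::Visit(&**l));
                }
                Expr::Mul(l, r) => {
                    tasks.push(Task::ApplyMul);
                    tasks.push(Task::Visit(&**r));
                    tasks.push(Task::Visit(&**l));
                }
            },
            Task::ApplyAdd => {
                let r = values.pop().expect("missing right operand");
                let l = values.pop().expect("missing left operand");
                values.push(l + r);
            }
            Task::ApplyMul => {
                let r = values.pop().expect("missing right operand");
                let l = values.pop().expect("missing left operand");
                values.push(l * r);
            }
        }
    }

    values.pop().expect("expression produced no value")
}
```

With this shape, `(1 + 2) * 3 + 4` evaluates in a single loop. Note that the toy tree is still dropped recursively by default, so a real implementation would need to handle that separately.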





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701243226



##########
File path: datafusion/src/physical_plan/datetime_expressions.rs
##########
@@ -236,14 +236,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();
 
     Ok(match array {
-        ColumnarValue::Scalar(scalar) => {

Review comment:
       @nevi-me This is another clippy issue in nightly







[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701260232



##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,
+        AvroSchema::Date => DataType::Date32,
+        AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+        AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+        AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
+        AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
+        AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+    };
+
+    let data_type = field_type.clone();
+    let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+    let mut field = Field::new(name, field_type, nullable);
+    field.set_metadata(props.cloned());
+    Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+    match dt {
+        DataType::Null => "null",
+        DataType::Boolean => "bit",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::UInt8 => "uint1",
+        DataType::UInt16 => "uint2",
+        DataType::UInt32 => "uint4",
+        DataType::UInt64 => "uint8",
+        DataType::Float16 => "float2",
+        DataType::Float32 => "float4",
+        DataType::Float64 => "float8",
+        DataType::Date32 => "dateday",
+        DataType::Date64 => "datemilli",
+        DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+            TimeUnit::Second => "timesec",
+            TimeUnit::Millisecond => "timemilli",
+            TimeUnit::Microsecond => "timemicro",
+            TimeUnit::Nanosecond => "timenano",
+        },
+        DataType::Timestamp(tu, tz) => {
+            if tz.is_some() {
+                match tu {
+                    TimeUnit::Second => "timestampsectz",
+                    TimeUnit::Millisecond => "timestampmillitz",
+                    TimeUnit::Microsecond => "timestampmicrotz",
+                    TimeUnit::Nanosecond => "timestampnanotz",
+                }
+            } else {
+                match tu {
+                    TimeUnit::Second => "timestampsec",
+                    TimeUnit::Millisecond => "timestampmilli",
+                    TimeUnit::Microsecond => "timestampmicro",
+                    TimeUnit::Nanosecond => "timestampnano",
+                }
+            }
+        }
+        DataType::Duration(_) => "duration",
+        DataType::Interval(unit) => match unit {
+            IntervalUnit::YearMonth => "intervalyear",
+            IntervalUnit::DayTime => "intervalmonth",
+        },
+        DataType::Binary => "varbinary",
+        DataType::FixedSizeBinary(_) => "fixedsizebinary",
+        DataType::LargeBinary => "largevarbinary",
+        DataType::Utf8 => "varchar",
+        DataType::LargeUtf8 => "largevarchar",
+        DataType::List(_) => "list",
+        DataType::FixedSizeList(_, _) => "fixed_size_list",
+        DataType::LargeList(_) => "largelist",
+        DataType::Struct(_) => "struct",
+        DataType::Union(_) => "union",
+        DataType::Dictionary(_, _) => "map",
+        DataType::Decimal(_, _) => "decimal",
+    }
+}
+
+fn index_type(len: usize) -> DataType {
+    if len <= usize::from(u8::MAX) {
+        DataType::Int8
+    } else if len <= usize::from(u16::MAX) {
+        DataType::Int16
+    } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+        DataType::Int32
+    } else {
+        DataType::Int64
+    }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+    let mut props = BTreeMap::new();
+    match &schema {
+        AvroSchema::Record {
+            doc: Some(ref doc), ..
+        }
+        | AvroSchema::Enum {
+            doc: Some(ref doc), ..
+        } => {
+            props.insert("doc".to_string(), doc.clone());

Review comment:
       Record fields can have individual documentation; Enums and Records can have their own doc; Enum, Record, and Fixed can have aliases.







[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-908379864


   rebased





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917640243


   I totally agree, was going to write this. It's better to avoid recursion and walk a tree iteratively instead.





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917629551


   ```
   (lldb) run
   Process 349706 launched: '/home/geps/dev/oss/arrow-datafusion/target/debug/deps/tpch-f2ea9cecad4b495e' (x86_64)
   
   running 44 tests
   test tests::q1 ... ok
   test tests::q10 ... ok
   test tests::q13 ... ok
   test tests::q11 ... ok
   test tests::q14 ... ok
   test tests::q12 ... ok
   test tests::q16 ... ok
   test tests::q15 ... ok
   test tests::q17 ... ok
   test tests::q18 ... ok
   test tests::q19 ... ok
   test tests::q2 ... ok
   test tests::q20 ... ok
   test tests::q21 ... ok
   test tests::q3 ... ok
   test tests::q22 ... ok
   test tests::q4 ... ok
   test tests::q5 ... ok
   test tests::q7 ... ok
   test tests::q6 ... ok
   test tests::q8 ... ok
   test tests::q9 ... ok
   test tests::ballista_round_trip::q6 ... ok
   Process 349706 stopped
   * thread #24, name = 'tokio-runtime-w', stop reason = signal SIGSEGV: address access protected (fault address: 0x7ffff57f9870)
       frame #0: 0x0000555558cfa6b1 tpch-f2ea9cecad4b495e`__rust_probestack + 23
   tpch-f2ea9cecad4b495e`__rust_probestack:
   ->  0x555558cfa6b1 <+23>: testq  %rsp, 0x8(%rsp)
       0x555558cfa6b6 <+28>: subq   $0x1000, %r11             ; imm = 0x1000 
       0x555558cfa6bd <+35>: cmpq   $0x1000, %r11             ; imm = 0x1000 
       0x555558cfa6c4 <+42>: ja     0x555558cfa6aa            ; <+16>
   (lldb) bt
   * thread #24, name = 'tokio-runtime-w', stop reason = signal SIGSEGV: address access protected (fault address: 0x7ffff57f9870)
     * frame #0: 0x0000555558cfa6b1 tpch-f2ea9cecad4b495e`__rust_probestack + 23
       frame #1: 0x00005555572ebe5a tpch-f2ea9cecad4b495e`_$LT$datafusion..physical_plan..expressions..binary..BinaryExpr$u20$as$u20$datafusion..physical_plan..PhysicalExpr$GT$::evaluate::hb8ffd80410333eeb + 10
       frame #2: 0x00005555572ec167 tpch-f2ea9cecad4b495e`_$LT$datafusion..physical_plan..expressions..binary..BinaryExpr$u20$as$u20$datafusion..physical_plan..PhysicalExpr$GT$::evaluate::hb8ffd80410333eeb + 791
       frame #3: 0x00005555572ec029 tpch-f2ea9cecad4b495e`_$LT$datafusion..physical_plan..expressions..binary..BinaryExpr$u20$as$u20$datafusion..physical_plan..PhysicalExpr$GT$::evaluate::hb8ffd80410333eeb + 473
       frame #4: 0x00005555572ec167 tpch-f2ea9cecad4b495e`_$LT$datafusion..physical_plan..expressions..binary..BinaryExpr$u20$as$u20$datafusion..physical_plan..PhysicalExpr$GT$::evaluate::hb8ffd80410333eeb + 791
       frame #5: 0x00005555572ec029 tpch-f2ea9cecad4b495e`_$LT$datafusion..physical_plan..expressions..binary..BinaryExpr$u20$as$u20$datafusion..physical_plan..PhysicalExpr$GT$::evaluate::hb8ffd80410333eeb + 473
       frame #6: 0x0000555556cd9275 tpch-f2ea9cecad4b495e`datafusion::physical_plan::filter::batch_filter::h2931bae7e18f7b02 + 85
       frame #7: 0x000055555668ca1d tpch-f2ea9cecad4b495e`_$LT$datafusion..physical_plan..filter..FilterExecStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::_$u7b$$u7b$closure$u7d$$u7d$::h5320c300c001a1a9 + 381
       frame #8: 0x0000555556efe896 tpch-f2ea9cecad4b495e`core::task::poll::Poll$LT$T$GT$::map::hae20e84bf478b027 + 166
       frame #9: 0x0000555556cd9359 tpch-f2ea9cecad4b495e`_$LT$datafusion..physical_plan..filter..FilterExecStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::ha1bd18f41b2aeaeb + 121
       frame #10: 0x0000555556818519 tpch-f2ea9cecad4b495e`_$LT$core..pin..Pin$LT$P$GT$$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::h6b8322acd76ed2f3 + 121
       frame #11: 0x0000555556814039 tpch-f2ea9cecad4b495e`futures_util::stream::stream::StreamExt::poll_next_unpin::hc246907f8724c84f + 73
       frame #12: 0x00005555567be4f7 tpch-f2ea9cecad4b495e`_$LT$datafusion..physical_plan..coalesce_batches..CoalesceBatchesStream$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::hec3849e5708ab413 + 199
       frame #13: 0x0000555556818519 tpch-f2ea9cecad4b495e`_$LT$core..pin..Pin$LT$P$GT$$u20$as$u20$futures_core..stream..Stream$GT$::poll_next::h6b8322acd76ed2f3 + 121
       frame #14: 0x0000555556814039 tpch-f2ea9cecad4b495e`futures_util::stream::stream::StreamExt::poll_next_unpin::hc246907f8724c84f + 73
       frame #15: 0x000055555668ab65 tpch-f2ea9cecad4b495e`_$LT$futures_util..stream..stream..next..Next$LT$St$GT$$u20$as$u20$core..future..future..Future$GT$::poll::h43dc0a231349b70c + 69
       frame #16: 0x0000555556c38170 tpch-f2ea9cecad4b495e`datafusion::physical_plan::hash_aggregate::compute_hash_aggregate::_$u7b$$u7b$closure$u7d$$u7d$::ha133b45ff83b5316 + 1648
       frame #17: 0x00005555571fc055 tpch-f2ea9cecad4b495e`_$LT$core..future..from_generator..GenFuture$LT$T$GT$$u20$as$u20$core..future..future..Future$GT$::poll::hd64cd23280783697 + 133
       frame #18: 0x0000555556c38d3f tpch-f2ea9cecad4b495e`datafusion::physical_plan::hash_aggregate::HashAggregateStream::new::_$u7b$$u7b$closure$u7d$$u7d$::hfcfb865fc32d4af0 + 479
       frame #19: 0x00005555571f9c24 tpch-f2ea9cecad4b495e`_$LT$core..future..from_generator..GenFuture$LT$T$GT$$u20$as$u20$core..future..future..Future$GT$::poll::h2eb24a5c9faf5544 + 132
       frame #20: 0x0000555556920cbe tpch-f2ea9cecad4b495e`tokio::runtime::task::core::CoreStage$LT$T$GT$::poll::_$u7b$$u7b$closure$u7d$$u7d$::h3fc037b970535b0d + 270
       frame #21: 0x0000555557053946 tpch-f2ea9cecad4b495e`tokio::loom::std::unsafe_cell::UnsafeCell$LT$T$GT$::with_mut::hf353d404048ecbc0 + 86
       frame #22: 0x0000555556920295 tpch-f2ea9cecad4b495e`tokio::runtime::task::core::CoreStage$LT$T$GT$::poll::h8ac0726a76622c6c + 53
       frame #23: 0x00005555568f55bb tpch-f2ea9cecad4b495e`tokio::runtime::task::harness::poll_future::_$u7b$$u7b$closure$u7d$$u7d$::h47c8c22e7e12cb4e + 59
       frame #24: 0x00005555571bd77c tpch-f2ea9cecad4b495e`_$LT$core..panic..unwind_safe..AssertUnwindSafe$LT$F$GT$$u20$as$u20$core..ops..function..FnOnce$LT$$LP$$RP$$GT$$GT$::call_once::h5592876ff7ee44ca + 28
       frame #25: 0x0000555556fc45ff tpch-f2ea9cecad4b495e`std::panicking::try::do_call::hed76e74d7288a682 + 79
       frame #26: 0x0000555556fc9b0b tpch-f2ea9cecad4b495e`__rust_try + 27
       frame #27: 0x0000555556fbed7f tpch-f2ea9cecad4b495e`std::panicking::try::ha2530087e54fd077 + 143
       frame #28: 0x000055555691431b tpch-f2ea9cecad4b495e`std::panic::catch_unwind::h7c6668a4704e2556 + 27
       frame #29: 0x00005555568f433c tpch-f2ea9cecad4b495e`tokio::runtime::task::harness::poll_future::h7e06bd0ab8459b4f + 92
       frame #30: 0x00005555568f9244 tpch-f2ea9cecad4b495e`tokio::runtime::task::harness::Harness$LT$T$C$S$GT$::poll_inner::h9ff3a836c0bd363f + 260
       frame #31: 0x000055555690a174 tpch-f2ea9cecad4b495e`tokio::runtime::task::harness::Harness$LT$T$C$S$GT$::poll::hb6e99b3200023f33 + 20
       frame #32: 0x0000555556925052 tpch-f2ea9cecad4b495e`tokio::runtime::task::raw::poll::he5ab3ab29179160a + 34
       frame #33: 0x0000555557dc990f tpch-f2ea9cecad4b495e`tokio::runtime::task::raw::RawTask::poll::h39bb74e855f2bb5a + 47
       frame #34: 0x0000555557d8f262 tpch-f2ea9cecad4b495e`tokio::runtime::task::LocalNotified$LT$S$GT$::run::hd0105087dcdd2fa5 + 34
       frame #35: 0x0000555557de9362 tpch-f2ea9cecad4b495e`tokio::runtime::thread_pool::worker::Context::run_task::_$u7b$$u7b$closure$u7d$$u7d$::hb21a1ce4bfebfa0d + 66
       frame #36: 0x0000555557da0dbd tpch-f2ea9cecad4b495e`tokio::coop::with_budget::_$u7b$$u7b$closure$u7d$$u7d$::ha64bc7dc29ca1c6d + 205
       frame #37: 0x0000555557de137b tpch-f2ea9cecad4b495e`std::thread::local::LocalKey$LT$T$GT$::try_with::h17cad570de6d35e8 + 219
       frame #38: 0x0000555557de0dce tpch-f2ea9cecad4b495e`std::thread::local::LocalKey$LT$T$GT$::with::hfcc045bf7e8c2826 + 46
       frame #39: 0x0000555557de926b tpch-f2ea9cecad4b495e`tokio::runtime::thread_pool::worker::Context::run_task::h01faa3a5c018378e + 891
       frame #40: 0x0000555557de8b46 tpch-f2ea9cecad4b495e`tokio::runtime::thread_pool::worker::Context::run::h3e60facf9904afd7 + 374
       frame #41: 0x0000555557de8943 tpch-f2ea9cecad4b495e`tokio::runtime::thread_pool::worker::run::_$u7b$$u7b$closure$u7d$$u7d$::hfeb3f6ee7e351412 + 19
       frame #42: 0x0000555557df9977 tpch-f2ea9cecad4b495e`tokio::macros::scoped_tls::ScopedKey$LT$T$GT$::set::h9cf62dde3fd93b3c + 119
       frame #43: 0x0000555557de886f tpch-f2ea9cecad4b495e`tokio::runtime::thread_pool::worker::run::h7510bc0782a07caf + 383
       frame #44: 0x0000555557de86eb tpch-f2ea9cecad4b495e`tokio::runtime::thread_pool::worker::Launch::launch::_$u7b$$u7b$closure$u7d$$u7d$::h6f8716c2ddf69fed + 11
       frame #45: 0x0000555557d9a20a tpch-f2ea9cecad4b495e`_$LT$tokio..runtime..blocking..task..BlockingTask$LT$T$GT$$u20$as$u20$core..future..future..Future$GT$::poll::hc6b9526f6672be24 + 122
       frame #46: 0x0000555557dc8ed1 tpch-f2ea9cecad4b495e`tokio::runtime::task::core::CoreStage$LT$T$GT$::poll::_$u7b$$u7b$closure$u7d$$u7d$::hbe1e499f7860e87f + 257
       frame #47: 0x0000555557de5c45 tpch-f2ea9cecad4b495e`tokio::loom::std::unsafe_cell::UnsafeCell$LT$T$GT$::with_mut::he2847e1f3c127ef4 + 69
       frame #48: 0x0000555557dc8c48 tpch-f2ea9cecad4b495e`tokio::runtime::task::core::CoreStage$LT$T$GT$::poll::hfc738b56769578a4 + 40
       frame #49: 0x0000555557ddf36e tpch-f2ea9cecad4b495e`tokio::runtime::task::harness::poll_future::_$u7b$$u7b$closure$u7d$$u7d$::hb1076201b62a973a + 46
       frame #50: 0x0000555557d9d9a3 tpch-f2ea9cecad4b495e`_$LT$core..panic..unwind_safe..AssertUnwindSafe$LT$F$GT$$u20$as$u20$core..ops..function..FnOnce$LT$$LP$$RP$$GT$$GT$::call_once::hb12353242aa56739 + 19
       frame #51: 0x0000555557dcdaa2 tpch-f2ea9cecad4b495e`std::panicking::try::do_call::h318201706e1b68bf + 66
       frame #52: 0x0000555557dce47b tpch-f2ea9cecad4b495e`__rust_try + 27
       frame #53: 0x0000555557dcd36a tpch-f2ea9cecad4b495e`std::panicking::try::h85fa9d8d5048a8c2 + 122
       frame #54: 0x0000555557de053b tpch-f2ea9cecad4b495e`std::panic::catch_unwind::h2fd101cfd96d457f + 27
       frame #55: 0x0000555557ddeefc tpch-f2ea9cecad4b495e`tokio::runtime::task::harness::poll_future::h8341970483244e5a + 92
       frame #56: 0x0000555557ddd44c tpch-f2ea9cecad4b495e`tokio::runtime::task::harness::Harness$LT$T$C$S$GT$::poll_inner::hbd02c615e7de37cf + 252
       frame #57: 0x0000555557ddd0f4 tpch-f2ea9cecad4b495e`tokio::runtime::task::harness::Harness$LT$T$C$S$GT$::poll::ha9752d47ac2cf040 + 20
       frame #58: 0x0000555557dc9a62 tpch-f2ea9cecad4b495e`tokio::runtime::task::raw::poll::ha581e4009ba7798c + 34
       frame #59: 0x0000555557dc990f tpch-f2ea9cecad4b495e`tokio::runtime::task::raw::RawTask::poll::h39bb74e855f2bb5a + 47
       frame #60: 0x0000555557d8f314 tpch-f2ea9cecad4b495e`tokio::runtime::task::UnownedTask$LT$S$GT$::run::ha99f5d9c67a2de92 + 52
       frame #61: 0x0000555557dbc627 tpch-f2ea9cecad4b495e`tokio::runtime::blocking::pool::Inner::run::hf5d495e407c88d15 + 2791
       frame #62: 0x0000555557dbbaac tpch-f2ea9cecad4b495e`tokio::runtime::blocking::pool::Spawner::spawn_thread::_$u7b$$u7b$closure$u7d$$u7d$::h8098f20a7ba1cba0 + 140
       frame #63: 0x0000555557d9da4c tpch-f2ea9cecad4b495e`std::sys_common::backtrace::__rust_begin_short_backtrace::h2cb10f2dfbb829a7 + 28
       frame #64: 0x0000555557dbec21 tpch-f2ea9cecad4b495e`std::thread::Builder::spawn_unchecked::_$u7b$$u7b$closure$u7d$$u7d$::_$u7b$$u7b$closure$u7d$$u7d$::h04d638576b74652d + 33
       frame #65: 0x0000555557d9d8b1 tpch-f2ea9cecad4b495e`_$LT$core..panic..unwind_safe..AssertUnwindSafe$LT$F$GT$$u20$as$u20$core..ops..function..FnOnce$LT$$LP$$RP$$GT$$GT$::call_once::h0b01bb3f803b5867 + 33
       frame #66: 0x0000555557dcda0c tpch-f2ea9cecad4b495e`std::panicking::try::do_call::h2a540903513d0d41 + 76
       frame #67: 0x0000555557dce47b tpch-f2ea9cecad4b495e`__rust_try + 27
       frame #68: 0x0000555557dcd8f1 tpch-f2ea9cecad4b495e`std::panicking::try::hf3cae92ffab45e45 + 129
       frame #69: 0x0000555557de0611 tpch-f2ea9cecad4b495e`std::panic::catch_unwind::h7e6df63ac1e399eb + 33
       frame #70: 0x0000555557dbea35 tpch-f2ea9cecad4b495e`std::thread::Builder::spawn_unchecked::_$u7b$$u7b$closure$u7d$$u7d$::hc605bcf30235c950 + 485
       frame #71: 0x0000555557d825ff tpch-f2ea9cecad4b495e`core::ops::function::FnOnce::call_once$u7b$$u7b$vtable.shim$u7d$$u7d$::h033582bbbaf2bf5c + 15
       frame #72: 0x0000555558cd3283 tpch-f2ea9cecad4b495e`std::sys::unix::thread::Thread::new::thread_start::h62931528f61e35f5 + 35
       frame #73: 0x00007ffff7f69450 libpthread.so.0`start_thread(arg=0x00007ffff59f9640) at pthread_create.c:473:8
       frame #74: 0x00007ffff7d36d53 libc.so.6`__clone + 67
   
   ```





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917628970


   @alamb 
   The tests pass on master, and on the branch if I remove the avro feature. It could be a test harness problem with tokio, simply because we are adding more async tests, or a dependency causing trouble.
   
   ```
   cargo test --features=avro
   thread 'tests::run_q6' has overflowed its stack
   fatal runtime error: stack overflow
   error: test failed, to rerun pass '-p arrow-benchmarks --bin tpch'
   ```
   
   ```
   cargo test 
   test result: ok
   ```
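
One way to check whether a failure like this is purely a stack-size limit is to run the deep call chain on a thread with a larger stack. The sketch below is an assumption-laden illustration, not something proposed in this PR: `depth_sum` is a hypothetical recursive function standing in for the recursive `evaluate` calls, and `run_with_stack` is a hypothetical helper built on the standard library's `std::thread::Builder::stack_size`.

```rust
use std::thread;

/// Deliberately recursive function (hypothetical stand-in for a deep
/// `evaluate` call chain): sums the integers from 1 to `n`.
fn depth_sum(n: u64) -> u64 {
    if n == 0 {
        0
    } else {
        n + depth_sum(n - 1)
    }
}

/// Runs `f` on a new thread with the requested stack size and returns
/// its result. Panics if the thread cannot be spawned or if `f` panics.
fn run_with_stack<T, F>(stack_bytes: usize, f: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::Builder::new()
        .stack_size(stack_bytes)
        .spawn(f)
        .expect("failed to spawn thread")
        .join()
        .expect("thread panicked")
}
```

For example, `run_with_stack(16 * 1024 * 1024, || depth_sum(10_000))` runs the recursion with a 16 MiB stack. If a test passes this way but overflows on the default stack, that points to stack usage rather than a logic error.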
   





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696549158



##########
File path: datafusion/src/avro_to_arrow/reader.rs
##########
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::record_batch::RecordBatch;
+use crate::avro_to_arrow::arrow_array_reader::AvroArrowArrayReader;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use avro_rs::Reader as AvroReader;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    /// Optional schema for the Avro file
+    ///
+    /// If the schema is not supplied, the reader will try to infer the schema
+    /// based on the Avro structure.
+    schema: Option<SchemaRef>,
+    /// Batch size (number of records to load each time)
+    ///
+    /// The default batch size when using the `ReaderBuilder` is 1024 records
+    batch_size: usize,
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            batch_size: 1024,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+    /// Create a new builder for configuring Avro parsing options.
+    ///
+    /// To convert a builder into a reader, call `Reader::from_builder`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder, inferring the schema with the first 100 records
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().infer_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to infer the schema of the file
+    pub fn infer_schema(mut self) -> Self {
+        // remove any schema that is set
+        self.schema = None;
+        self
+    }
+
+    /// Set the batch size (number of records to load at one time)
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Set the reader's column projection
+    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    /// Create a new `Reader` from the `ReaderBuilder`
+    pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>>
+    where
+        R: Read + Seek,
+    {
+        let mut source = source;
+
+        // check if schema should be inferred
+        let schema = match self.schema {
+            Some(schema) => schema,
+            None => Arc::new(infer_avro_schema_from_reader(&mut source)?),
+        };
+        source.seek(SeekFrom::Start(0))?;
+        Reader::try_new(source, schema, self.batch_size, self.projection)
+    }
+}
+
+/// Avro file record  reader
+pub struct Reader<'a, R: Read> {
+    array_reader: AvroArrowArrayReader<'a, R>,
+    schema: SchemaRef,
+    batch_size: usize,
+}
+
+impl<'a, R: Read> Reader<'a, R> {
+    /// Create a new Avro Reader from any value that implements the `Read` trait.
+    ///
+    /// If reading a `File`, you can customise the Reader, such as to enable schema
+    /// inference, use `ReaderBuilder`.
+    pub fn try_new(
+        reader: R,
+        schema: SchemaRef,
+        batch_size: usize,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        Ok(Self {
+            array_reader: AvroArrowArrayReader::try_new(
+                AvroReader::new(reader)?,
+                schema.clone(),
+                projection,
+            )?,
+            schema,
+            batch_size,
+        })
+    }
+
+    /// Returns the schema of the reader, useful for getting the schema without reading
+    /// record batches
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
+        self.array_reader.next_batch(self.batch_size)
+    }
+}
+
+impl<'a, R: Read> Iterator for Reader<'a, R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next().transpose()
+    }
+}
+
+/// Infer Avro schema given a reader
+pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
+    let avro_reader = avro_rs::Reader::new(reader)?;
+    let schema = avro_reader.writer_schema();
+    super::to_arrow_schema(schema)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::*;
+    use crate::arrow::datatypes::{DataType, Field};
+    use arrow::datatypes::TimeUnit;
+    use std::fs::File;
+
+    fn build_reader(name: &str) -> Reader<File> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/{}", testdata, name);
+        let builder = ReaderBuilder::new().infer_schema().with_batch_size(64);
+        builder.build(File::open(filename).unwrap()).unwrap()
+    }
+
+    fn get_col<'a, T: 'static>(
+        batch: &'a RecordBatch,
+        col: (usize, &Field),
+    ) -> Option<&'a T> {
+        batch.column(col.0).as_any().downcast_ref::<T>()
+    }
+
+    #[test]
+    fn test_avro_basic() {
+        let mut reader = build_reader("alltypes_dictionary.avro");
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(2, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let id = schema.column_with_name("id").unwrap();
+        assert_eq!(0, id.0);
+        assert_eq!(&DataType::Int32, id.1.data_type());
+        let col = get_col::<Int32Array>(&batch, id).unwrap();

Review comment:
       For this particular test, though, the output wouldn't distinguish between, for instance, Int32 and Int64, and here we really want to test the column types, not just the values.
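
The point about types versus values can be illustrated with a small standard-library sketch. It does not use Arrow: `Vec<i32>` and `Vec<i64>` stand in for `Int32Array` and `Int64Array`, and `render` is a hypothetical stand-in for a values-only pretty printer. Only the `downcast_ref` pattern is taken from `get_col` in the diff above.

```rust
use std::any::Any;

/// Downcast a type-erased column to a concrete type, mirroring the
/// `as_any().downcast_ref::<T>()` pattern used by `get_col` in the test.
fn get_col<T: 'static>(column: &dyn Any) -> Option<&T> {
    column.downcast_ref::<T>()
}

/// Hypothetical values-only rendering: the element width is not visible
/// in the resulting string.
fn render(column: &dyn Any) -> String {
    if let Some(values) = column.downcast_ref::<Vec<i32>>() {
        format!("{:?}", values)
    } else if let Some(values) = column.downcast_ref::<Vec<i64>>() {
        format!("{:?}", values)
    } else {
        String::from("<unsupported>")
    }
}
```

Under these assumptions, a 32-bit and a 64-bit column holding the same numbers render identically, while `get_col::<Vec<i32>>` returns `None` for the 64-bit one, which is why the test asserts on the concrete array type.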







[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701262121



##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,
+        AvroSchema::Date => DataType::Date32,
+        AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+        AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+        AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
+        AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
+        AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+    };
+
+    let data_type = field_type.clone();
+    let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+    let mut field = Field::new(name, field_type, nullable);
+    field.set_metadata(props.cloned());
+    Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+    match dt {
+        DataType::Null => "null",
+        DataType::Boolean => "bit",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::UInt8 => "uint1",
+        DataType::UInt16 => "uint2",
+        DataType::UInt32 => "uint4",
+        DataType::UInt64 => "uint8",
+        DataType::Float16 => "float2",
+        DataType::Float32 => "float4",
+        DataType::Float64 => "float8",
+        DataType::Date32 => "dateday",
+        DataType::Date64 => "datemilli",
+        DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+            TimeUnit::Second => "timesec",
+            TimeUnit::Millisecond => "timemilli",
+            TimeUnit::Microsecond => "timemicro",
+            TimeUnit::Nanosecond => "timenano",
+        },
+        DataType::Timestamp(tu, tz) => {
+            if tz.is_some() {
+                match tu {
+                    TimeUnit::Second => "timestampsectz",
+                    TimeUnit::Millisecond => "timestampmillitz",
+                    TimeUnit::Microsecond => "timestampmicrotz",
+                    TimeUnit::Nanosecond => "timestampnanotz",
+                }
+            } else {
+                match tu {
+                    TimeUnit::Second => "timestampsec",
+                    TimeUnit::Millisecond => "timestampmilli",
+                    TimeUnit::Microsecond => "timestampmicro",
+                    TimeUnit::Nanosecond => "timestampnano",
+                }
+            }
+        }
+        DataType::Duration(_) => "duration",
+        DataType::Interval(unit) => match unit {
+            IntervalUnit::YearMonth => "intervalyear",
+            IntervalUnit::DayTime => "intervalmonth",
+        },
+        DataType::Binary => "varbinary",
+        DataType::FixedSizeBinary(_) => "fixedsizebinary",
+        DataType::LargeBinary => "largevarbinary",
+        DataType::Utf8 => "varchar",
+        DataType::LargeUtf8 => "largevarchar",
+        DataType::List(_) => "list",
+        DataType::FixedSizeList(_, _) => "fixed_size_list",
+        DataType::LargeList(_) => "largelist",
+        DataType::Struct(_) => "struct",
+        DataType::Union(_) => "union",
+        DataType::Dictionary(_, _) => "map",
+        DataType::Decimal(_, _) => "decimal",
+    }
+}
+
+fn index_type(len: usize) -> DataType {
+    if len <= usize::from(u8::MAX) {
+        DataType::Int8
+    } else if len <= usize::from(u16::MAX) {
+        DataType::Int16
+    } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+        DataType::Int32
+    } else {
+        DataType::Int64
+    }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+    let mut props = BTreeMap::new();
+    match &schema {
+        AvroSchema::Record {
+            doc: Some(ref doc), ..
+        }
+        | AvroSchema::Enum {
+            doc: Some(ref doc), ..
+        } => {
+            props.insert("doc".to_string(), doc.clone());
+        }
+        _ => {}
+    }
+    match &schema {
+        AvroSchema::Record {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        }
+        | AvroSchema::Enum {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        }
+        | AvroSchema::Fixed {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        } => {
+            let aliases: Vec<String> = aliases
+                .iter()
+                .map(|alias| aliased(alias, namespace.as_deref(), None))
+                .collect();
+            props.insert("aliases".to_string(), format!("[{}]", aliases.join(",")));
+        }
+        _ => {}
+    }
+    props
+}
+
+#[allow(dead_code)]
+fn get_metadata(

Review comment:
       Yeah, I completely forgot about this. Arbitrary metadata can land in the Avro schema and so can be passed through to Arrow, but I didn't implement it.







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r697964158



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -17,6 +17,7 @@
 
 //! DataFusion data sources
 
+pub mod avro;

Review comment:
       This should be gated by a feature flag, no?

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -78,7 +78,7 @@ impl NthValue {
     }
 
     /// Create a new NTH_VALUE window aggregate function
-    pub fn nth_value(
+    pub fn value(

Review comment:
       maybe `first`, `last` and `nth` would be better names here, cc @Jimexist 

##########
File path: datafusion/Cargo.toml
##########
@@ -69,6 +70,8 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
+avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+num-traits = "0.2"

Review comment:
       It looks like num-traits should be marked as optional and be included as part of the `avro` feature flag?
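
A minimal sketch of the feature gating requested in these comments, assuming a Cargo feature named `avro` that pulls in the optional dependencies. The function name `read_avro_schema` and its placeholder body are hypothetical; the fallback error string is the one quoted in the clippy output elsewhere in this thread.

```rust
/// Compiled only when the (assumed) `avro` feature is enabled.
/// The body is a placeholder, not the DataFusion implementation.
#[cfg(feature = "avro")]
pub fn read_avro_schema(path: &str) -> Result<String, String> {
    Ok(format!("schema read from {}", path))
}

/// Fallback compiled when the feature is disabled: the API still exists,
/// but every call reports that Avro support was not built in.
#[cfg(not(feature = "avro"))]
pub fn read_avro_schema(_path: &str) -> Result<String, String> {
    Err("cannot read avro schema without the 'avro' feature enabled".to_string())
}
```

With this shape, callers compile either way, and only builds that opt into the feature pay for the extra dependencies. The same `#[cfg(feature = "avro")]` attribute could be placed on a `pub mod` declaration.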







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r697996034



##########
File path: datafusion/Cargo.toml
##########
@@ -69,6 +70,8 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
+avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+num-traits = "0.2"

Review comment:
       ```suggestion
   num-traits = { version = "0.2", optional = true }
   ```







[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917637560


   Cargo test uses a dev/test profile by default.
   
   I think the combination of the test runner, the Tokio runtime, and DataFusion's current design of recursing into the execution plan and expressions increases the usage of stack space.
   Running the tests without optimizations increases stack usage further, as almost no optimizations are done in that case.
   
   Some improvements could be made, such as using an explicit stack on the heap, e.g. for the `evaluate` function, and redesigning the execution model of DataFusion to limit the use of the call stack.
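
As a rough illustration of the "explicit stack on the heap" idea, here is a standard-library sketch. `Expr` and `Task` are hypothetical types invented for this example and are far simpler than DataFusion's physical expressions. The traversal keeps its pending work in a `Vec`, so evaluation depth consumes heap memory rather than call-stack frames.

```rust
/// A tiny expression tree, standing in for a nested expression.
enum Expr {
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
}

/// Pending work items kept on the heap instead of the call stack.
enum Task<'a> {
    Visit(&'a Expr),
    ApplyAdd,
    ApplyMul,
}

/// Evaluate `root` iteratively using an explicit task stack.
fn evaluate(root: &Expr) -> i64 {
    let mut tasks = vec![Task::Visit(root)];
    let mut values: Vec<i64> = Vec::new();

    while let Some(task) = tasks.pop() {
        match task {
            Task::Visit(Expr::Literal(v)) => values.push(*v),
            Task::Visit(Expr::Add(left, right)) => {
                // Pushed in reverse: left is visited first, then right,
                // and the operator is applied once both values exist.
                tasks.push(Task::ApplyAdd);
                tasks.push(Task::Visit(right));
                tasks.push(Task::Visit(left));
            }
            Task::Visit(Expr::Mul(left, right)) => {
                tasks.push(Task::ApplyMul);
                tasks.push(Task::Visit(right));
                tasks.push(Task::Visit(left));
            }
            Task::ApplyAdd => {
                let right = values.pop().expect("right operand");
                let left = values.pop().expect("left operand");
                values.push(left + right);
            }
            Task::ApplyMul => {
                let right = values.pop().expect("right operand");
                let left = values.pop().expect("left operand");
                values.push(left * right);
            }
        }
    }

    values.pop().expect("expression produced a value")
}
```

Note that this only covers evaluation. Dropping a very deeply nested `Box` tree still recurses by default, so a real change would need to consider that as well.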





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-907849901


   Looks like I should have run the entire CI pipeline before committing, will fix





[GitHub] [arrow-datafusion] nevi-me commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
nevi-me commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-907762733


   @Igosuki @houqp @alamb I firstly apologise for not being able to review anything of late :(
   
   I have a question on the Avro support; I don't know if it's been asked or addressed yet. Would it be better to move the reader to arrow-rs, so that a writer could also be implemented there?





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701263397



##########
File path: datafusion/src/datasource/avro.rs
##########
@@ -0,0 +1,434 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Line-delimited Avro data source
+//!
+//! This data source allows Line-delimited Avro records or files to be used as input for queries.
+//!
+
+use std::{
+    any::Any,
+    io::{Read, Seek},
+    sync::{Arc, Mutex},
+};
+
+use arrow::datatypes::SchemaRef;
+
+use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
+use crate::{
+    datasource::{Source, TableProvider},
+    error::{DataFusionError, Result},
+    physical_plan::{common, ExecutionPlan},
+};
+
+use super::datasource::Statistics;
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents a  line-delimited Avro file with a provided schema

Review comment:
       It isn't, but since we read the schema beforehand, re-invoking the Avro reader would crash because it can't read the file header. Of course, it's not ideal.
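
The header problem described here can be sketched with the standard library alone. This is an illustration, not the PR's code: `read_magic` and `peek_then_rewind` are hypothetical helpers, and only the four magic bytes of an Avro object container file (`Obj` followed by the byte 1) stand in for the full header. The rewind uses the same `seek(SeekFrom::Start(0))` call that appears in `ReaderBuilder::build`.

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

/// Read the 4-byte magic at the current position of `source`.
fn read_magic<R: Read>(source: &mut R) -> std::io::Result<[u8; 4]> {
    let mut magic = [0u8; 4];
    source.read_exact(&mut magic)?;
    Ok(magic)
}

/// Inspect the header, then rewind so that a reader created afterwards
/// starts at byte 0 and sees the header again.
fn peek_then_rewind<R: Read + Seek>(source: &mut R) -> std::io::Result<[u8; 4]> {
    let magic = read_magic(source)?;
    source.seek(SeekFrom::Start(0))?;
    Ok(magic)
}
```

Without the rewind, a second reader would start after the bytes already consumed and fail to find the header, which is why the source has to implement `Seek` in addition to `Read`. An in-memory `Cursor<Vec<u8>>` is enough to try this out.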







[GitHub] [arrow-datafusion] nevi-me commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
nevi-me commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917615982


   The clippy one is related:
   
   ```rust
   error: this creates an owned instance just for comparison
      --> datafusion/src/physical_plan/avro.rs:448:23
       |
   448 |             if msg == "cannot read avro schema without the 'avro' feature enabled".to_string()
       |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try: `*"cannot read avro schema without the 'avro' feature enabled"`
       |
       = note: `-D clippy::cmp-owned` implied by `-D warnings`
       = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cmp_owned
   ```
   
   The other one is unrelated, but it's not a failure that I've seen before.
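
For reference, a small sketch of what the `cmp_owned` lint is pointing at, assuming the message is available as a `&str`. The function names and the constant are hypothetical; the string itself is the one from the clippy output above.

```rust
const FEATURE_DISABLED_MSG: &str =
    "cannot read avro schema without the 'avro' feature enabled";

/// The flagged shape: `to_string()` allocates a `String` that is used
/// only for the comparison and then dropped.
#[allow(clippy::cmp_owned)]
fn is_feature_error_allocating(msg: &str) -> bool {
    msg == FEATURE_DISABLED_MSG.to_string()
}

/// The allocation-free shape: compare the string slices directly.
fn is_feature_error(msg: &str) -> bool {
    msg == FEATURE_DISABLED_MSG
}
```

Both functions return the same result for every input; the second simply avoids the temporary allocation.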





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701261369



##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,
+        AvroSchema::Date => DataType::Date32,
+        AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+        AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+        AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
+        AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
+        AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+    };
+
+    let data_type = field_type.clone();
+    let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+    let mut field = Field::new(name, field_type, nullable);
+    field.set_metadata(props.cloned());
+    Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+    match dt {
+        DataType::Null => "null",
+        DataType::Boolean => "bit",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::UInt8 => "uint1",
+        DataType::UInt16 => "uint2",
+        DataType::UInt32 => "uint4",
+        DataType::UInt64 => "uint8",
+        DataType::Float16 => "float2",
+        DataType::Float32 => "float4",
+        DataType::Float64 => "float8",
+        DataType::Date32 => "dateday",
+        DataType::Date64 => "datemilli",
+        DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+            TimeUnit::Second => "timesec",
+            TimeUnit::Millisecond => "timemilli",
+            TimeUnit::Microsecond => "timemicro",
+            TimeUnit::Nanosecond => "timenano",
+        },
+        DataType::Timestamp(tu, tz) => {
+            if tz.is_some() {
+                match tu {
+                    TimeUnit::Second => "timestampsectz",
+                    TimeUnit::Millisecond => "timestampmillitz",
+                    TimeUnit::Microsecond => "timestampmicrotz",
+                    TimeUnit::Nanosecond => "timestampnanotz",
+                }
+            } else {
+                match tu {
+                    TimeUnit::Second => "timestampsec",
+                    TimeUnit::Millisecond => "timestampmilli",
+                    TimeUnit::Microsecond => "timestampmicro",
+                    TimeUnit::Nanosecond => "timestampnano",
+                }
+            }
+        }
+        DataType::Duration(_) => "duration",
+        DataType::Interval(unit) => match unit {
+            IntervalUnit::YearMonth => "intervalyear",
+            IntervalUnit::DayTime => "intervalmonth",
+        },
+        DataType::Binary => "varbinary",
+        DataType::FixedSizeBinary(_) => "fixedsizebinary",
+        DataType::LargeBinary => "largevarbinary",
+        DataType::Utf8 => "varchar",
+        DataType::LargeUtf8 => "largevarchar",
+        DataType::List(_) => "list",
+        DataType::FixedSizeList(_, _) => "fixed_size_list",
+        DataType::LargeList(_) => "largelist",
+        DataType::Struct(_) => "struct",
+        DataType::Union(_) => "union",
+        DataType::Dictionary(_, _) => "map",
+        DataType::Decimal(_, _) => "decimal",
+    }
+}
+
+fn index_type(len: usize) -> DataType {
+    if len <= usize::from(u8::MAX) {
+        DataType::Int8
+    } else if len <= usize::from(u16::MAX) {
+        DataType::Int16
+    } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+        DataType::Int32
+    } else {
+        DataType::Int64
+    }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+    let mut props = BTreeMap::new();
+    match &schema {
+        AvroSchema::Record {
+            doc: Some(ref doc), ..
+        }
+        | AvroSchema::Enum {
+            doc: Some(ref doc), ..
+        } => {
+            props.insert("doc".to_string(), doc.clone());

Review comment:
       I think you're right, so I made the change.







[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701258301



##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, Some(&Default::default()))
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::Utf8,

Review comment:
       https://github.com/Igosuki/arrow-datafusion/commit/94ec6e637c5a985bfc17618f9dda2cc342fa0381







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r697996186



##########
File path: datafusion/tests/sql.rs
##########
@@ -4382,3 +4387,167 @@ async fn like_on_string_dictionaries() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query() {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_avro(&mut ctx);
+    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
+    // so we need an explicit cast
+    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+--------------------------+",
+        "| id | CAST(string_col AS Utf8) |",
+        "+----+--------------------------+",
+        "| 4  | 0                        |",
+        "| 5  | 1                        |",
+        "| 6  | 0                        |",
+        "| 7  | 1                        |",
+        "| 2  | 0                        |",
+        "| 3  | 1                        |",
+        "| 0  | 0                        |",
+        "| 1  | 1                        |",
+        "+----+--------------------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query_multiple_files() {
+    let tempdir = tempfile::tempdir().unwrap();
+    let table_path = tempdir.path();
+    let testdata = datafusion::test_util::arrow_test_data();
+    let alltypes_plain_file = format!("{}/avro/alltypes_plain.avro", testdata);
+    std::fs::copy(
+        &alltypes_plain_file,
+        format!("{}/alltypes_plain1.avro", table_path.display()),
+    )
+    .unwrap();
+    std::fs::copy(
+        &alltypes_plain_file,
+        format!("{}/alltypes_plain2.avro", table_path.display()),
+    )
+    .unwrap();
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_avro(
+        "alltypes_plain",
+        table_path.display().to_string().as_str(),
+        AvroReadOptions::default(),
+    )
+    .unwrap();
+    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
+    // so we need an explicit cast
+    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+--------------------------+",
+        "| id | CAST(string_col AS Utf8) |",
+        "+----+--------------------------+",
+        "| 4  | 0                        |",
+        "| 5  | 1                        |",
+        "| 6  | 0                        |",
+        "| 7  | 1                        |",
+        "| 2  | 0                        |",
+        "| 3  | 1                        |",
+        "| 0  | 0                        |",
+        "| 1  | 1                        |",
+        "| 4  | 0                        |",
+        "| 5  | 1                        |",
+        "| 6  | 0                        |",
+        "| 7  | 1                        |",
+        "| 2  | 0                        |",
+        "| 3  | 1                        |",
+        "| 0  | 0                        |",
+        "| 1  | 1                        |",
+        "+----+--------------------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_single_nan_schema() {
+    let mut ctx = ExecutionContext::new();
+    let testdata = datafusion::test_util::arrow_test_data();
+    ctx.register_avro(
+        "single_nan",
+        &format!("{}/avro/single_nan.avro", testdata),
+        AvroReadOptions::default(),
+    )
+    .unwrap();
+    let sql = "SELECT mycol FROM single_nan";
+    let plan = ctx.create_logical_plan(sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let results = collect(plan).await.unwrap();
+    for batch in results {
+        assert_eq!(1, batch.num_rows());
+        assert_eq!(1, batch.num_columns());
+    }
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_explain() {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_avro(&mut ctx);
+
+    let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
+    let actual = execute(&mut ctx, sql).await;
+    let actual = normalize_vec_for_explain(actual);
+    let expected = vec![
+        vec![
+            "logical_plan",
+            "Projection: #COUNT(UInt8(1))\
+            \n  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
+            \n    TableScan: alltypes_plain projection=Some([0])",
+        ],
+        vec![
+            "physical_plan",
+            "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
+            \n  HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n    CoalescePartitionsExec\
+            \n      HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+            \n          ExecutionPlan(PlaceHolder)\

Review comment:
       Note that this line says `PlaceHolder`; that can be fixed by implementing `fmt_as` in the Avro `ExecutionPlan`.
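
To illustrate the suggestion above, here is a minimal, self-contained sketch of how overriding `fmt_as` replaces the placeholder text in `EXPLAIN` output. It does not use DataFusion itself: `DisplayFormatType`, the trimmed-down `ExecutionPlan` trait, the `AvroExec` fields and the `DisplayPlan` adapter are all hypothetical stand-ins written for this example, and the real trait has many more methods.

```rust
use std::fmt;

/// Hypothetical stand-in for the display format selector.
#[derive(Clone, Copy)]
pub enum DisplayFormatType {
    Default,
}

/// Hypothetical, trimmed-down stand-in for the `ExecutionPlan` trait.
pub trait ExecutionPlan {
    /// Fallback used when a plan does not describe itself.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ExecutionPlan(PlaceHolder)")
    }
}

/// Hypothetical Avro scan node; the field names are assumptions.
pub struct AvroExec {
    pub files: Vec<String>,
    pub batch_size: usize,
    pub limit: Option<usize>,
}

impl ExecutionPlan for AvroExec {
    /// Overriding the default makes the node describe itself in plan output.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default => write!(
                f,
                "AvroExec: batch_size={}, limit={:?}, files=[{}]",
                self.batch_size,
                self.limit,
                self.files.join(", ")
            ),
        }
    }
}

/// Small adapter so a plan can be rendered with `format!`.
pub struct DisplayPlan<'a>(pub &'a dyn ExecutionPlan);

impl fmt::Display for DisplayPlan<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt_as(DisplayFormatType::Default, f)
    }
}
```

With such an override in place, the expected plan string in a test like `avro_explain` would presumably need to change from the placeholder to whatever text the Avro node prints.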

##########
File path: datafusion/tests/sql.rs
##########
@@ -4382,3 +4387,167 @@ async fn like_on_string_dictionaries() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query() {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_avro(&mut ctx);
+    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
+    // so we need an explicit cast
+    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+--------------------------+",
+        "| id | CAST(string_col AS Utf8) |",
+        "+----+--------------------------+",
+        "| 4  | 0                        |",
+        "| 5  | 1                        |",
+        "| 6  | 0                        |",
+        "| 7  | 1                        |",
+        "| 2  | 0                        |",
+        "| 3  | 1                        |",
+        "| 0  | 0                        |",
+        "| 1  | 1                        |",
+        "+----+--------------------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query_multiple_files() {
+    let tempdir = tempfile::tempdir().unwrap();
+    let table_path = tempdir.path();
+    let testdata = datafusion::test_util::arrow_test_data();
+    let alltypes_plain_file = format!("{}/avro/alltypes_plain.avro", testdata);
+    std::fs::copy(
+        &alltypes_plain_file,
+        format!("{}/alltypes_plain1.avro", table_path.display()),
+    )
+    .unwrap();
+    std::fs::copy(
+        &alltypes_plain_file,
+        format!("{}/alltypes_plain2.avro", table_path.display()),
+    )
+    .unwrap();
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_avro(
+        "alltypes_plain",
+        table_path.display().to_string().as_str(),
+        AvroReadOptions::default(),
+    )
+    .unwrap();
+    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
+    // so we need an explicit cast
+    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+--------------------------+",
+        "| id | CAST(string_col AS Utf8) |",
+        "+----+--------------------------+",
+        "| 4  | 0                        |",
+        "| 5  | 1                        |",
+        "| 6  | 0                        |",
+        "| 7  | 1                        |",
+        "| 2  | 0                        |",
+        "| 3  | 1                        |",
+        "| 0  | 0                        |",
+        "| 1  | 1                        |",
+        "| 4  | 0                        |",
+        "| 5  | 1                        |",
+        "| 6  | 0                        |",
+        "| 7  | 1                        |",
+        "| 2  | 0                        |",
+        "| 3  | 1                        |",
+        "| 0  | 0                        |",
+        "| 1  | 1                        |",
+        "+----+--------------------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_single_nan_schema() {
+    let mut ctx = ExecutionContext::new();
+    let testdata = datafusion::test_util::arrow_test_data();
+    ctx.register_avro(
+        "single_nan",
+        &format!("{}/avro/single_nan.avro", testdata),
+        AvroReadOptions::default(),
+    )
+    .unwrap();
+    let sql = "SELECT mycol FROM single_nan";
+    let plan = ctx.create_logical_plan(sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let results = collect(plan).await.unwrap();
+    for batch in results {
+        assert_eq!(1, batch.num_rows());
+        assert_eq!(1, batch.num_columns());
+    }
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_explain() {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_avro(&mut ctx);
+
+    let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
+    let actual = execute(&mut ctx, sql).await;
+    let actual = normalize_vec_for_explain(actual);
+    let expected = vec![
+        vec![
+            "logical_plan",
+            "Projection: #COUNT(UInt8(1))\
+            \n  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
+            \n    TableScan: alltypes_plain projection=Some([0])",
+        ],
+        vec![
+            "physical_plan",
+            "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
+            \n  HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n    CoalescePartitionsExec\
+            \n      HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+            \n          ExecutionPlan(PlaceHolder)\
+            \n",
+        ],
+    ];
+    assert_eq!(expected, actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_explain_analyze() {

Review comment:
       I don't think this particular test adds a lot of test coverage; the non-Avro version exists to ensure execution metrics are propagated correctly.

##########
File path: .github/workflows/rust.yml
##########
@@ -105,13 +105,14 @@ jobs:
         run: |
           export ARROW_TEST_DATA=$(pwd)/testing/data
           export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
-          # run tests on all workspace members with default feature list
-          cargo test
+          # run tests on all workspace members with default feature list + avro

Review comment:
       👍 

##########
File path: datafusion/tests/sql.rs
##########
@@ -4382,3 +4387,167 @@ async fn like_on_string_dictionaries() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query() {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_avro(&mut ctx);
+    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
+    // so we need an explicit cast
+    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+--------------------------+",
+        "| id | CAST(string_col AS Utf8) |",
+        "+----+--------------------------+",
+        "| 4  | 0                        |",
+        "| 5  | 1                        |",
+        "| 6  | 0                        |",
+        "| 7  | 1                        |",
+        "| 2  | 0                        |",
+        "| 3  | 1                        |",
+        "| 0  | 0                        |",
+        "| 1  | 1                        |",
+        "+----+--------------------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query_multiple_files() {

Review comment:
       ❤️ 







[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r698107150



##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1911,17 +1911,13 @@ mod tests {
     impl ExprRewriter for FooBarRewriter {
         fn mutate(&mut self, expr: Expr) -> Result<Expr> {
             match expr {
-                Expr::Literal(scalar) => {
-                    if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
-                        let utf8_val = if utf8_val == "foo" {
-                            "bar".to_string()
-                        } else {
-                            utf8_val
-                        };
-                        Ok(lit(utf8_val))
+                Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => {
+                    let utf8_val = if utf8_val == "foo" {

Review comment:
       Maybe this isn't intended to be merged?







[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917621958


   @nevi-me fixed
   
   I found the following clippy errors, unrelated to this PR, with the latest nightly:
   ```
   error: field is never read: `data_type`
     --> datafusion/src/physical_plan/expressions/average.rs:40:5
      |
   40 |     data_type: DataType,
      |     ^^^^^^^^^^^^^^^^^^^
      |
      = note: `-D dead-code` implied by `-D warnings`
   
   error: field is never read: `nullable`
     --> datafusion/src/physical_plan/expressions/average.rs:41:5
      |
   41 |     nullable: bool,
      |     ^^^^^^^^^^^^^^
   
   error: field is never read: `metrics`
     --> datafusion/src/physical_plan/parquet.rs:99:5
      |
   99 |     metrics: ExecutionPlanMetricsSet,
      |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
   error: field is never read: `fun`
     --> datafusion/src/physical_plan/windows/built_in.rs:36:5
      |
   36 |     fun: BuiltInWindowFunction,
      |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
   
   error: field is never read: `window_frame`
     --> datafusion/src/physical_plan/windows/built_in.rs:40:5
      |
   40 |     window_frame: Option<WindowFrame>,
      |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
   error: this `impl` can be derived
      --> datafusion/src/physical_plan/planner.rs:238:1
       |
   238 | / impl Default for DefaultPhysicalPlanner {
   239 | |     fn default() -> Self {
   240 | |         Self {
   241 | |             extension_planners: vec![],
   242 | |         }
   243 | |     }
   244 | | }
       | |_^
   
   ```
   I had to fix about 20 clippy errors in my personal project with the new nightly.
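
For the last error in the list ("this `impl` can be derived"), the usual remedy is to replace the hand-written `impl Default` with `#[derive(Default)]`. The sketch below shows the idea on a simplified struct. `ExtensionPlanner` here is an empty hypothetical trait, the element type of `extension_planners` is an assumption, and `extension_count` is a helper added only so the example can be checked.

```rust
/// Hypothetical stand-in for the planner extension trait.
pub trait ExtensionPlanner {}

/// Deriving `Default` yields an empty `extension_planners` vector,
/// which is what the hand-written `impl Default` in the lint output did.
#[derive(Default)]
pub struct DefaultPhysicalPlanner {
    extension_planners: Vec<Box<dyn ExtensionPlanner>>,
}

impl DefaultPhysicalPlanner {
    /// Helper for this example only: number of registered extension planners.
    pub fn extension_count(&self) -> usize {
        self.extension_planners.len()
    }
}
```

The derive works without extra bounds because `Vec<T>` implements `Default` for any `T`.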





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r714324727



##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, None)
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("avro::doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),

Review comment:
       @jorgecarleitao thanks for pointing that out, I should fix it







[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-904936338


   @alamb Thanks





[GitHub] [arrow-datafusion] Dandandan commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917631419


   I am wondering whether those errors are related to running the tests in debug mode. Debug builds skip the optimizations that reduce stack usage for local variables and the call stack, which makes it much easier to run into a stack overflow.
   
   Does the error go away when running the tests in release mode?





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-905801351


   @alamb will address everything soon





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-918329902


   The thing is, none of the queries in this PR have deep nesting. The default stack in Rust is just not large enough to support what the test harness plus recursion is doing, even with just a little bit of nesting.
   The only proper solution I foresee, as outlined by @Dandandan, is to re-create an AST of ColumnarValues through a heap-based stack when walking the tree. I've already looked into it, but I'm not sure I'm the best person to do this refactoring.
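
   To illustrate the heap-based approach, here is a minimal sketch that evaluates a tree with an explicit work list instead of recursion, so depth is bounded by heap memory rather than by the thread stack. The `Expr`, `Step`, and `eval_iterative` names are hypothetical and much simpler than DataFusion's real expression and `ColumnarValue` types.

```rust
/// Hypothetical, simplified expression tree (not DataFusion's `Expr`).
enum Expr {
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
    Sub(Box<Expr>, Box<Expr>),
}

/// Pending work items kept on a heap-allocated `Vec` instead of the call stack.
enum Step<'a> {
    Visit(&'a Expr),
    ApplyAdd,
    ApplySub,
}

/// Evaluate `root` without recursion.
fn eval_iterative(root: &Expr) -> i64 {
    let mut work = vec![Step::Visit(root)];
    let mut values: Vec<i64> = Vec::new();
    while let Some(step) = work.pop() {
        match step {
            Step::Visit(Expr::Literal(v)) => values.push(*v),
            Step::Visit(Expr::Add(l, r)) => {
                // Pushed in reverse: the left child is popped and evaluated first.
                work.push(Step::ApplyAdd);
                work.push(Step::Visit(r));
                work.push(Step::Visit(l));
            }
            Step::Visit(Expr::Sub(l, r)) => {
                work.push(Step::ApplySub);
                work.push(Step::Visit(r));
                work.push(Step::Visit(l));
            }
            Step::ApplyAdd => {
                let r = values.pop().expect("right operand");
                let l = values.pop().expect("left operand");
                values.push(l + r);
            }
            Step::ApplySub => {
                let r = values.pop().expect("right operand");
                let l = values.pop().expect("left operand");
                values.push(l - r);
            }
        }
    }
    values.pop().expect("expression produced a value")
}
```

   The trade-off is that every node type needs an explicit "apply" step, which is why this kind of rewrite tends to be an invasive refactoring.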





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696641994



##########
File path: datafusion/src/avro_to_arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1093 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{
+    make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef,
+    BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait,
+    PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
+    StringDictionaryBuilder,
+};
+use crate::arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::datatypes::{
+    ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
+    Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
+    Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
+    UInt8Type,
+};
+use crate::arrow::error::ArrowError;
+use crate::arrow::record_batch::RecordBatch;
+use crate::arrow::util::bit_util;
+use crate::error::{DataFusionError, Result};
+use arrow::array::{BinaryArray, GenericListArray};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError::SchemaError;
+use arrow::error::Result as ArrowResult;
+use avro_rs::schema::Schema as AvroSchema;
+use avro_rs::schema::SchemaKind;
+use avro_rs::types::Value;
+use avro_rs::{AvroResult, Reader as AvroReader};
+use num_traits::NumCast;
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+type RecordSlice<'a> = &'a [Vec<(String, Value)>];
+
+pub struct AvroArrowArrayReader<'a, R: Read> {
+    reader: AvroReader<'a, R>,
+    schema: SchemaRef,
+    projection: Option<Vec<String>>,
+    schema_lookup: HashMap<String, usize>,
+}
+
+impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
+    pub fn try_new(
+        reader: AvroReader<'a, R>,
+        schema: SchemaRef,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        let writer_schema = reader.writer_schema().clone();
+        let schema_lookup = Self::schema_lookup(writer_schema)?;
+        Ok(Self {
+            reader,
+            schema,
+            projection,
+            schema_lookup,
+        })
+    }
+
+    pub fn schema_lookup(schema: AvroSchema) -> Result<HashMap<String, usize>> {
+        match schema {
+            AvroSchema::Record {
+                lookup: ref schema_lookup,
+                ..
+            } => Ok(schema_lookup.clone()),
+            _ => Err(DataFusionError::ArrowError(SchemaError(
+                "expected avro schema to be a record".to_string(),
+            ))),
+        }
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult<Option<RecordBatch>> {
+        let mut rows = Vec::with_capacity(batch_size);
+        for value in self.reader.by_ref().take(batch_size) {
+            let v = value.map_err(|e| {
+                ArrowError::ParseError(format!("Failed to parse avro value: {:?}", e))
+            })?;
+            match v {
+                Value::Record(v) => {
+                    rows.push(v);
+                }
+                other => {
+                    return Err(ArrowError::ParseError(format!(
+                        "Row needs to be of type object, got: {:?}",
+                        other
+                    )))
+                }
+            }
+        }
+        if rows.is_empty() {
+            // reached end of file
+            return Ok(None);
+        }
+        let rows = &rows[..];
+        let projection = self.projection.clone().unwrap_or_else(Vec::new);
+        let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
+        let projected_fields: Vec<Field> = if projection.is_empty() {
+            self.schema.fields().to_vec()
+        } else {
+            projection
+                .iter()
+                .filter_map(|name| self.schema.column_with_name(name))
+                .map(|(_, field)| field.clone())
+                .collect()
+        };
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+    }
+
+    fn build_boolean_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef> {
+        let mut builder = BooleanBuilder::new(rows.len());
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                if let Some(boolean) = resolve_boolean(&value) {
+                    builder.append_value(boolean)?
+                } else {
+                    builder.append_null()?;
+                }
+            } else {
+                builder.append_null()?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_primitive_array<T: ArrowPrimitiveType + Resolver>(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef>
+    where
+        T: ArrowNumericType,
+        T::Native: num_traits::cast::NumCast,
+    {
+        Ok(Arc::new(
+            rows.iter()
+                .map(|row| {
+                    self.field_lookup(col_name, row)
+                        .and_then(|value| resolve_item::<T>(&value))
+                })
+                .collect::<PrimitiveArray<T>>(),
+        ))
+    }
+
+    #[inline(always)]
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_string_dictionary_builder<T>(
+        &self,
+        row_len: usize,
+    ) -> ArrowResult<StringDictionaryBuilder<T>>
+    where
+        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let key_builder = PrimitiveBuilder::<T>::new(row_len);
+        let values_builder = StringBuilder::new(row_len * 5);
+        Ok(StringDictionaryBuilder::new(key_builder, values_builder))
+    }
+
+    fn build_wrapped_list_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+        key_type: &DataType,
+    ) -> ArrowResult<ArrayRef> {
+        match *key_type {
+            DataType::Int8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int8Type>(&dtype, col_name, rows)
+            }
+            DataType::Int16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int16Type>(&dtype, col_name, rows)
+            }
+            DataType::Int32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int32Type>(&dtype, col_name, rows)
+            }
+            DataType::Int64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int64Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt8Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
+            }
+            ref e => Err(SchemaError(format!(
+                "Data type is currently not supported for dictionaries in list : {:?}",
+                e
+            ))),
+        }
+    }
+
+    #[inline(always)]
+    fn list_array_string_array_builder<D>(
+        &self,
+        data_type: &DataType,
+        col_name: &str,
+        rows: RecordSlice,
+    ) -> ArrowResult<ArrayRef>
+    where
+        D: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let mut builder: Box<dyn ArrayBuilder> = match data_type {
+            DataType::Utf8 => {
+                let values_builder = StringBuilder::new(rows.len() * 5);
+                Box::new(ListBuilder::new(values_builder))
+            }
+            DataType::Dictionary(_, _) => {
+                let values_builder =
+                    self.build_string_dictionary_builder::<D>(rows.len() * 5)?;
+                Box::new(ListBuilder::new(values_builder))
+            }
+            e => {
+                return Err(SchemaError(format!(
+                    "Nested list data builder type is not supported: {:?}",
+                    e
+                )))
+            }
+        };
+
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                // value can be an array or a scalar
+                let vals: Vec<Option<String>> = if let Value::String(v) = value {
+                    vec![Some(v.to_string())]
+                } else if let Value::Array(n) = value {
+                    n.into_iter()
+                        .map(|v| {
+                            resolve_string(&v)
+                            // else if matches!(

Review comment:
       Done







[GitHub] [arrow-datafusion] alamb commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-918375756


   To get this PR in, how about adding a workaround in CI to increase the stack size, with something like `RUSTFLAGS="-C link-args=-Wl,-zstack-size=4194304"`, as described in https://www.reddit.com/r/rust/comments/872fc4/how_to_increase_the_stack_size/
   
   We can then file a follow-on ticket to fix it more properly (e.g. reduce recursion).
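
   For comparison, a stack-size workaround can also be expressed inside Rust code rather than through linker flags. The sketch below is an alternative, not what is proposed above: it runs a closure on a thread whose stack size is set with `std::thread::Builder::stack_size`. The helper names `run_with_stack` and `depth_sum` are hypothetical, and 4194304 bytes simply mirrors the value in the flag.

```rust
use std::thread;

/// Run `f` on a freshly spawned thread with the requested stack size
/// and return its result.
fn run_with_stack<F, T>(stack_bytes: usize, f: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::Builder::new()
        .stack_size(stack_bytes)
        .spawn(f)
        .expect("failed to spawn thread")
        .join()
        .expect("thread panicked")
}

/// Deliberately recursive helper standing in for a deeply nested plan walk.
fn depth_sum(n: u64) -> u64 {
    if n == 0 {
        0
    } else {
        n + depth_sum(n - 1)
    }
}
```

   A caller would wrap the recursive entry point, for example `run_with_stack(4194304, || depth_sum(10_000))`. Like the linker flag, this only raises the limit; it does not remove the recursion.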





[GitHub] [arrow-datafusion] alamb merged pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910


   





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696603576



##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned Avro files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(

Review comment:
       That feature works with directories, but how do you register the same file twice, given that you only register the table once with a filename?







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r706354917



##########
File path: datafusion/src/avro_to_arrow/mod.rs
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]

Review comment:
       I think this would also be ok to do as a follow on PR







[GitHub] [arrow-datafusion] alamb commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-916430044


   Thanks @Igosuki  - I also just got back. I think this PR is close. I plan to review it again carefully tomorrow. 





[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r701249729



##########
File path: datafusion/src/avro_to_arrow/mod.rs
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]
+mod arrow_array_reader;
+#[cfg(feature = "avro")]
+mod reader;
+#[cfg(feature = "avro")]
+mod schema;
+
+use crate::arrow::datatypes::Schema;
+use crate::error::Result;
+#[cfg(feature = "avro")]
+pub use reader::{Reader, ReaderBuilder};
+use std::io::{Read, Seek};
+
+#[cfg(feature = "avro")]
+/// Infer Avro schema given a reader
+pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {

Review comment:
       It is actually reading the schema from the input, not inferring it. I went that way initially because, in the Java world, there is a use case for inferring Avro schemas from a stream of Avro datums.
   That use case would be streaming data into Arrow from Avro, but that is not what happens here, so we can simply remove Seek and rename this.
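
   A minimal sketch of why `Seek` is not needed, assuming the standard Avro object container file layout (magic bytes `Obj\x01`, then a metadata map holding `avro.schema`): the writer schema sits in the header and can be read in a single forward pass over any `Read`. The helper names (`read_long`, `read_bytes`, `read_writer_schema_json`) are hypothetical and use only the standard library; the PR itself goes through `avro_rs`, whose reader exposes the schema via `writer_schema()`.

```rust
use std::io::{self, Read};

fn invalid(msg: &'static str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

/// Decode one zigzag-encoded, variable-length Avro `long`.
fn read_long<R: Read>(r: &mut R) -> io::Result<i64> {
    let mut acc: u64 = 0;
    let mut shift = 0;
    loop {
        let mut byte = [0u8; 1];
        r.read_exact(&mut byte)?;
        acc |= u64::from(byte[0] & 0x7f) << shift;
        if byte[0] & 0x80 == 0 {
            break;
        }
        shift += 7;
        if shift > 63 {
            return Err(invalid("varint too long"));
        }
    }
    Ok(((acc >> 1) as i64) ^ -((acc & 1) as i64))
}

/// Read a length-prefixed byte string (also used for Avro strings).
fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let len = read_long(r)?;
    if len < 0 {
        return Err(invalid("negative length"));
    }
    let mut buf = vec![0u8; len as usize];
    r.read_exact(&mut buf)?;
    Ok(buf)
}

/// Return the JSON text stored under `avro.schema` in the file header, if any.
fn read_writer_schema_json<R: Read>(r: &mut R) -> io::Result<Option<String>> {
    let mut magic = [0u8; 4];
    r.read_exact(&mut magic)?;
    if &magic != b"Obj\x01" {
        return Err(invalid("not an Avro object container file"));
    }
    let mut schema = None;
    loop {
        // The metadata map is a sequence of blocks terminated by a zero count.
        let count = read_long(r)?;
        if count == 0 {
            break;
        }
        if count < 0 {
            // A negative count is followed by the block size in bytes (unused here).
            read_long(r)?;
        }
        for _ in 0..count.unsigned_abs() {
            let key = read_bytes(r)?;
            let value = read_bytes(r)?;
            if key == b"avro.schema" {
                let text = String::from_utf8(value)
                    .map_err(|_| invalid("schema is not valid UTF-8"))?;
                schema = Some(text);
            }
        }
    }
    Ok(schema)
}
```

   Since nothing here rewinds the input, a bound of `R: Read` is enough for a function that only reports the schema.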







[GitHub] [arrow-datafusion] NGA-TRAN commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
NGA-TRAN commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-918262642


   @alamb Since @Igosuki and @Dandandan have figured out the problems, I'll just add a minor observation. Query q12.sql, which caused the stack overflow in the original test, has 5 expressions in its conjunctive WHERE clause. It seems the one reported in #419 went through some extra work in IOx and has 8-level expressions. So I think this problem was exacerbated by other factors, such as the Avro feature, as found by @Igosuki.





[GitHub] [arrow-datafusion] Dandandan commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-920097872


   Thanks @Igosuki great contribution! Looking forward to future contributions 😁





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-919939740


   Tests pass in my local environment when setting RUST_MIN_STACK_SIZE.





[GitHub] [arrow-datafusion] Igosuki edited a comment on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki edited a comment on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-919939740


   Tests pass in my local environment when setting RUST_MIN_STACK_SIZE, without changing the Cargo configuration. I put it in the GitHub workflow to see if it passes there too.
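
As a rough illustration of why a larger stack helps deeply recursive code, here is a minimal standard-library sketch. It does not reproduce the failing test: `depth` is a hypothetical stand-in for a recursive expression visitor, and the stack size is set programmatically with `std::thread::Builder::stack_size` rather than through an environment variable (the standard library documents `RUST_MIN_STACK` as the variable it reads for the default stack size of spawned threads).

```rust
use std::thread;

/// Hypothetical stand-in for a recursive planner or expression visitor:
/// each level of nesting costs one stack frame.
fn depth(level: u32) -> u32 {
    if level == 0 {
        0
    } else {
        1 + depth(level - 1)
    }
}

/// Runs `depth` on a thread whose stack size is chosen explicitly.
fn depth_with_stack(levels: u32, stack_bytes: usize) -> u32 {
    thread::Builder::new()
        .stack_size(stack_bytes)
        .spawn(move || depth(levels))
        .expect("failed to spawn thread")
        .join()
        .expect("worker thread panicked")
}

fn main() {
    // 32 MiB leaves ample room for 50_000 small frames.
    let reached = depth_with_stack(50_000, 32 * 1024 * 1024);
    println!("recursed {} levels deep", reached);
}
```

Setting the size on one thread keeps the change local, whereas an environment variable affects every thread the process spawns.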





[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-906270531


   PR in arrow-testing https://github.com/apache/arrow-testing/pull/62





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r706361582



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -570,6 +570,7 @@ pub trait Accumulator: Send + Sync + Debug {
 pub mod aggregates;
 pub mod analyze;
 pub mod array_expressions;
+pub mod avro;

Review comment:
   I think @nevi-me is suggesting something like
   
   ```suggestion
   #[cfg(feature = "avro")]
   pub mod avro;
   ```
   
   That would then let you avoid so many `#[cfg...]` attributes in https://github.com/apache/arrow-datafusion/pull/910#discussion_r700593118
   
   I am not sure what other changes that entails.
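
A minimal sketch of the module-level gating idea, assuming a Cargo feature named `avro`: the items inside the gated module need no attributes of their own, and only the one call site that must work in both configurations gets a fallback. `describe` and `read_avro` are hypothetical names for illustration, not functions from this PR.

```rust
// The whole module is compiled only when the (assumed) `avro` feature is on,
// so nothing inside it needs its own `#[cfg(...)]`.
#[cfg(feature = "avro")]
pub mod avro {
    /// Hypothetical placeholder for the real reader entry point.
    pub fn describe(path: &str) -> String {
        format!("reading Avro file {}", path)
    }
}

/// With the feature enabled, delegate to the gated module.
#[cfg(feature = "avro")]
pub fn read_avro(path: &str) -> Result<String, String> {
    Ok(avro::describe(path))
}

/// Without it, callers get a clear error from the single fallback.
#[cfg(not(feature = "avro"))]
pub fn read_avro(path: &str) -> Result<String, String> {
    Err(format!("cannot read {}: built without the `avro` feature", path))
}

fn main() {
    match read_avro("data/example.avro") {
        Ok(msg) => println!("{}", msg),
        Err(err) => println!("{}", err),
    }
}
```

Which branch runs depends on how the crate was built, for example with or without `--features avro`.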







[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r698135614



##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1911,17 +1911,13 @@ mod tests {
     impl ExprRewriter for FooBarRewriter {
         fn mutate(&mut self, expr: Expr) -> Result<Expr> {
             match expr {
-                Expr::Literal(scalar) => {
-                    if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
-                        let utf8_val = if utf8_val == "foo" {
-                            "bar".to_string()
-                        } else {
-                            utf8_val
-                        };
-                        Ok(lit(utf8_val))
+                Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => {
+                    let utf8_val = if utf8_val == "foo" {

Review comment:
       @Jimexist I get a clippy failure on nightly:
   ```
   error: unnecessary nested `if let` or `match`
       --> datafusion/src/logical_plan/expr.rs:1915:21
        |
   1915 | /                     if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
   1916 | |                         let utf8_val = if utf8_val == "foo" {
   1917 | |                             "bar".to_string()
   1918 | |                         } else {
   ...    |
   1923 | |                         Ok(Expr::Literal(scalar))
   1924 | |                     }
        | |_____________________^
        |
        = note: `-D clippy::collapsible-match` implied by `-D warnings`
   help: the outer pattern can be modified to include the inner pattern
       --> datafusion/src/logical_plan/expr.rs:1914:31
        |
   1914 |                 Expr::Literal(scalar) => {
        |                               ^^^^^^ replace this binding
   1915 |                     if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
        |                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ with this pattern
        = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_match
   
   ```
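
For readers unfamiliar with the `collapsible_match` lint, the sketch below shows the collapsed form on simplified types. `ScalarValue` and `Expr` here are minimal stand-ins whose shapes are assumed for illustration (they are not DataFusion's definitions), and `mutate` returns a plain `Expr` rather than a `Result`.

```rust
// Minimal stand-ins for DataFusion's `ScalarValue` and `Expr` (assumed shapes).
#[derive(Debug, Clone, PartialEq)]
enum ScalarValue {
    Utf8(Option<String>),
    Int64(Option<i64>),
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Literal(ScalarValue),
    Column(String),
}

/// Rewrites the string literal "foo" to "bar" and leaves everything else alone.
fn mutate(expr: Expr) -> Expr {
    match expr {
        // The inner `if let` is folded into the outer pattern, as clippy suggests.
        Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => {
            let utf8_val = if utf8_val == "foo" {
                "bar".to_string()
            } else {
                utf8_val
            };
            Expr::Literal(ScalarValue::Utf8(Some(utf8_val)))
        }
        // Any other expression, including non-string literals, passes through.
        other => other,
    }
}

fn main() {
    let foo = Expr::Literal(ScalarValue::Utf8(Some("foo".to_string())));
    println!("{:?}", mutate(foo));
    println!("{:?}", mutate(Expr::Literal(ScalarValue::Int64(Some(1)))));
    println!("{:?}", mutate(Expr::Column("a".to_string())));
}
```

The nested pattern removes one level of indentation and makes the fall-through arm handle every non-matching case in one place.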







[GitHub] [arrow-datafusion] Igosuki commented on pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#issuecomment-917635206


   Then I am assuming `cargo test` runs benches on the dev profile...





[GitHub] [arrow-datafusion] jorgecarleitao commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r709404501



##########
File path: datafusion/src/avro_to_arrow/schema.rs
##########
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, None)
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("avro::doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),

Review comment:
       Sorry for being late to the party. I noticed this while porting the code to arrow2; I think what we want here is
   
   ```
   DataType::Dictionary(Box::new(index_type(symbols.len())), Box::new(DataType::Utf8))
   ```
   
   `Field::new_dict` does not create a new dictionary `DataType`; it is only used to set the `dict_id`, order, etc.
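
To make the suggested shape concrete without depending on the Arrow crate, here is a sketch with a tiny stand-in `DataType` enum. The key-width rule in `index_type` is an assumption made for illustration (the narrowest signed integer that can index the symbols); the PR's own `index_type` may choose differently, and `enum_data_type` is a hypothetical helper.

```rust
// Tiny stand-in for Arrow's `DataType`; only the variants this sketch needs.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int8,
    Int16,
    Int32,
    Utf8,
    Dictionary(Box<DataType>, Box<DataType>),
}

/// Assumed rule: keys run from 0 to len - 1, so pick the narrowest signed
/// integer whose maximum value is at least len - 1.
fn index_type(len: usize) -> DataType {
    if len <= (i8::MAX as usize) + 1 {
        DataType::Int8
    } else if len <= (i16::MAX as usize) + 1 {
        DataType::Int16
    } else {
        DataType::Int32
    }
}

/// Builds the type for an Avro enum: integer keys into string symbols.
fn enum_data_type(symbols: &[&str]) -> DataType {
    DataType::Dictionary(
        Box::new(index_type(symbols.len())),
        Box::new(DataType::Utf8),
    )
}

fn main() {
    let suits = ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"];
    println!("{:?}", enum_data_type(&suits));
}
```

The point of the sketch is that the dictionary-ness lives in the data type itself (key type plus value type), independent of any per-field dictionary metadata.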
   







[GitHub] [arrow-datafusion] Igosuki commented on a change in pull request #910: Avro Table Provider

Posted by GitBox <gi...@apache.org>.
Igosuki commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r696622294



##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned CSV files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(

Review comment:
       Added a test for that in `sql.rs`



