You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/25 18:35:33 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #910: Avro Table Provider

alamb commented on a change in pull request #910:
URL: https://github.com/apache/arrow-datafusion/pull/910#discussion_r695977278



##########
File path: .gitmodules
##########
@@ -3,4 +3,5 @@
 	url = https://github.com/apache/parquet-testing.git
 [submodule "testing"]
 	path = testing
-	url = https://github.com/apache/arrow-testing
+	url = https://github.com/Igosuki/arrow-testing.git

Review comment:
       prior to merging this PR we should request/merge the changes into apache/arrow-testing I think

##########
File path: datafusion-examples/examples/avro_sql.rs
##########
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       👍 
   
   ```shell
   Opening avro: /Users/alamb/Software/arrow-datafusion/datafusion/../testing/data/avro/alltypes_plain.avro
   +---------+------------+-------------------------------+
   | int_col | double_col | CAST(date_string_col AS Utf8) |
   +---------+------------+-------------------------------+
   | 1       | 10.1       | 03/01/09                      |
   | 1       | 10.1       | 04/01/09                      |
   | 1       | 10.1       | 02/01/09                      |
   +---------+------------+-------------------------------+
   ```

##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -244,6 +245,13 @@ pub enum Expr {
     IsNull(Box<Expr>),
     /// arithmetic negation of an expression, the operand must be of a signed numeric data type
     Negative(Box<Expr>),
+    /// Returns the field of a [`StructArray`] by name
+    GetField {

Review comment:
       This appears to be some part of https://github.com/apache/arrow-datafusion/pull/628 -- I think it would be cleaner if we revived that PR to get the GetField functionality separately rather than including it in a single large PR

##########
File path: datafusion/src/sql/parser.rs
##########
@@ -390,10 +393,21 @@ mod tests {
         });
         expect_parse_ok(sql, expected)?;
 
+        // positive case: it is ok for parquet files not to have columns specified

Review comment:
       ```suggestion
           // positive case: it is ok for avro files not to have columns specified
   ```

##########
File path: ballista/rust/core/src/serde/logical_plan/to_proto.rs
##########
@@ -1403,6 +1424,9 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
             BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256),
             BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384),
             BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512),
+            BuiltinScalarFunction::ToTimestampMillis => {

Review comment:
       the addition of `ToTimestampMillis` seems unrelated to this PR, but is a good addition

##########
File path: datafusion/src/avro_to_arrow/reader.rs
##########
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::record_batch::RecordBatch;
+use crate::avro_to_arrow::arrow_array_reader::AvroArrowArrayReader;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use avro_rs::Reader as AvroReader;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    /// Optional schema for the Avro file
+    ///
+    /// If the schema is not supplied, the reader will try to infer the schema
+    /// based on the Avro structure.
+    schema: Option<SchemaRef>,
+    /// Batch size (number of records to load each time)
+    ///
+    /// The default batch size when using the `ReaderBuilder` is 1024 records
+    batch_size: usize,
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            batch_size: 1024,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+    /// Create a new builder for configuring Avro parsing options.
+    ///
+    /// To convert a builder into a reader, call `Reader::from_builder`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder, inferring the schema with the first 100 records
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().infer_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to infer the schema of the file

Review comment:
       this doc comments seems to be the opposite of what the function does -- the function seems to set the schema and turn off schema inference

##########
File path: datafusion/src/avro_to_arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1093 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{

Review comment:
       I can't say I am too familiar with the avro data model and thus I don't really feel qualified to review the logic in this file. 
   
   However, the code  seems easy to read and is  reasonably well tested so I think it would be fine to merge it in as is and then adjust / improve as this module got more use

##########
File path: datafusion/src/datasource/avro.rs
##########
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Line-delimited Avro data source
+//!
+//! This data source allows Line-delimited Avro records or files to be used as input for queries.
+//!
+
+use std::{
+    any::Any,
+    io::{Read, Seek},
+    sync::{Arc, Mutex},
+};
+
+use arrow::datatypes::SchemaRef;
+
+use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
+use crate::{
+    datasource::{Source, TableProvider},
+    error::{DataFusionError, Result},
+    physical_plan::{common, ExecutionPlan},
+};
+
+use super::datasource::Statistics;
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents a  line-delimited Avro file with a provided schema
+pub struct AvroFile {
+    source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
+    schema: SchemaRef,
+    file_extension: String,
+    statistics: Statistics,
+}
+
+impl AvroFile {
+    /// Attempt to initialize a `AvroFile` from a path. The schema can be inferred automatically.
+    pub fn try_new(path: &str, options: AvroReadOptions) -> Result<Self> {
+        let schema = if let Some(schema) = options.schema {
+            schema
+        } else {
+            let filenames =
+                common::build_checked_file_list(path, options.file_extension)?;
+            Arc::new(AvroExec::try_infer_schema(&filenames)?)
+        };
+
+        Ok(Self {
+            source: Source::Path(path.to_string()),
+            schema,
+            file_extension: options.file_extension.to_string(),
+            statistics: Statistics::default(),
+        })
+    }
+
+    /// Attempt to initialize a `AvroFile` from a reader. The schema MUST be provided in options
+    pub fn try_new_from_reader<R: Read + Seek + Send + Sync + 'static>(
+        reader: R,
+        options: AvroReadOptions,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "Schema must be provided to CsvRead".to_string(),
+                ));
+            }
+        };
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            statistics: Statistics::default(),
+            file_extension: String::new(),
+        })
+    }
+
+    /// Attempt to initialize an AvroFile from a reader impls Seek. The schema can be inferred automatically.
+    pub fn try_new_from_reader_infer_schema<R: Read + Seek + Send + Sync + 'static>(
+        mut reader: R,
+        options: AvroReadOptions,
+    ) -> Result<Self> {
+        let schema = {
+            if let Some(schema) = options.schema {
+                schema
+            } else {
+                Arc::new(crate::avro_to_arrow::infer_avro_schema_from_reader(
+                    &mut reader,
+                )?)
+            }
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            statistics: Statistics::default(),
+            file_extension: String::new(),
+        })
+    }
+
+    /// Get the path for Avro file(s) represented by this AvroFile instance
+    pub fn path(&self) -> &str {
+        match &self.source {
+            Source::Reader(_) => "",
+            Source::Path(path) => path,
+        }
+    }
+
+    /// Get the file extension for the Avro file(s) represented by this AvroFile instance
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+}
+
+impl TableProvider for AvroFile {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+        _filters: &[crate::logical_plan::Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let opts = AvroReadOptions {
+            schema: Some(self.schema.clone()),
+            file_extension: self.file_extension.as_str(),
+        };
+        let batch_size = limit
+            .map(|l| std::cmp::min(l, batch_size))
+            .unwrap_or(batch_size);
+
+        let exec = match &self.source {
+            Source::Reader(maybe_reader) => {
+                if let Some(rdr) = maybe_reader.lock().unwrap().take() {
+                    AvroExec::try_new_from_reader(
+                        rdr,
+                        opts,
+                        projection.clone(),
+                        batch_size,
+                        limit,
+                    )?
+                } else {
+                    return Err(DataFusionError::Execution(
+                        "You can only read once if the data comes from a reader"
+                            .to_string(),
+                    ));
+                }
+            }
+            Source::Path(p) => {
+                AvroExec::try_from_path(p, opts, projection.clone(), batch_size, limit)?
+            }
+        };
+        Ok(Arc::new(exec))
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.statistics.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        TimestampMicrosecondArray,
+    };
+    use arrow::record_batch::RecordBatch;
+    use futures::StreamExt;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn read_small_batches() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = None;
+        let exec = table.scan(&projection, 2, &[], None)?;
+        let stream = exec.execute(0).await?;
+
+        let _ = stream
+            .map(|batch| {
+                let batch = batch.unwrap();
+                assert_eq!(11, batch.num_columns());
+                assert_eq!(2, batch.num_rows());
+            })
+            .fold(0, |acc, _| async move { acc + 1i32 })
+            .await;
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+
+        let x: Vec<String> = table
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect();
+        let y = x.join("\n");
+        assert_eq!(
+            "id: Int32\n\
+             bool_col: Boolean\n\
+             tinyint_col: Int32\n\
+             smallint_col: Int32\n\
+             int_col: Int32\n\
+             bigint_col: Int64\n\
+             float_col: Float32\n\
+             double_col: Float64\n\
+             date_string_col: Binary\n\
+             string_col: Binary\n\
+             timestamp_col: Timestamp(Microsecond, None)",
+            y
+        );
+
+        let projection = None;
+        let batch = get_first_batch(table, &projection).await?;
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(8, batch.num_rows());

Review comment:
       It is a matter of preference, but I personally prefer using the `assert_batches!` macro for this kind of test (it is easy to update by copy/pasting test output). Here is an example:
   
   https://github.com/apache/arrow-datafusion/blob/60fa315dd82b040045d1d0797639bd0c17dd40ae/datafusion/tests/sql.rs#L163-L180
   

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -78,7 +78,7 @@ impl NthValue {
     }
 
     /// Create a new NTH_VALUE window aggregate function
-    pub fn nth_value(
+    pub fn value(

Review comment:
       why this change? It seems unrelated to the rest of the PR (as well as not an obvious improvement to me)

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -140,6 +141,7 @@ impl DFSchema {
                 return Ok(i);
             }
         }
+        println!("{}", name);

Review comment:
       perhaps a stray debugging leftover

##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned CSV files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+            Ok(Arc::new(Self {
+                source: Source::PartitionedFiles {
+                    filenames: filenames.clone(),
+                    path: path.clone(),
+                },
+                schema: self.schema.clone(),
+                projection: self.projection.clone(),
+                projected_schema: self.projected_schema.clone(),
+                batch_size: self.batch_size,
+                limit: self.limit,
+                file_extension: self.file_extension.clone(),
+            }))
+        } else {
+            Err(DataFusionError::Internal(
+                "AvroExec with reader source cannot be used with `with_new_children`"
+                    .to_string(),
+            ))
+        }
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        let mut builder = crate::avro_to_arrow::ReaderBuilder::new()
+            .with_schema(self.schema.clone())
+            .with_batch_size(self.batch_size);
+        if let Some(proj) = &self.projection {
+            builder = builder.with_projection(
+                proj.iter()
+                    .map(|col_idx| self.schema.field(*col_idx).name())
+                    .cloned()
+                    .collect(),
+            );
+        }
+        match &self.source {
+            Source::PartitionedFiles { filenames, .. } => {
+                let file = File::open(&filenames[partition])?;
+
+                Ok(Box::pin(AvroStream::new(builder.build(file)?, self.limit)))
+            }
+            Source::Reader(rdr) => {
+                if partition != 0 {
+                    Err(DataFusionError::Internal(
+                        "Only partition 0 is valid when CSV comes from a reader"

Review comment:
       ```suggestion
                           "Only partition 0 is valid when AVRO comes from a reader"
   ```

##########
File path: datafusion/src/avro_to_arrow/reader.rs
##########
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::record_batch::RecordBatch;
+use crate::avro_to_arrow::arrow_array_reader::AvroArrowArrayReader;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use avro_rs::Reader as AvroReader;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    /// Optional schema for the Avro file
+    ///
+    /// If the schema is not supplied, the reader will try to infer the schema
+    /// based on the Avro structure.
+    schema: Option<SchemaRef>,
+    /// Batch size (number of records to load each time)
+    ///
+    /// The default batch size when using the `ReaderBuilder` is 1024 records
+    batch_size: usize,
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            batch_size: 1024,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+    /// Create a new builder for configuring Avro parsing options.
+    ///
+    /// To convert a builder into a reader, call `Reader::from_builder`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder, inferring the schema with the first 100 records
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().infer_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to infer the schema of the file
+    pub fn infer_schema(mut self) -> Self {
+        // remove any schema that is set
+        self.schema = None;
+        self
+    }
+
+    /// Set the batch size (number of records to load at one time)
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Set the reader's column projection
+    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    /// Create a new `Reader` from the `ReaderBuilder`
+    pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>>
+    where
+        R: Read + Seek,
+    {
+        let mut source = source;
+
+        // check if schema should be inferred
+        let schema = match self.schema {
+            Some(schema) => schema,
+            None => Arc::new(infer_avro_schema_from_reader(&mut source)?),
+        };
+        source.seek(SeekFrom::Start(0))?;
+        Reader::try_new(source, schema, self.batch_size, self.projection)
+    }
+}
+
+/// Avro file record  reader
+pub struct Reader<'a, R: Read> {
+    array_reader: AvroArrowArrayReader<'a, R>,
+    schema: SchemaRef,
+    batch_size: usize,
+}
+
+impl<'a, R: Read> Reader<'a, R> {
+    /// Create a new Avro Reader from any value that implements the `Read` trait.
+    ///
+    /// If reading a `File`, you can customise the Reader, such as to enable schema
+    /// inference, use `ReaderBuilder`.
+    pub fn try_new(
+        reader: R,
+        schema: SchemaRef,
+        batch_size: usize,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        Ok(Self {
+            array_reader: AvroArrowArrayReader::try_new(
+                AvroReader::new(reader)?,
+                schema.clone(),
+                projection,
+            )?,
+            schema,
+            batch_size,
+        })
+    }
+
+    /// Returns the schema of the reader, useful for getting the schema without reading
+    /// record batches
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
+        self.array_reader.next_batch(self.batch_size)
+    }
+}
+
+impl<'a, R: Read> Iterator for Reader<'a, R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next().transpose()
+    }
+}
+
+/// Infer Avro schema given a reader
+pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
+    let avro_reader = avro_rs::Reader::new(reader)?;
+    let schema = avro_reader.writer_schema();
+    super::to_arrow_schema(schema)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::*;
+    use crate::arrow::datatypes::{DataType, Field};
+    use arrow::datatypes::TimeUnit;
+    use std::fs::File;
+
+    fn build_reader(name: &str) -> Reader<File> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/{}", testdata, name);
+        let builder = ReaderBuilder::new().infer_schema().with_batch_size(64);
+        builder.build(File::open(filename).unwrap()).unwrap()
+    }
+
+    fn get_col<'a, T: 'static>(
+        batch: &'a RecordBatch,
+        col: (usize, &Field),
+    ) -> Option<&'a T> {
+        batch.column(col.0).as_any().downcast_ref::<T>()
+    }
+
+    #[test]
+    fn test_avro_basic() {
+        let mut reader = build_reader("alltypes_dictionary.avro");
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(2, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let id = schema.column_with_name("id").unwrap();
+        assert_eq!(0, id.0);
+        assert_eq!(&DataType::Int32, id.1.data_type());
+        let col = get_col::<Int32Array>(&batch, id).unwrap();

Review comment:
       I personally the `assert_batches_eq!` approach for comparing the output of record batches, but this works too

##########
File path: datafusion/src/avro_to_arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1093 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{
+    make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef,
+    BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait,
+    PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
+    StringDictionaryBuilder,
+};
+use crate::arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::datatypes::{
+    ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
+    Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
+    Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
+    UInt8Type,
+};
+use crate::arrow::error::ArrowError;
+use crate::arrow::record_batch::RecordBatch;
+use crate::arrow::util::bit_util;
+use crate::error::{DataFusionError, Result};
+use arrow::array::{BinaryArray, GenericListArray};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError::SchemaError;
+use arrow::error::Result as ArrowResult;
+use avro_rs::schema::Schema as AvroSchema;
+use avro_rs::schema::SchemaKind;
+use avro_rs::types::Value;
+use avro_rs::{AvroResult, Reader as AvroReader};
+use num_traits::NumCast;
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+type RecordSlice<'a> = &'a [Vec<(String, Value)>];
+
+pub struct AvroArrowArrayReader<'a, R: Read> {
+    reader: AvroReader<'a, R>,
+    schema: SchemaRef,
+    projection: Option<Vec<String>>,
+    schema_lookup: HashMap<String, usize>,
+}
+
+impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
+    pub fn try_new(
+        reader: AvroReader<'a, R>,
+        schema: SchemaRef,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        let writer_schema = reader.writer_schema().clone();
+        let schema_lookup = Self::schema_lookup(writer_schema)?;
+        Ok(Self {
+            reader,
+            schema,
+            projection,
+            schema_lookup,
+        })
+    }
+
+    pub fn schema_lookup(schema: AvroSchema) -> Result<HashMap<String, usize>> {
+        match schema {
+            AvroSchema::Record {
+                lookup: ref schema_lookup,
+                ..
+            } => Ok(schema_lookup.clone()),
+            _ => Err(DataFusionError::ArrowError(SchemaError(
+                "expected avro schema to be a record".to_string(),
+            ))),
+        }
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult<Option<RecordBatch>> {
+        let mut rows = Vec::with_capacity(batch_size);
+        for value in self.reader.by_ref().take(batch_size) {
+            let v = value.map_err(|e| {
+                ArrowError::ParseError(format!("Failed to parse avro value: {:?}", e))
+            })?;
+            match v {
+                Value::Record(v) => {
+                    rows.push(v);
+                }
+                other => {
+                    return Err(ArrowError::ParseError(format!(
+                        "Row needs to be of type object, got: {:?}",
+                        other
+                    )))
+                }
+            }
+        }
+        if rows.is_empty() {
+            // reached end of file
+            return Ok(None);
+        }
+        let rows = &rows[..];
+        let projection = self.projection.clone().unwrap_or_else(Vec::new);
+        let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
+        let projected_fields: Vec<Field> = if projection.is_empty() {
+            self.schema.fields().to_vec()
+        } else {
+            projection
+                .iter()
+                .map(|name| self.schema.column_with_name(name))
+                .flatten()
+                .map(|(_, field)| field.clone())
+                .collect()
+        };
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+    }
+
+    fn build_boolean_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef> {
+        let mut builder = BooleanBuilder::new(rows.len());
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                if let Some(boolean) = resolve_boolean(&value) {
+                    builder.append_value(boolean)?
+                } else {
+                    builder.append_null()?;
+                }
+            } else {
+                builder.append_null()?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_primitive_array<T: ArrowPrimitiveType + Resolver>(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef>
+    where
+        T: ArrowNumericType,
+        T::Native: num_traits::cast::NumCast,
+    {
+        Ok(Arc::new(
+            rows.iter()
+                .map(|row| {
+                    self.field_lookup(col_name, row)
+                        .and_then(|value| resolve_item::<T>(&value))
+                })
+                .collect::<PrimitiveArray<T>>(),
+        ))
+    }
+
+    #[inline(always)]
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_string_dictionary_builder<T>(
+        &self,
+        row_len: usize,
+    ) -> ArrowResult<StringDictionaryBuilder<T>>
+    where
+        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let key_builder = PrimitiveBuilder::<T>::new(row_len);
+        let values_builder = StringBuilder::new(row_len * 5);
+        Ok(StringDictionaryBuilder::new(key_builder, values_builder))
+    }
+
+    fn build_wrapped_list_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+        key_type: &DataType,
+    ) -> ArrowResult<ArrayRef> {
+        match *key_type {
+            DataType::Int8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int8Type>(&dtype, col_name, rows)
+            }
+            DataType::Int16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int16Type>(&dtype, col_name, rows)
+            }
+            DataType::Int32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int32Type>(&dtype, col_name, rows)
+            }
+            DataType::Int64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int64Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt8Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
+            }
+            ref e => Err(SchemaError(format!(
+                "Data type is currently not supported for dictionaries in list : {:?}",
+                e
+            ))),
+        }
+    }
+
+    #[inline(always)]
+    fn list_array_string_array_builder<D>(
+        &self,
+        data_type: &DataType,
+        col_name: &str,
+        rows: RecordSlice,
+    ) -> ArrowResult<ArrayRef>
+    where
+        D: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let mut builder: Box<dyn ArrayBuilder> = match data_type {
+            DataType::Utf8 => {
+                let values_builder = StringBuilder::new(rows.len() * 5);
+                Box::new(ListBuilder::new(values_builder))
+            }
+            DataType::Dictionary(_, _) => {
+                let values_builder =
+                    self.build_string_dictionary_builder::<D>(rows.len() * 5)?;
+                Box::new(ListBuilder::new(values_builder))
+            }
+            e => {
+                return Err(SchemaError(format!(
+                    "Nested list data builder type is not supported: {:?}",
+                    e
+                )))
+            }
+        };
+
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                // value can be an array or a scalar
+                let vals: Vec<Option<String>> = if let Value::String(v) = value {
+                    vec![Some(v.to_string())]
+                } else if let Value::Array(n) = value {
+                    n.into_iter()
+                        .map(|v| {
+                            resolve_string(&v)
+                            // else if matches!(

Review comment:
       perhaps this should return an error?

##########
File path: datafusion/Cargo.toml
##########
@@ -69,6 +69,8 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
+avro-rs = { version = "0.13", features = ["snappy"] }

Review comment:
       As the datafusion codebase already has a non trivial number of depenencies I would personally prefer we did not add additional required ones.
   
   What would you think about making
   
   ```suggestion
   avro-rs = { version = "0.13", features = ["snappy"], optional=true }
   ```
   
   And then document it as a crate `feature` -- e.g. like `regex_expressions` (but not a default feature)?
   
   I think most of the rest of the code in this PR could be left the same and only the part that interacts with `avro-rs` could be `#[cfg(...)]` out

##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned CSV files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(

Review comment:
       This code seems to support creating a table backed by multiple avro files (as is supported by the csv and parquet readers), but I don't see a test for that functionality anywhere.
   
   Maybe you could have a test in sql.rs that refered to the same test files twice and ensured we got back two copies of the data

##########
File path: datafusion/src/physical_plan/avro.rs
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited Avro files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::avro_to_arrow::infer_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned CSV files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given Avro dataset
+    pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = infer_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+            Ok(Arc::new(Self {
+                source: Source::PartitionedFiles {
+                    filenames: filenames.clone(),
+                    path: path.clone(),
+                },
+                schema: self.schema.clone(),
+                projection: self.projection.clone(),
+                projected_schema: self.projected_schema.clone(),
+                batch_size: self.batch_size,
+                limit: self.limit,
+                file_extension: self.file_extension.clone(),
+            }))
+        } else {
+            Err(DataFusionError::Internal(
+                "AvroExec with reader source cannot be used with `with_new_children`"
+                    .to_string(),
+            ))
+        }
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        let mut builder = crate::avro_to_arrow::ReaderBuilder::new()
+            .with_schema(self.schema.clone())
+            .with_batch_size(self.batch_size);
+        if let Some(proj) = &self.projection {
+            builder = builder.with_projection(
+                proj.iter()
+                    .map(|col_idx| self.schema.field(*col_idx).name())
+                    .cloned()
+                    .collect(),
+            );
+        }
+        match &self.source {
+            Source::PartitionedFiles { filenames, .. } => {
+                let file = File::open(&filenames[partition])?;
+
+                Ok(Box::pin(AvroStream::new(builder.build(file)?, self.limit)))
+            }
+            Source::Reader(rdr) => {
+                if partition != 0 {
+                    Err(DataFusionError::Internal(
+                        "Only partition 0 is valid when CSV comes from a reader"
+                            .to_string(),
+                    ))
+                } else if let Some(rdr) = rdr.lock().unwrap().take() {
+                    Ok(Box::pin(AvroStream::new(builder.build(rdr)?, self.limit)))
+                } else {
+                    Err(DataFusionError::Execution(
+                        "Error reading CSV: Data can only be read a single time when the source is a reader"

Review comment:
       ```suggestion
                           "Error reading AVRO: Data can only be read a single time when the source is a reader"
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org