You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/15 14:30:40 UTC

[arrow-datafusion] branch master updated: Avro Table Provider (#910)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6402200  Avro Table Provider (#910)
6402200 is described below

commit 6402200c3ff1ca9732e85ca39fc89ae5f30cd965
Author: Guillaume Balaine <ig...@gmail.com>
AuthorDate: Wed Sep 15 16:30:36 2021 +0200

    Avro Table Provider (#910)
    
    * Add avro as a datasource, file and table provider
    
    * wip
    
    * Added support composite identifiers for struct type.
    
    * Fixed build.
    
    * cheat and add unions to valid composite column types
    
    * Implement the AvroArrayReader
    
    * Add binary types
    
    * Enable Avro as a FileType
    
    * Enable registering an avro table in the sql parsing
    
    * Change package name for datafusion/avro
    
    * Implement Avro datasource tests and fix avro_rs::Value resolution to Arrow types
    
    * Test for AvroExec::try_from_path
    
    * external table avro test
    
    * Basic schema conversion tests
    
    * Complete test for avro_to_arrow_reader on alltypes_dictionnary
    
    * fix_stable: .rewind is 'unstable'
    
    * Fix license files and remove the unused avro-converter crate
    
    * fix example test in avro_to_arrow
    
    * add avro_sql test to default workflow
    
    * Adress clippies
    
    * Enable avro as a valid datasource for client execution
    
    * Add avro to available logical plan nodes
    
    * Add ToTimestampMillis as a scalar function in protos
    
    * Allow Avro in PhysicalPlan nodes
    
    * Remove remaining confusing references to 'json' in avro mod
    
    * rename 'parquet' words in avro test and examples
    
    * Handle Union of nested lists in arrow reader
    
    * test timestamp arrays
    
    * remove debug statement
    
    * Make avro optional
    
    * Remove debug statement
    
    * Remove GetField usage (see #628)
    
    * Fix docstring in parser tests
    
    * Test batch output rather than just rows individually
    
    * Remove 'csv' from error strings in physical_plan::avro
    
    * Avro sample sql and explain queries tests in sql.rs
    
    * Activate avro feature for cargo tests in github workflow
    
    * Add a test for avro registering multiple files in a single table
    
    * Switch to Result instead of Option for resolve_string
    
    * Address missing clippy warning should_implement_trait in arrow_to_avro/reader
    
    * Add fmt display implementation for AvroExec
    
    * ci: fix cargo sql run example, use datafusion/avro feature instead of 'avro'
    
    * license: missing license file for avro_to_arrow/schema.rs
    
    * only run avro datasource tests if features have 'avro'
    
    * refactor: rename infer_avro_schema_from_reader to read_avro_schema_from_reader
    
    * Pass None as props to avro schema schema_to_field_with_props until further notice
    
    * Change schema inferance to FixedSizeBinary(16) for Uuid
    
    * schema: prefix metadata coming from avro with 'avro'
    
    * make num traits optional and part of the avro feature flag
    
    * Fix avro schema tests regarding external props
    
    * split avro physical plan test feature wise and add a non-implemented test
    
    * submodule: switch back to apache/arrow-testing
    
    * fix_test: columns are now prefixed in the plan
    
    * avro_test: fix clippy warning cmp-owned
    
    * avro: move statistics to the physical plan
    
    * Increase min stack size for cargo tests
    
    Co-authored-by: Jorge C. Leitao <jo...@gmail.com>
---
 .github/workflows/rust.yml                         |    5 +-
 ballista/rust/client/src/context.rs                |   40 +
 ballista/rust/core/proto/ballista.proto            |   24 +
 .../rust/core/src/serde/logical_plan/from_proto.rs |   29 +
 ballista/rust/core/src/serde/logical_plan/mod.rs   |    8 +-
 .../rust/core/src/serde/logical_plan/to_proto.rs   |   30 +-
 .../core/src/serde/physical_plan/from_proto.rs     |   17 +
 .../rust/core/src/serde/physical_plan/to_proto.rs  |   23 +
 datafusion-examples/Cargo.toml                     |    4 +
 datafusion-examples/examples/avro_sql.rs           |   49 +
 datafusion/Cargo.toml                              |    5 +-
 datafusion/src/avro_to_arrow/arrow_array_reader.rs | 1090 ++++++++++++++++++++
 datafusion/src/avro_to_arrow/mod.rs                |   47 +
 datafusion/src/avro_to_arrow/reader.rs             |  281 +++++
 datafusion/src/avro_to_arrow/schema.rs             |  464 +++++++++
 datafusion/src/datasource/avro.rs                  |  424 ++++++++
 datafusion/src/datasource/mod.rs                   |    1 +
 datafusion/src/error.rs                            |   16 +
 datafusion/src/execution/context.rs                |   32 +
 datafusion/src/lib.rs                              |    1 +
 datafusion/src/logical_plan/builder.rs             |   23 +
 datafusion/src/logical_plan/dfschema.rs            |    1 -
 datafusion/src/logical_plan/expr.rs                |   16 +-
 datafusion/src/physical_plan/avro.rs               |  457 ++++++++
 datafusion/src/physical_plan/common.rs             |   14 +
 .../src/physical_plan/datetime_expressions.rs      |   15 +-
 datafusion/src/physical_plan/mod.rs                |    1 +
 datafusion/src/physical_plan/source.rs             |    2 +-
 datafusion/src/physical_plan/string_expressions.rs |    1 +
 datafusion/src/sql/parser.rs                       |   18 +-
 datafusion/src/sql/planner.rs                      |    1 +
 datafusion/tests/sql.rs                            |  155 ++-
 testing                                            |    2 +-
 33 files changed, 3258 insertions(+), 38 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index a75af64..d62b996 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -105,13 +105,14 @@ jobs:
         run: |
           export ARROW_TEST_DATA=$(pwd)/testing/data
           export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
-          # run tests on all workspace members with default feature list
-          cargo test
+          # run tests on all workspace members with default feature list + avro
+          RUST_MIN_STACK=10485760 cargo test --features=avro
           # test datafusion examples
           cd datafusion-examples
           cargo test --no-default-features
           cargo run --example csv_sql
           cargo run --example parquet_sql
+          cargo run --example avro_sql --features=datafusion/avro
         env:
           CARGO_HOME: "/github/home/.cargo"
           CARGO_TARGET_DIR: "/github/home/target"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index ee2f656..3671f34 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -32,6 +32,7 @@ use datafusion::dataframe::DataFrame;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::dataframe_impl::DataFrameImpl;
 use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::avro::AvroReadOptions;
 use datafusion::physical_plan::csv::CsvReadOptions;
 use datafusion::sql::parser::FileType;
 
@@ -125,6 +126,30 @@ impl BallistaContext {
         })
     }
 
+    /// Create a DataFrame representing an Avro table scan
+
+    pub fn read_avro(
+        &self,
+        path: &str,
+        options: AvroReadOptions,
+    ) -> Result<Arc<dyn DataFrame>> {
+        // convert to absolute path because the executor likely has a different working directory
+        let path = PathBuf::from(path);
+        let path = fs::canonicalize(&path)?;
+
+        // use local DataFusion context for now but later this might call the scheduler
+        let mut ctx = {
+            let guard = self.state.lock().unwrap();
+            create_df_ctx_with_ballista_query_planner(
+                &guard.scheduler_host,
+                guard.scheduler_port,
+                guard.config(),
+            )
+        };
+        let df = ctx.read_avro(path.to_str().unwrap(), options)?;
+        Ok(df)
+    }
+
     /// Create a DataFrame representing a Parquet table scan
 
     pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
@@ -193,6 +218,17 @@ impl BallistaContext {
         self.register_table(name, df.as_ref())
     }
 
+    pub fn register_avro(
+        &self,
+        name: &str,
+        path: &str,
+        options: AvroReadOptions,
+    ) -> Result<()> {
+        let df = self.read_avro(path, options)?;
+        self.register_table(name, df.as_ref())?;
+        Ok(())
+    }
+
     /// Create a DataFrame from a SQL statement
     pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
         let mut ctx = {
@@ -240,6 +276,10 @@ impl BallistaContext {
                     self.register_parquet(name, location)?;
                     Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
                 }
+                FileType::Avro => {
+                    self.register_avro(name, location, AvroReadOptions::default())?;
+                    Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
+                }
                 _ => Err(DataFusionError::NotImplemented(format!(
                     "Unsupported file type {:?}.",
                     file_type
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index dd9978f..3fc291e 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -152,6 +152,7 @@ enum ScalarFunction {
   SHA384 = 32;
   SHA512 = 33;
   LN = 34;
+  TOTIMESTAMPMILLIS = 35;
 }
 
 message ScalarFunctionNode {
@@ -253,6 +254,7 @@ message LogicalPlanNode {
     WindowNode window = 13;
     AnalyzeNode analyze = 14;
     CrossJoinNode cross_join = 15;
+    AvroTableScanNode avro_scan = 16;
   }
 }
 
@@ -296,6 +298,15 @@ message ParquetTableScanNode {
   repeated LogicalExprNode filters = 4;
 }
 
+message AvroTableScanNode {
+  string table_name = 1;
+  string path = 2;
+  string file_extension = 3;
+  ProjectionColumns projection = 4;
+  Schema schema = 5;
+  repeated LogicalExprNode filters = 6;
+}
+
 message ProjectionNode {
   LogicalPlanNode input = 1;
   repeated LogicalExprNode expr = 2;
@@ -340,6 +351,7 @@ enum FileType{
   NdJson = 0;
   Parquet = 1;
   CSV = 2;
+  Avro = 3;
 }
 
 message AnalyzeNode {
@@ -456,6 +468,7 @@ message PhysicalPlanNode {
     WindowAggExecNode window = 17;
     ShuffleWriterExecNode shuffle_writer = 18;
     CrossJoinExecNode cross_join = 19;
+    AvroScanExecNode avro_scan = 20;
   }
 }
 
@@ -609,6 +622,17 @@ message CsvScanExecNode {
   repeated string filename = 8;
 }
 
+message AvroScanExecNode {
+  string path = 1;
+  repeated uint32 projection = 2;
+  Schema schema = 3;
+  string file_extension = 4;
+  uint32 batch_size = 5;
+
+  // partition filenames
+  repeated string filename = 8;
+}
+
 enum PartitionMode {
   COLLECT_LEFT = 0;
   PARTITIONED = 1;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 38de341..8ffdb65 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -32,6 +32,7 @@ use datafusion::logical_plan::{
     LogicalPlan, LogicalPlanBuilder, Operator,
 };
 use datafusion::physical_plan::aggregates::AggregateFunction;
+use datafusion::physical_plan::avro::AvroReadOptions;
 use datafusion::physical_plan::csv::CsvReadOptions;
 use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
 use datafusion::scalar::ScalarValue;
@@ -171,6 +172,32 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                 .build()
                 .map_err(|e| e.into())
             }
+            LogicalPlanType::AvroScan(scan) => {
+                let schema: Schema = convert_required!(scan.schema)?;
+                let options = AvroReadOptions {
+                    schema: Some(Arc::new(schema.clone())),
+                    file_extension: &scan.file_extension,
+                };
+
+                let mut projection = None;
+                if let Some(columns) = &scan.projection {
+                    let column_indices = columns
+                        .columns
+                        .iter()
+                        .map(|name| schema.index_of(name))
+                        .collect::<Result<Vec<usize>, _>>()?;
+                    projection = Some(column_indices);
+                }
+
+                LogicalPlanBuilder::scan_avro_with_name(
+                    &scan.path,
+                    options,
+                    projection,
+                    &scan.table_name,
+                )?
+                .build()
+                .map_err(|e| e.into())
+            }
             LogicalPlanType::Sort(sort) => {
                 let input: LogicalPlan = convert_box_required!(sort.input)?;
                 let sort_expr: Vec<Expr> = sort
@@ -1193,6 +1220,7 @@ impl TryFrom<i32> for protobuf::FileType {
             _x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
             _x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
             _x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
+            _x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
             invalid => Err(BallistaError::General(format!(
                 "Attempted to convert invalid i32 to protobuf::Filetype: {}",
                 invalid
@@ -1209,6 +1237,7 @@ impl Into<datafusion::sql::parser::FileType> for protobuf::FileType {
             protobuf::FileType::NdJson => FileType::NdJson,
             protobuf::FileType::Parquet => FileType::Parquet,
             protobuf::FileType::Csv => FileType::CSV,
+            protobuf::FileType::Avro => FileType::Avro,
         }
     }
 }
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index dbaac1d..ada3c85 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -643,8 +643,12 @@ mod roundtrip_tests {
 
         let df_schema_ref = schema.to_dfschema_ref()?;
 
-        let filetypes: [FileType; 3] =
-            [FileType::NdJson, FileType::Parquet, FileType::CSV];
+        let filetypes: [FileType; 4] = [
+            FileType::NdJson,
+            FileType::Parquet,
+            FileType::CSV,
+            FileType::Avro,
+        ];
 
         for file in filetypes.iter() {
             let create_table_node = LogicalPlan::CreateExternalTable {
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 10bc63e..e195e82 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -25,6 +25,7 @@ use crate::serde::{protobuf, BallistaError};
 use datafusion::arrow::datatypes::{
     DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
 };
+use datafusion::datasource::avro::AvroFile;
 use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
 use datafusion::logical_plan::{
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -793,6 +794,19 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                             },
                         )),
                     })
+                } else if let Some(avro) = source.downcast_ref::<AvroFile>() {
+                    Ok(protobuf::LogicalPlanNode {
+                        logical_plan_type: Some(LogicalPlanType::AvroScan(
+                            protobuf::AvroTableScanNode {
+                                table_name: table_name.to_owned(),
+                                path: avro.path().to_owned(),
+                                projection,
+                                schema: Some(schema),
+                                file_extension: avro.file_extension().to_string(),
+                                filters,
+                            },
+                        )),
+                    })
                 } else {
                     Err(BallistaError::General(format!(
                         "logical plan to_proto unsupported table provider {:?}",
@@ -974,6 +988,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     FileType::NdJson => protobuf::FileType::NdJson,
                     FileType::Parquet => protobuf::FileType::Parquet,
                     FileType::CSV => protobuf::FileType::Csv,
+                    FileType::Avro => protobuf::FileType::Avro,
                 };
 
                 Ok(protobuf::LogicalPlanNode {
@@ -1098,7 +1113,13 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
                         )
                     }
                 };
-                let arg = &args[0];
+                let arg_expr: Option<Box<protobuf::LogicalExprNode>> = if !args.is_empty()
+                {
+                    let arg = &args[0];
+                    Some(Box::new(arg.try_into()?))
+                } else {
+                    None
+                };
                 let partition_by = partition_by
                     .iter()
                     .map(|e| e.try_into())
@@ -1111,7 +1132,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
                     protobuf::window_expr_node::WindowFrame::Frame(window_frame.into())
                 });
                 let window_expr = Box::new(protobuf::WindowExprNode {
-                    expr: Some(Box::new(arg.try_into()?)),
+                    expr: arg_expr,
                     window_function: Some(window_function),
                     partition_by,
                     order_by,
@@ -1284,7 +1305,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
             Expr::Wildcard => Ok(protobuf::LogicalExprNode {
                 expr_type: Some(protobuf::logical_expr_node::ExprType::Wildcard(true)),
             }),
-            Expr::TryCast { .. } => unimplemented!(),
+            _ => unimplemented!(),
         }
     }
 }
@@ -1473,6 +1494,9 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
             BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256),
             BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384),
             BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512),
+            BuiltinScalarFunction::ToTimestampMillis => {
+                Ok(protobuf::ScalarFunction::Totimestampmillis)
+            }
             _ => Err(BallistaError::General(format!(
                 "logical_plan::to_proto() unsupported scalar function {:?}",
                 self
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 3cd8cf3..0d23372 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -43,6 +43,7 @@ use datafusion::logical_plan::{
     window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
 };
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
+use datafusion::physical_plan::avro::{AvroExec, AvroReadOptions};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::PartitionMode;
@@ -153,6 +154,21 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     None,
                 )))
             }
+            PhysicalPlanType::AvroScan(scan) => {
+                let schema = Arc::new(convert_required!(scan.schema)?);
+                let options = AvroReadOptions {
+                    schema: Some(schema),
+                    file_extension: &scan.file_extension,
+                };
+                let projection = scan.projection.iter().map(|i| *i as usize).collect();
+                Ok(Arc::new(AvroExec::try_from_path(
+                    &scan.path,
+                    options,
+                    Some(projection),
+                    scan.batch_size as usize,
+                    None,
+                )?))
+            }
             PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
                 let input: Arc<dyn ExecutionPlan> =
                     convert_box_required!(coalesce_batches.input)?;
@@ -544,6 +560,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
             ScalarFunction::Sha384 => BuiltinScalarFunction::SHA384,
             ScalarFunction::Sha512 => BuiltinScalarFunction::SHA512,
             ScalarFunction::Ln => BuiltinScalarFunction::Ln,
+            ScalarFunction::Totimestampmillis => BuiltinScalarFunction::ToTimestampMillis,
         }
     }
 }
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index e7d4ac6..22a49cb 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -62,6 +62,7 @@ use crate::execution_plans::{
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{protobuf, BallistaError};
+use datafusion::physical_plan::avro::AvroExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
 use datafusion::physical_plan::repartition::RepartitionExec;
@@ -285,6 +286,28 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                     },
                 )),
             })
+        } else if let Some(exec) = plan.downcast_ref::<AvroExec>() {
+            Ok(protobuf::PhysicalPlanNode {
+                physical_plan_type: Some(PhysicalPlanType::AvroScan(
+                    protobuf::AvroScanExecNode {
+                        path: exec.path().to_owned(),
+                        filename: exec.filenames().to_vec(),
+                        projection: exec
+                            .projection()
+                            .ok_or_else(|| {
+                                BallistaError::General(
+                                    "projection in AvroExec doesn't exist.".to_owned(),
+                                )
+                            })?
+                            .iter()
+                            .map(|n| *n as u32)
+                            .collect(),
+                        file_extension: exec.file_extension().to_owned(),
+                        schema: Some(exec.file_schema().as_ref().into()),
+                        batch_size: exec.batch_size() as u32,
+                    },
+                )),
+            })
         } else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() {
             let mut partition = vec![];
             for location in &exec.partition {
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 9b859c6..113cd5b 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -27,6 +27,10 @@ keywords = [ "arrow", "query", "sql" ]
 edition = "2018"
 publish = false
 
+[[example]]
+name = "avro_sql"
+path = "examples/avro_sql.rs"
+required-features = ["datafusion/avro"]
 
 [dev-dependencies]
 arrow-flight = { version = "^5.3" }
diff --git a/datafusion-examples/examples/avro_sql.rs b/datafusion-examples/examples/avro_sql.rs
new file mode 100644
index 0000000..e9676a0
--- /dev/null
+++ b/datafusion-examples/examples/avro_sql.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::arrow::util::pretty;
+
+use datafusion::error::Result;
+use datafusion::physical_plan::avro::AvroReadOptions;
+use datafusion::prelude::*;
+
+/// This example demonstrates executing a simple query against an Arrow data source (Avro) and
+/// fetching results
+#[tokio::main]
+async fn main() -> Result<()> {
+    // create local execution context
+    let mut ctx = ExecutionContext::new();
+
+    let testdata = datafusion::arrow::util::test_util::arrow_test_data();
+
+    // register avro file with the execution context
+    let avro_file = &format!("{}/avro/alltypes_plain.avro", testdata);
+    ctx.register_avro("alltypes_plain", avro_file, AvroReadOptions::default())?;
+
+    // execute the query
+    let df = ctx.sql(
+        "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
+        FROM alltypes_plain \
+        WHERE id > 1 AND tinyint_col < double_col",
+    )?;
+    let results = df.collect().await?;
+
+    // print the results
+    pretty::print_batches(&results)?;
+
+    Ok(())
+}
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index f30db02..c1998be 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -44,7 +44,8 @@ regex_expressions = ["regex", "lazy_static"]
 unicode_expressions = ["unicode-segmentation"]
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = []
-
+# Used to enable the avro format
+avro = ["avro-rs", "num-traits"]
 
 [dependencies]
 ahash = "0.7"
@@ -69,6 +70,8 @@ regex = { version = "^1.4.3", optional = true }
 lazy_static = { version = "^1.4.0", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 rand = "0.8"
+avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+num-traits = { version = "0.2", optional = true }
 
 [dev-dependencies]
 criterion = "0.3"
diff --git a/datafusion/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
new file mode 100644
index 0000000..cc8ed8e
--- /dev/null
+++ b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
@@ -0,0 +1,1090 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{
+    make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef,
+    BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait,
+    PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
+    StringDictionaryBuilder,
+};
+use crate::arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::datatypes::{
+    ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
+    Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
+    Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
+    UInt8Type,
+};
+use crate::arrow::error::ArrowError;
+use crate::arrow::record_batch::RecordBatch;
+use crate::arrow::util::bit_util;
+use crate::error::{DataFusionError, Result};
+use arrow::array::{BinaryArray, GenericListArray};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError::SchemaError;
+use arrow::error::Result as ArrowResult;
+use avro_rs::{
+    schema::{Schema as AvroSchema, SchemaKind},
+    types::Value,
+    AvroResult, Error as AvroError, Reader as AvroReader,
+};
+use num_traits::NumCast;
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+type RecordSlice<'a> = &'a [Vec<(String, Value)>];
+
+pub struct AvroArrowArrayReader<'a, R: Read> {
+    reader: AvroReader<'a, R>,
+    schema: SchemaRef,
+    projection: Option<Vec<String>>,
+    schema_lookup: HashMap<String, usize>,
+}
+
+impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
+    pub fn try_new(
+        reader: R,
+        schema: SchemaRef,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        let reader = AvroReader::new(reader)?;
+        let writer_schema = reader.writer_schema().clone();
+        let schema_lookup = Self::schema_lookup(writer_schema)?;
+        Ok(Self {
+            reader,
+            schema,
+            projection,
+            schema_lookup,
+        })
+    }
+
+    pub fn schema_lookup(schema: AvroSchema) -> Result<HashMap<String, usize>> {
+        match schema {
+            AvroSchema::Record {
+                lookup: ref schema_lookup,
+                ..
+            } => Ok(schema_lookup.clone()),
+            _ => Err(DataFusionError::ArrowError(SchemaError(
+                "expected avro schema to be a record".to_string(),
+            ))),
+        }
+    }
+
+    /// Read the next batch of records
+    #[allow(clippy::should_implement_trait)]
+    pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult<Option<RecordBatch>> {
+        let mut rows = Vec::with_capacity(batch_size);
+        for value in self.reader.by_ref().take(batch_size) {
+            let v = value.map_err(|e| {
+                ArrowError::ParseError(format!("Failed to parse avro value: {:?}", e))
+            })?;
+            match v {
+                Value::Record(v) => {
+                    rows.push(v);
+                }
+                other => {
+                    return Err(ArrowError::ParseError(format!(
+                        "Row needs to be of type object, got: {:?}",
+                        other
+                    )))
+                }
+            }
+        }
+        if rows.is_empty() {
+            // reached end of file
+            return Ok(None);
+        }
+        let rows = &rows[..];
+        let projection = self.projection.clone().unwrap_or_else(Vec::new);
+        let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
+        let projected_fields: Vec<Field> = if projection.is_empty() {
+            self.schema.fields().to_vec()
+        } else {
+            projection
+                .iter()
+                .map(|name| self.schema.column_with_name(name))
+                .flatten()
+                .map(|(_, field)| field.clone())
+                .collect()
+        };
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+    }
+
+    fn build_boolean_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef> {
+        let mut builder = BooleanBuilder::new(rows.len());
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                if let Some(boolean) = resolve_boolean(&value) {
+                    builder.append_value(boolean)?
+                } else {
+                    builder.append_null()?;
+                }
+            } else {
+                builder.append_null()?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_primitive_array<T: ArrowPrimitiveType + Resolver>(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef>
+    where
+        T: ArrowNumericType,
+        T::Native: num_traits::cast::NumCast,
+    {
+        Ok(Arc::new(
+            rows.iter()
+                .map(|row| {
+                    self.field_lookup(col_name, row)
+                        .and_then(|value| resolve_item::<T>(&value))
+                })
+                .collect::<PrimitiveArray<T>>(),
+        ))
+    }
+
+    #[inline(always)]
+    #[allow(clippy::unnecessary_wraps)]
+    fn build_string_dictionary_builder<T>(
+        &self,
+        row_len: usize,
+    ) -> ArrowResult<StringDictionaryBuilder<T>>
+    where
+        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let key_builder = PrimitiveBuilder::<T>::new(row_len);
+        let values_builder = StringBuilder::new(row_len * 5);
+        Ok(StringDictionaryBuilder::new(key_builder, values_builder))
+    }
+
+    fn build_wrapped_list_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+        key_type: &DataType,
+    ) -> ArrowResult<ArrayRef> {
+        match *key_type {
+            DataType::Int8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int8Type>(&dtype, col_name, rows)
+            }
+            DataType::Int16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int16Type>(&dtype, col_name, rows)
+            }
+            DataType::Int32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int32Type>(&dtype, col_name, rows)
+            }
+            DataType::Int64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::Int64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<Int64Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt8 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt8),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt8Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt16 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt32 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt32),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
+            }
+            DataType::UInt64 => {
+                let dtype = DataType::Dictionary(
+                    Box::new(DataType::UInt64),
+                    Box::new(DataType::Utf8),
+                );
+                self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
+            }
+            ref e => Err(SchemaError(format!(
+                "Data type is currently not supported for dictionaries in list : {:?}",
+                e
+            ))),
+        }
+    }
+
+    #[inline(always)]
+    fn list_array_string_array_builder<D>(
+        &self,
+        data_type: &DataType,
+        col_name: &str,
+        rows: RecordSlice,
+    ) -> ArrowResult<ArrayRef>
+    where
+        D: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let mut builder: Box<dyn ArrayBuilder> = match data_type {
+            DataType::Utf8 => {
+                let values_builder = StringBuilder::new(rows.len() * 5);
+                Box::new(ListBuilder::new(values_builder))
+            }
+            DataType::Dictionary(_, _) => {
+                let values_builder =
+                    self.build_string_dictionary_builder::<D>(rows.len() * 5)?;
+                Box::new(ListBuilder::new(values_builder))
+            }
+            e => {
+                return Err(SchemaError(format!(
+                    "Nested list data builder type is not supported: {:?}",
+                    e
+                )))
+            }
+        };
+
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                // value can be an array or a scalar
+                let vals: Vec<Option<String>> = if let Value::String(v) = value {
+                    vec![Some(v.to_string())]
+                } else if let Value::Array(n) = value {
+                    n.into_iter()
+                        .map(|v| resolve_string(&v))
+                        .collect::<ArrowResult<Vec<String>>>()?
+                        .into_iter()
+                        .map(Some)
+                        .collect::<Vec<Option<String>>>()
+                } else if let Value::Null = value {
+                    vec![None]
+                } else if !matches!(value, Value::Record(_)) {
+                    vec![Some(resolve_string(&value)?)]
+                } else {
+                    return Err(SchemaError(
+                        "Only scalars are currently supported in Avro arrays".to_string(),
+                    ));
+                };
+
+                // TODO: ARROW-10335: APIs of dictionary arrays and others are different. Unify
+                // them.
+                match data_type {
+                    DataType::Utf8 => {
+                        let builder = builder
+                            .as_any_mut()
+                            .downcast_mut::<ListBuilder<StringBuilder>>()
+                            .ok_or_else(||ArrowError::SchemaError(
+                                "Cast failed for ListBuilder<StringBuilder> during nested data parsing".to_string(),
+                            ))?;
+                        for val in vals {
+                            if let Some(v) = val {
+                                builder.values().append_value(&v)?
+                            } else {
+                                builder.values().append_null()?
+                            };
+                        }
+
+                        // Append to the list
+                        builder.append(true)?;
+                    }
+                    DataType::Dictionary(_, _) => {
+                        let builder = builder.as_any_mut().downcast_mut::<ListBuilder<StringDictionaryBuilder<D>>>().ok_or_else(||ArrowError::SchemaError(
+                            "Cast failed for ListBuilder<StringDictionaryBuilder> during nested data parsing".to_string(),
+                        ))?;
+                        for val in vals {
+                            if let Some(v) = val {
+                                let _ = builder.values().append(&v)?;
+                            } else {
+                                builder.values().append_null()?
+                            };
+                        }
+
+                        // Append to the list
+                        builder.append(true)?;
+                    }
+                    e => {
+                        return Err(SchemaError(format!(
+                            "Nested list data builder type is not supported: {:?}",
+                            e
+                        )))
+                    }
+                }
+            }
+        }
+
+        Ok(builder.finish() as ArrayRef)
+    }
+
+    #[inline(always)]
+    fn build_dictionary_array<T>(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+    ) -> ArrowResult<ArrayRef>
+    where
+        T::Native: num_traits::cast::NumCast,
+        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
+    {
+        let mut builder: StringDictionaryBuilder<T> =
+            self.build_string_dictionary_builder(rows.len())?;
+        for row in rows {
+            if let Some(value) = self.field_lookup(col_name, row) {
+                if let Ok(str_v) = resolve_string(&value) {
+                    builder.append(str_v).map(drop)?
+                } else {
+                    builder.append_null()?
+                }
+            } else {
+                builder.append_null()?
+            }
+        }
+        Ok(Arc::new(builder.finish()) as ArrayRef)
+    }
+
+    #[inline(always)]
+    fn build_string_dictionary_array(
+        &self,
+        rows: RecordSlice,
+        col_name: &str,
+        key_type: &DataType,
+        value_type: &DataType,
+    ) -> ArrowResult<ArrayRef> {
+        if let DataType::Utf8 = *value_type {
+            match *key_type {
+                DataType::Int8 => self.build_dictionary_array::<Int8Type>(rows, col_name),
+                DataType::Int16 => {
+                    self.build_dictionary_array::<Int16Type>(rows, col_name)
+                }
+                DataType::Int32 => {
+                    self.build_dictionary_array::<Int32Type>(rows, col_name)
+                }
+                DataType::Int64 => {
+                    self.build_dictionary_array::<Int64Type>(rows, col_name)
+                }
+                DataType::UInt8 => {
+                    self.build_dictionary_array::<UInt8Type>(rows, col_name)
+                }
+                DataType::UInt16 => {
+                    self.build_dictionary_array::<UInt16Type>(rows, col_name)
+                }
+                DataType::UInt32 => {
+                    self.build_dictionary_array::<UInt32Type>(rows, col_name)
+                }
+                DataType::UInt64 => {
+                    self.build_dictionary_array::<UInt64Type>(rows, col_name)
+                }
+                _ => Err(ArrowError::SchemaError(
+                    "unsupported dictionary key type".to_string(),
+                )),
+            }
+        } else {
+            Err(ArrowError::SchemaError(
+                "dictionary types other than UTF-8 not yet supported".to_string(),
+            ))
+        }
+    }
+
+    /// Build a nested GenericListArray from a list of unnested `Value`s
+    fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
+        &self,
+        rows: &[Value],
+        list_field: &Field,
+    ) -> ArrowResult<ArrayRef> {
+        // build list offsets
+        let mut cur_offset = OffsetSize::zero();
+        let list_len = rows.len();
+        let num_list_bytes = bit_util::ceil(list_len, 8);
+        let mut offsets = Vec::with_capacity(list_len + 1);
+        let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes);
+        let list_nulls = list_nulls.as_slice_mut();
+        offsets.push(cur_offset);
+        rows.iter().enumerate().for_each(|(i, v)| {
+            // TODO: unboxing Union(Array(Union(...))) should probably be done earlier
+            let v = maybe_resolve_union(v);
+            if let Value::Array(a) = v {
+                cur_offset += OffsetSize::from_usize(a.len()).unwrap();
+                bit_util::set_bit(list_nulls, i);
+            } else if let Value::Null = v {
+                // value is null, not incremented
+            } else {
+                cur_offset += OffsetSize::one();
+            }
+            offsets.push(cur_offset);
+        });
+        let valid_len = cur_offset.to_usize().unwrap();
+        let array_data = match list_field.data_type() {
+            DataType::Null => NullArray::new(valid_len).data().clone(),
+            DataType::Boolean => {
+                let num_bytes = bit_util::ceil(valid_len, 8);
+                let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes);
+                let mut bool_nulls =
+                    MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
+                let mut curr_index = 0;
+                rows.iter().for_each(|v| {
+                    if let Value::Array(vs) = v {
+                        vs.iter().for_each(|value| {
+                            if let Value::Boolean(child) = value {
+                                // if valid boolean, append value
+                                if *child {
+                                    bit_util::set_bit(
+                                        bool_values.as_slice_mut(),
+                                        curr_index,
+                                    );
+                                }
+                            } else {
+                                // null slot
+                                bit_util::unset_bit(
+                                    bool_nulls.as_slice_mut(),
+                                    curr_index,
+                                );
+                            }
+                            curr_index += 1;
+                        });
+                    }
+                });
+                ArrayData::builder(list_field.data_type().clone())
+                    .len(valid_len)
+                    .add_buffer(bool_values.into())
+                    .null_bit_buffer(bool_nulls.into())
+                    .build()
+            }
+            DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
+            DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
+            DataType::Int32 => self.read_primitive_list_values::<Int32Type>(rows),
+            DataType::Int64 => self.read_primitive_list_values::<Int64Type>(rows),
+            DataType::UInt8 => self.read_primitive_list_values::<UInt8Type>(rows),
+            DataType::UInt16 => self.read_primitive_list_values::<UInt16Type>(rows),
+            DataType::UInt32 => self.read_primitive_list_values::<UInt32Type>(rows),
+            DataType::UInt64 => self.read_primitive_list_values::<UInt64Type>(rows),
+            DataType::Float16 => {
+                return Err(ArrowError::SchemaError("Float16 not supported".to_string()))
+            }
+            DataType::Float32 => self.read_primitive_list_values::<Float32Type>(rows),
+            DataType::Float64 => self.read_primitive_list_values::<Float64Type>(rows),
+            DataType::Timestamp(_, _)
+            | DataType::Date32
+            | DataType::Date64
+            | DataType::Time32(_)
+            | DataType::Time64(_) => {
+                return Err(ArrowError::SchemaError(
+                    "Temporal types are not yet supported, see ARROW-4803".to_string(),
+                ))
+            }
+            DataType::Utf8 => flatten_string_values(rows)
+                .into_iter()
+                .collect::<StringArray>()
+                .data()
+                .clone(),
+            DataType::LargeUtf8 => flatten_string_values(rows)
+                .into_iter()
+                .collect::<LargeStringArray>()
+                .data()
+                .clone(),
+            DataType::List(field) => {
+                let child =
+                    self.build_nested_list_array::<i32>(&flatten_values(rows), field)?;
+                child.data().clone()
+            }
+            DataType::LargeList(field) => {
+                let child =
+                    self.build_nested_list_array::<i64>(&flatten_values(rows), field)?;
+                child.data().clone()
+            }
+            DataType::Struct(fields) => {
+                // extract list values, with non-lists converted to Value::Null
+                let array_item_count = rows
+                    .iter()
+                    .map(|row| match row {
+                        Value::Array(values) => values.len(),
+                        _ => 1,
+                    })
+                    .sum();
+                let num_bytes = bit_util::ceil(array_item_count, 8);
+                let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
+                let mut struct_index = 0;
+                let rows: Vec<Vec<(String, Value)>> = rows
+                    .iter()
+                    .map(|row| {
+                        if let Value::Array(values) = row {
+                            values.iter().for_each(|_| {
+                                bit_util::set_bit(
+                                    null_buffer.as_slice_mut(),
+                                    struct_index,
+                                );
+                                struct_index += 1;
+                            });
+                            values
+                                .iter()
+                                .map(|v| ("".to_string(), v.clone()))
+                                .collect::<Vec<(String, Value)>>()
+                        } else {
+                            struct_index += 1;
+                            vec![("null".to_string(), Value::Null)]
+                        }
+                    })
+                    .collect();
+                let arrays =
+                    self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?;
+                let data_type = DataType::Struct(fields.clone());
+                let buf = null_buffer.into();
+                ArrayDataBuilder::new(data_type)
+                    .len(rows.len())
+                    .null_bit_buffer(buf)
+                    .child_data(arrays.into_iter().map(|a| a.data().clone()).collect())
+                    .build()
+            }
+            datatype => {
+                return Err(ArrowError::SchemaError(format!(
+                    "Nested list of {:?} not supported",
+                    datatype
+                )));
+            }
+        };
+        // build list
+        let list_data = ArrayData::builder(DataType::List(Box::new(list_field.clone())))
+            .len(list_len)
+            .add_buffer(Buffer::from_slice_ref(&offsets))
+            .add_child_data(array_data)
+            .null_bit_buffer(list_nulls.into())
+            .build();
+        Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
+    }
+
+    /// Builds the child values of a `StructArray`, falling short of constructing the StructArray.
+    /// The function does not construct the StructArray as some callers would want the child arrays.
+    ///
+    /// *Note*: The function is recursive, and will read nested structs.
+    ///
+    /// If `projection` is not empty, then all values are returned. The first level of projection
+    /// occurs at the `RecordBatch` level. No further projection currently occurs, but would be
+    /// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`.
+    fn build_struct_array(
+        &self,
+        rows: RecordSlice,
+        struct_fields: &[Field],
+        projection: &[String],
+    ) -> ArrowResult<Vec<ArrayRef>> {
+        let arrays: ArrowResult<Vec<ArrayRef>> = struct_fields
+            .iter()
+            .filter(|field| projection.is_empty() || projection.contains(field.name()))
+            .map(|field| {
+                match field.data_type() {
+                    DataType::Null => {
+                        Ok(Arc::new(NullArray::new(rows.len())) as ArrayRef)
+                    }
+                    DataType::Boolean => self.build_boolean_array(rows, field.name()),
+                    DataType::Float64 => {
+                        self.build_primitive_array::<Float64Type>(rows, field.name())
+                    }
+                    DataType::Float32 => {
+                        self.build_primitive_array::<Float32Type>(rows, field.name())
+                    }
+                    DataType::Int64 => {
+                        self.build_primitive_array::<Int64Type>(rows, field.name())
+                    }
+                    DataType::Int32 => {
+                        self.build_primitive_array::<Int32Type>(rows, field.name())
+                    }
+                    DataType::Int16 => {
+                        self.build_primitive_array::<Int16Type>(rows, field.name())
+                    }
+                    DataType::Int8 => {
+                        self.build_primitive_array::<Int8Type>(rows, field.name())
+                    }
+                    DataType::UInt64 => {
+                        self.build_primitive_array::<UInt64Type>(rows, field.name())
+                    }
+                    DataType::UInt32 => {
+                        self.build_primitive_array::<UInt32Type>(rows, field.name())
+                    }
+                    DataType::UInt16 => {
+                        self.build_primitive_array::<UInt16Type>(rows, field.name())
+                    }
+                    DataType::UInt8 => {
+                        self.build_primitive_array::<UInt8Type>(rows, field.name())
+                    }
+                    // TODO: this is incomplete
+                    DataType::Timestamp(unit, _) => match unit {
+                        TimeUnit::Second => self
+                            .build_primitive_array::<TimestampSecondType>(
+                                rows,
+                                field.name(),
+                            ),
+                        TimeUnit::Microsecond => self
+                            .build_primitive_array::<TimestampMicrosecondType>(
+                                rows,
+                                field.name(),
+                            ),
+                        TimeUnit::Millisecond => self
+                            .build_primitive_array::<TimestampMillisecondType>(
+                                rows,
+                                field.name(),
+                            ),
+                        TimeUnit::Nanosecond => self
+                            .build_primitive_array::<TimestampNanosecondType>(
+                                rows,
+                                field.name(),
+                            ),
+                    },
+                    DataType::Date64 => {
+                        self.build_primitive_array::<Date64Type>(rows, field.name())
+                    }
+                    DataType::Date32 => {
+                        self.build_primitive_array::<Date32Type>(rows, field.name())
+                    }
+                    DataType::Time64(unit) => match unit {
+                        TimeUnit::Microsecond => self
+                            .build_primitive_array::<Time64MicrosecondType>(
+                                rows,
+                                field.name(),
+                            ),
+                        TimeUnit::Nanosecond => self
+                            .build_primitive_array::<Time64NanosecondType>(
+                                rows,
+                                field.name(),
+                            ),
+                        t => Err(ArrowError::SchemaError(format!(
+                            "TimeUnit {:?} not supported with Time64",
+                            t
+                        ))),
+                    },
+                    DataType::Time32(unit) => match unit {
+                        TimeUnit::Second => self
+                            .build_primitive_array::<Time32SecondType>(
+                                rows,
+                                field.name(),
+                            ),
+                        TimeUnit::Millisecond => self
+                            .build_primitive_array::<Time32MillisecondType>(
+                                rows,
+                                field.name(),
+                            ),
+                        t => Err(ArrowError::SchemaError(format!(
+                            "TimeUnit {:?} not supported with Time32",
+                            t
+                        ))),
+                    },
+                    DataType::Utf8 | DataType::LargeUtf8 => Ok(Arc::new(
+                        rows.iter()
+                            .map(|row| {
+                                let maybe_value = self.field_lookup(field.name(), row);
+                                maybe_value
+                                    .map(|value| resolve_string(&value))
+                                    .transpose()
+                            })
+                            .collect::<ArrowResult<StringArray>>()?,
+                    )
+                        as ArrayRef),
+                    DataType::Binary | DataType::LargeBinary => Ok(Arc::new(
+                        rows.iter()
+                            .map(|row| {
+                                let maybe_value = self.field_lookup(field.name(), row);
+                                maybe_value.and_then(resolve_bytes)
+                            })
+                            .collect::<BinaryArray>(),
+                    )
+                        as ArrayRef),
+                    DataType::List(ref list_field) => {
+                        match list_field.data_type() {
+                            DataType::Dictionary(ref key_ty, _) => {
+                                self.build_wrapped_list_array(rows, field.name(), key_ty)
+                            }
+                            _ => {
+                                // extract rows by name
+                                let extracted_rows = rows
+                                    .iter()
+                                    .map(|row| {
+                                        self.field_lookup(field.name(), row)
+                                            .unwrap_or(Value::Null)
+                                    })
+                                    .collect::<Vec<Value>>();
+                                self.build_nested_list_array::<i32>(
+                                    extracted_rows.as_slice(),
+                                    list_field,
+                                )
+                            }
+                        }
+                    }
+                    DataType::Dictionary(ref key_ty, ref val_ty) => self
+                        .build_string_dictionary_array(
+                            rows,
+                            field.name(),
+                            key_ty,
+                            val_ty,
+                        ),
+                    DataType::Struct(fields) => {
+                        let len = rows.len();
+                        let num_bytes = bit_util::ceil(len, 8);
+                        let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
+                        let struct_rows = rows
+                            .iter()
+                            .enumerate()
+                            .map(|(i, row)| (i, self.field_lookup(field.name(), row)))
+                            .map(|(i, v)| match v {
+                                // we want the field as an object, if it's not, we treat as null
+                                Some(Value::Record(ref value)) => {
+                                    bit_util::set_bit(null_buffer.as_slice_mut(), i);
+                                    value.clone()
+                                }
+                                _ => vec![],
+                            })
+                            .collect::<Vec<Vec<(String, Value)>>>();
+                        let arrays =
+                            self.build_struct_array(struct_rows.as_slice(), fields, &[])?;
+                        // construct a struct array's data in order to set null buffer
+                        let data_type = DataType::Struct(fields.clone());
+                        let data = ArrayDataBuilder::new(data_type)
+                            .len(len)
+                            .null_bit_buffer(null_buffer.into())
+                            .child_data(
+                                arrays.into_iter().map(|a| a.data().clone()).collect(),
+                            )
+                            .build();
+                        Ok(make_array(data))
+                    }
+                    _ => Err(ArrowError::SchemaError(format!(
+                        "type {:?} not supported",
+                        field.data_type()
+                    ))),
+                }
+            })
+            .collect();
+        arrays
+    }
+
+    /// Read the primitive list's values into ArrayData
+    fn read_primitive_list_values<T>(&self, rows: &[Value]) -> ArrayData
+    where
+        T: ArrowPrimitiveType + ArrowNumericType,
+        T::Native: num_traits::cast::NumCast,
+    {
+        let values = rows
+            .iter()
+            .flat_map(|row| {
+                let row = maybe_resolve_union(row);
+                if let Value::Array(values) = row {
+                    values
+                        .iter()
+                        .map(resolve_item::<T>)
+                        .collect::<Vec<Option<T::Native>>>()
+                } else if let Some(f) = resolve_item::<T>(row) {
+                    vec![Some(f)]
+                } else {
+                    vec![]
+                }
+            })
+            .collect::<Vec<Option<T::Native>>>();
+        let array = values.iter().collect::<PrimitiveArray<T>>();
+        array.data().clone()
+    }
+
+    fn field_lookup(&self, name: &str, row: &[(String, Value)]) -> Option<Value> {
+        self.schema_lookup
+            .get(name)
+            .and_then(|i| row.get(*i))
+            .map(|o| o.1.clone())
+    }
+}
+
+/// Flattens a list of Avro values, by flattening lists, and treating all other values as
+/// single-value lists.
+/// This is used to read into nested lists (list of list, list of struct) and non-dictionary lists.
+#[inline]
+fn flatten_values(values: &[Value]) -> Vec<Value> {
+    values
+        .iter()
+        .flat_map(|row| {
+            if let Value::Array(values) = row {
+                values.clone()
+            } else if let Value::Null = row {
+                vec![Value::Null]
+            } else {
+                // we interpret a scalar as a single-value list to minimise data loss
+                vec![row.clone()]
+            }
+        })
+        .collect()
+}
+
+/// Flattens a list into string values, dropping Value::Null in the process.
+/// This is useful for interpreting any Avro array as string, dropping nulls.
+/// See `value_as_string`.
+#[inline]
+fn flatten_string_values(values: &[Value]) -> Vec<Option<String>> {
+    values
+        .iter()
+        .flat_map(|row| {
+            if let Value::Array(values) = row {
+                values
+                    .iter()
+                    .map(|s| resolve_string(s).ok())
+                    .collect::<Vec<Option<_>>>()
+            } else if let Value::Null = row {
+                vec![]
+            } else {
+                vec![resolve_string(row).ok()]
+            }
+        })
+        .collect::<Vec<Option<_>>>()
+}
+
+/// Reads an Avro value as a string, regardless of its type.
+/// This is useful if the expected datatype is a string, in which case we preserve
+/// all the values regardless of they type.
+fn resolve_string(v: &Value) -> ArrowResult<String> {
+    let v = if let Value::Union(b) = v { b } else { v };
+    match v {
+        Value::String(s) => Ok(s.clone()),
+        Value::Bytes(bytes) => {
+            String::from_utf8(bytes.to_vec()).map_err(AvroError::ConvertToUtf8)
+        }
+        other => Err(AvroError::GetString(other.into())),
+    }
+    .map_err(|e| SchemaError(format!("expected resolvable string : {}", e)))
+}
+
+fn resolve_u8(v: Value) -> AvroResult<u8> {
+    let int = v.resolve(&AvroSchema::Int)?;
+    if let Value::Int(n) = int {
+        if n >= 0 && n <= std::convert::From::from(u8::MAX) {
+            return Ok(n as u8);
+        }
+    }
+
+    Err(AvroError::GetU8(int.into()))
+}
+
+fn resolve_bytes(v: Value) -> Option<Vec<u8>> {
+    let v = if let Value::Union(b) = v { *b } else { v };
+    match v {
+        Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
+        Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
+        Value::Array(items) => Ok(Value::Bytes(
+            items
+                .into_iter()
+                .map(resolve_u8)
+                .collect::<std::result::Result<Vec<_>, _>>()
+                .ok()?,
+        )),
+        other => Err(AvroError::GetBytes(other.into())),
+    }
+    .ok()
+    .and_then(|v| match v {
+        Value::Bytes(s) => Some(s),
+        _ => None,
+    })
+}
+
+fn resolve_boolean(value: &Value) -> Option<bool> {
+    let v = if let Value::Union(b) = value {
+        b
+    } else {
+        value
+    };
+    match v {
+        Value::Boolean(boolean) => Some(*boolean),
+        _ => None,
+    }
+}
+
+trait Resolver: ArrowPrimitiveType {
+    fn resolve(value: &Value) -> Option<Self::Native>;
+}
+
+fn resolve_item<T: Resolver>(value: &Value) -> Option<T::Native> {
+    T::resolve(value)
+}
+
+fn maybe_resolve_union(value: &Value) -> &Value {
+    if SchemaKind::from(value) == SchemaKind::Union {
+        // Pull out the Union, and attempt to resolve against it.
+        match value {
+            Value::Union(b) => b,
+            _ => unreachable!(),
+        }
+    } else {
+        value
+    }
+}
+
+impl<N> Resolver for N
+where
+    N: ArrowNumericType,
+    N::Native: num_traits::cast::NumCast,
+{
+    fn resolve(value: &Value) -> Option<Self::Native> {
+        let value = maybe_resolve_union(value);
+        match value {
+            Value::Int(i) | Value::TimeMillis(i) | Value::Date(i) => NumCast::from(*i),
+            Value::Long(l)
+            | Value::TimeMicros(l)
+            | Value::TimestampMillis(l)
+            | Value::TimestampMicros(l) => NumCast::from(*l),
+            Value::Float(f) => NumCast::from(*f),
+            Value::Double(f) => NumCast::from(*f),
+            Value::Duration(_d) => unimplemented!(), // shenanigans type
+            Value::Null => None,
+            _ => unreachable!(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::arrow::array::Array;
+    use crate::arrow::datatypes::{Field, TimeUnit};
+    use crate::avro_to_arrow::{Reader, ReaderBuilder};
+    use arrow::array::{Int32Array, Int64Array, ListArray, TimestampMicrosecondArray};
+    use arrow::datatypes::DataType;
+    use std::fs::File;
+
+    fn build_reader(name: &str, batch_size: usize) -> Reader<File> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/{}", testdata, name);
+        let builder = ReaderBuilder::new()
+            .read_schema()
+            .with_batch_size(batch_size);
+        builder.build(File::open(filename).unwrap()).unwrap()
+    }
+
+    // TODO: Fixed, Enum, Dictionary
+
+    #[test]
+    fn test_time_avro_milliseconds() {
+        let mut reader = build_reader("alltypes_plain.avro", 10);
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let timestamp_col = schema.column_with_name("timestamp_col").unwrap();
+        assert_eq!(
+            &DataType::Timestamp(TimeUnit::Microsecond, None),
+            timestamp_col.1.data_type()
+        );
+        let timestamp_array = batch
+            .column(timestamp_col.0)
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+        for i in 0..timestamp_array.len() {
+            assert!(timestamp_array.is_valid(i));
+        }
+        assert_eq!(1235865600000000, timestamp_array.value(0));
+        assert_eq!(1235865660000000, timestamp_array.value(1));
+        assert_eq!(1238544000000000, timestamp_array.value(2));
+        assert_eq!(1238544060000000, timestamp_array.value(3));
+        assert_eq!(1233446400000000, timestamp_array.value(4));
+        assert_eq!(1233446460000000, timestamp_array.value(5));
+        assert_eq!(1230768000000000, timestamp_array.value(6));
+        assert_eq!(1230768060000000, timestamp_array.value(7));
+    }
+
+    #[test]
+    fn test_avro_read_list() {
+        let mut reader = build_reader("list_columns.avro", 3);
+        let schema = reader.schema();
+        let (col_id_index, _) = schema.column_with_name("int64_list").unwrap();
+        let batch = reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+        let a_array = batch
+            .column(col_id_index)
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+        assert_eq!(
+            *a_array.data_type(),
+            DataType::List(Box::new(Field::new("bigint", DataType::Int64, true)))
+        );
+        let array = a_array.value(0);
+        assert_eq!(*array.data_type(), DataType::Int64);
+
+        assert_eq!(
+            6,
+            array
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap()
+                .iter()
+                .flatten()
+                .sum::<i64>()
+        );
+    }
+    #[test]
+    fn test_avro_read_nested_list() {
+        let mut reader = build_reader("nested_lists.snappy.avro", 3);
+        let batch = reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+    }
+
+    #[test]
+    fn test_avro_iterator() {
+        let reader = build_reader("alltypes_plain.avro", 5);
+        let schema = reader.schema();
+        let (col_id_index, _) = schema.column_with_name("id").unwrap();
+
+        let mut sum_num_rows = 0;
+        let mut num_batches = 0;
+        let mut sum_id = 0;
+        for batch in reader {
+            let batch = batch.unwrap();
+            assert_eq!(11, batch.num_columns());
+            sum_num_rows += batch.num_rows();
+            num_batches += 1;
+            let batch_schema = batch.schema();
+            assert_eq!(schema, batch_schema);
+            let a_array = batch
+                .column(col_id_index)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            sum_id += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i32>();
+        }
+        assert_eq!(8, sum_num_rows);
+        assert_eq!(2, num_batches);
+        assert_eq!(28, sum_id);
+    }
+}
diff --git a/datafusion/src/avro_to_arrow/mod.rs b/datafusion/src/avro_to_arrow/mod.rs
new file mode 100644
index 0000000..531b109
--- /dev/null
+++ b/datafusion/src/avro_to_arrow/mod.rs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]
+mod arrow_array_reader;
+#[cfg(feature = "avro")]
+mod reader;
+#[cfg(feature = "avro")]
+mod schema;
+
+use crate::arrow::datatypes::Schema;
+use crate::error::Result;
+#[cfg(feature = "avro")]
+pub use reader::{Reader, ReaderBuilder};
+use std::io::{Read, Seek};
+
+#[cfg(feature = "avro")]
+/// Read Avro schema given a reader
+pub fn read_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
+    let avro_reader = avro_rs::Reader::new(reader)?;
+    let schema = avro_reader.writer_schema();
+    schema::to_arrow_schema(schema)
+}
+
+#[cfg(not(feature = "avro"))]
+/// Read Avro schema given a reader (requires the avro feature)
+pub fn read_avro_schema_from_reader<R: Read + Seek>(_: &mut R) -> Result<Schema> {
+    Err(crate::error::DataFusionError::NotImplemented(
+        "cannot read avro schema without the 'avro' feature enabled".to_string(),
+    ))
+}
diff --git a/datafusion/src/avro_to_arrow/reader.rs b/datafusion/src/avro_to_arrow/reader.rs
new file mode 100644
index 0000000..8baad14
--- /dev/null
+++ b/datafusion/src/avro_to_arrow/reader.rs
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::arrow_array_reader::AvroArrowArrayReader;
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    /// Optional schema for the Avro file
+    ///
+    /// If the schema is not supplied, the reader will try to read the schema.
+    schema: Option<SchemaRef>,
+    /// Batch size (number of records to load each time)
+    ///
+    /// The default batch size when using the `ReaderBuilder` is 1024 records
+    batch_size: usize,
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            batch_size: 1024,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+    /// Create a new builder for configuring Avro parsing options.
+    ///
+    /// To convert a builder into a reader, call `Reader::from_builder`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder, inferring the schema with the first 100 records
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().read_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to infer the schema of the file
+    pub fn read_schema(mut self) -> Self {
+        // remove any schema that is set
+        self.schema = None;
+        self
+    }
+
+    /// Set the batch size (number of records to load at one time)
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Set the reader's column projection
+    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    /// Create a new `Reader` from the `ReaderBuilder`
+    pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>>
+    where
+        R: Read + Seek,
+    {
+        let mut source = source;
+
+        // check if schema should be inferred
+        let schema = match self.schema {
+            Some(schema) => schema,
+            None => Arc::new(super::read_avro_schema_from_reader(&mut source)?),
+        };
+        source.seek(SeekFrom::Start(0))?;
+        Reader::try_new(source, schema, self.batch_size, self.projection)
+    }
+}
+
+/// Avro file record  reader
+pub struct Reader<'a, R: Read> {
+    array_reader: AvroArrowArrayReader<'a, R>,
+    schema: SchemaRef,
+    batch_size: usize,
+}
+
+impl<'a, R: Read> Reader<'a, R> {
+    /// Create a new Avro Reader from any value that implements the `Read` trait.
+    ///
+    /// If reading a `File`, you can customise the Reader, such as to enable schema
+    /// inference, use `ReaderBuilder`.
+    pub fn try_new(
+        reader: R,
+        schema: SchemaRef,
+        batch_size: usize,
+        projection: Option<Vec<String>>,
+    ) -> Result<Self> {
+        Ok(Self {
+            array_reader: AvroArrowArrayReader::try_new(
+                reader,
+                schema.clone(),
+                projection,
+            )?,
+            schema,
+            batch_size,
+        })
+    }
+
+    /// Returns the schema of the reader, useful for getting the schema without reading
+    /// record batches
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Returns the next batch of results (defined by `self.batch_size`), or `None` if there
+    /// are no more results
+    #[allow(clippy::should_implement_trait)]
+    pub fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
+        self.array_reader.next_batch(self.batch_size)
+    }
+}
+
+impl<'a, R: Read> Iterator for Reader<'a, R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next().transpose()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::*;
+    use crate::arrow::datatypes::{DataType, Field};
+    use arrow::datatypes::TimeUnit;
+    use std::fs::File;
+
+    fn build_reader(name: &str) -> Reader<File> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/{}", testdata, name);
+        let builder = ReaderBuilder::new().read_schema().with_batch_size(64);
+        builder.build(File::open(filename).unwrap()).unwrap()
+    }
+
+    fn get_col<'a, T: 'static>(
+        batch: &'a RecordBatch,
+        col: (usize, &Field),
+    ) -> Option<&'a T> {
+        batch.column(col.0).as_any().downcast_ref::<T>()
+    }
+
+    #[test]
+    fn test_avro_basic() {
+        let mut reader = build_reader("alltypes_dictionary.avro");
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(2, batch.num_rows());
+
+        let schema = reader.schema();
+        let batch_schema = batch.schema();
+        assert_eq!(schema, batch_schema);
+
+        let id = schema.column_with_name("id").unwrap();
+        assert_eq!(0, id.0);
+        assert_eq!(&DataType::Int32, id.1.data_type());
+        let col = get_col::<Int32Array>(&batch, id).unwrap();
+        assert_eq!(0, col.value(0));
+        assert_eq!(1, col.value(1));
+        let bool_col = schema.column_with_name("bool_col").unwrap();
+        assert_eq!(1, bool_col.0);
+        assert_eq!(&DataType::Boolean, bool_col.1.data_type());
+        let col = get_col::<BooleanArray>(&batch, bool_col).unwrap();
+        assert!(col.value(0));
+        assert!(!col.value(1));
+        let tinyint_col = schema.column_with_name("tinyint_col").unwrap();
+        assert_eq!(2, tinyint_col.0);
+        assert_eq!(&DataType::Int32, tinyint_col.1.data_type());
+        let col = get_col::<Int32Array>(&batch, tinyint_col).unwrap();
+        assert_eq!(0, col.value(0));
+        assert_eq!(1, col.value(1));
+        let smallint_col = schema.column_with_name("smallint_col").unwrap();
+        assert_eq!(3, smallint_col.0);
+        assert_eq!(&DataType::Int32, smallint_col.1.data_type());
+        let col = get_col::<Int32Array>(&batch, smallint_col).unwrap();
+        assert_eq!(0, col.value(0));
+        assert_eq!(1, col.value(1));
+        let int_col = schema.column_with_name("int_col").unwrap();
+        assert_eq!(4, int_col.0);
+        let col = get_col::<Int32Array>(&batch, int_col).unwrap();
+        assert_eq!(0, col.value(0));
+        assert_eq!(1, col.value(1));
+        assert_eq!(&DataType::Int32, int_col.1.data_type());
+        let col = get_col::<Int32Array>(&batch, int_col).unwrap();
+        assert_eq!(0, col.value(0));
+        assert_eq!(1, col.value(1));
+        let bigint_col = schema.column_with_name("bigint_col").unwrap();
+        assert_eq!(5, bigint_col.0);
+        let col = get_col::<Int64Array>(&batch, bigint_col).unwrap();
+        assert_eq!(0, col.value(0));
+        assert_eq!(10, col.value(1));
+        assert_eq!(&DataType::Int64, bigint_col.1.data_type());
+        let float_col = schema.column_with_name("float_col").unwrap();
+        assert_eq!(6, float_col.0);
+        let col = get_col::<Float32Array>(&batch, float_col).unwrap();
+        assert_eq!(0.0, col.value(0));
+        assert_eq!(1.1, col.value(1));
+        assert_eq!(&DataType::Float32, float_col.1.data_type());
+        let col = get_col::<Float32Array>(&batch, float_col).unwrap();
+        assert_eq!(0.0, col.value(0));
+        assert_eq!(1.1, col.value(1));
+        let double_col = schema.column_with_name("double_col").unwrap();
+        assert_eq!(7, double_col.0);
+        assert_eq!(&DataType::Float64, double_col.1.data_type());
+        let col = get_col::<Float64Array>(&batch, double_col).unwrap();
+        assert_eq!(0.0, col.value(0));
+        assert_eq!(10.1, col.value(1));
+        let date_string_col = schema.column_with_name("date_string_col").unwrap();
+        assert_eq!(8, date_string_col.0);
+        assert_eq!(&DataType::Binary, date_string_col.1.data_type());
+        let col = get_col::<BinaryArray>(&batch, date_string_col).unwrap();
+        assert_eq!("01/01/09".as_bytes(), col.value(0));
+        assert_eq!("01/01/09".as_bytes(), col.value(1));
+        let string_col = schema.column_with_name("string_col").unwrap();
+        assert_eq!(9, string_col.0);
+        assert_eq!(&DataType::Binary, string_col.1.data_type());
+        let col = get_col::<BinaryArray>(&batch, string_col).unwrap();
+        assert_eq!("0".as_bytes(), col.value(0));
+        assert_eq!("1".as_bytes(), col.value(1));
+        let timestamp_col = schema.column_with_name("timestamp_col").unwrap();
+        assert_eq!(10, timestamp_col.0);
+        assert_eq!(
+            &DataType::Timestamp(TimeUnit::Microsecond, None),
+            timestamp_col.1.data_type()
+        );
+        let col = get_col::<TimestampMicrosecondArray>(&batch, timestamp_col).unwrap();
+        assert_eq!(1230768000000000, col.value(0));
+        assert_eq!(1230768060000000, col.value(1));
+    }
+}
diff --git a/datafusion/src/avro_to_arrow/schema.rs b/datafusion/src/avro_to_arrow/schema.rs
new file mode 100644
index 0000000..c2927f0
--- /dev/null
+++ b/datafusion/src/avro_to_arrow/schema.rs
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an avro schema to an arrow schema
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+    let mut schema_fields = vec![];
+    match avro_schema {
+        AvroSchema::Record { fields, .. } => {
+            for field in fields {
+                schema_fields.push(schema_to_field_with_props(
+                    &field.schema,
+                    Some(&field.name),
+                    false,
+                    Some(&external_props(&field.schema)),
+                )?)
+            }
+        }
+        schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+    }
+
+    let schema = Schema::new(schema_fields);
+    Ok(schema)
+}
+
+fn schema_to_field(
+    schema: &avro_rs::Schema,
+    name: Option<&str>,
+    nullable: bool,
+) -> Result<Field> {
+    schema_to_field_with_props(schema, name, nullable, None)
+}
+
+fn schema_to_field_with_props(
+    schema: &AvroSchema,
+    name: Option<&str>,
+    nullable: bool,
+    props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+    let mut nullable = nullable;
+    let field_type: DataType = match schema {
+        AvroSchema::Null => DataType::Null,
+        AvroSchema::Boolean => DataType::Boolean,
+        AvroSchema::Int => DataType::Int32,
+        AvroSchema::Long => DataType::Int64,
+        AvroSchema::Float => DataType::Float32,
+        AvroSchema::Double => DataType::Float64,
+        AvroSchema::Bytes => DataType::Binary,
+        AvroSchema::String => DataType::Utf8,
+        AvroSchema::Array(item_schema) => DataType::List(Box::new(
+            schema_to_field_with_props(item_schema, None, false, None)?,
+        )),
+        AvroSchema::Map(value_schema) => {
+            let value_field =
+                schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+            DataType::Dictionary(
+                Box::new(DataType::Utf8),
+                Box::new(value_field.data_type().clone()),
+            )
+        }
+        AvroSchema::Union(us) => {
+            // If there are only two variants and one of them is null, set the other type as the field data type
+            let has_nullable = us.find_schema(&Value::Null).is_some();
+            let sub_schemas = us.variants();
+            if has_nullable && sub_schemas.len() == 2 {
+                nullable = true;
+                if let Some(schema) = sub_schemas
+                    .iter()
+                    .find(|&schema| !matches!(schema, AvroSchema::Null))
+                {
+                    schema_to_field_with_props(schema, None, has_nullable, None)?
+                        .data_type()
+                        .clone()
+                } else {
+                    return Err(DataFusionError::AvroError(
+                        avro_rs::Error::GetUnionDuplicate,
+                    ));
+                }
+            } else {
+                let fields = sub_schemas
+                    .iter()
+                    .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+                    .collect::<Result<Vec<Field>>>()?;
+                DataType::Union(fields)
+            }
+        }
+        AvroSchema::Record { name, fields, .. } => {
+            let fields: Result<Vec<Field>> = fields
+                .iter()
+                .map(|field| {
+                    let mut props = BTreeMap::new();
+                    if let Some(doc) = &field.doc {
+                        props.insert("avro::doc".to_string(), doc.clone());
+                    }
+                    /*if let Some(aliases) = fields.aliases {
+                        props.insert("aliases", aliases);
+                    }*/
+                    schema_to_field_with_props(
+                        &field.schema,
+                        Some(&format!("{}.{}", name.fullname(None), field.name)),
+                        false,
+                        Some(&props),
+                    )
+                })
+                .collect();
+            DataType::Struct(fields?)
+        }
+        AvroSchema::Enum { symbols, name, .. } => {
+            return Ok(Field::new_dict(
+                &name.fullname(None),
+                index_type(symbols.len()),
+                false,
+                0,
+                false,
+            ))
+        }
+        AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+        AvroSchema::Decimal {
+            precision, scale, ..
+        } => DataType::Decimal(*precision, *scale),
+        AvroSchema::Uuid => DataType::FixedSizeBinary(16),
+        AvroSchema::Date => DataType::Date32,
+        AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+        AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+        AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
+        AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
+        AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+    };
+
+    let data_type = field_type.clone();
+    let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+    let mut field = Field::new(name, field_type, nullable);
+    field.set_metadata(props.cloned());
+    Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+    match dt {
+        DataType::Null => "null",
+        DataType::Boolean => "bit",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::UInt8 => "uint1",
+        DataType::UInt16 => "uint2",
+        DataType::UInt32 => "uint4",
+        DataType::UInt64 => "uint8",
+        DataType::Float16 => "float2",
+        DataType::Float32 => "float4",
+        DataType::Float64 => "float8",
+        DataType::Date32 => "dateday",
+        DataType::Date64 => "datemilli",
+        DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+            TimeUnit::Second => "timesec",
+            TimeUnit::Millisecond => "timemilli",
+            TimeUnit::Microsecond => "timemicro",
+            TimeUnit::Nanosecond => "timenano",
+        },
+        DataType::Timestamp(tu, tz) => {
+            if tz.is_some() {
+                match tu {
+                    TimeUnit::Second => "timestampsectz",
+                    TimeUnit::Millisecond => "timestampmillitz",
+                    TimeUnit::Microsecond => "timestampmicrotz",
+                    TimeUnit::Nanosecond => "timestampnanotz",
+                }
+            } else {
+                match tu {
+                    TimeUnit::Second => "timestampsec",
+                    TimeUnit::Millisecond => "timestampmilli",
+                    TimeUnit::Microsecond => "timestampmicro",
+                    TimeUnit::Nanosecond => "timestampnano",
+                }
+            }
+        }
+        DataType::Duration(_) => "duration",
+        DataType::Interval(unit) => match unit {
+            IntervalUnit::YearMonth => "intervalyear",
+            IntervalUnit::DayTime => "intervalmonth",
+        },
+        DataType::Binary => "varbinary",
+        DataType::FixedSizeBinary(_) => "fixedsizebinary",
+        DataType::LargeBinary => "largevarbinary",
+        DataType::Utf8 => "varchar",
+        DataType::LargeUtf8 => "largevarchar",
+        DataType::List(_) => "list",
+        DataType::FixedSizeList(_, _) => "fixed_size_list",
+        DataType::LargeList(_) => "largelist",
+        DataType::Struct(_) => "struct",
+        DataType::Union(_) => "union",
+        DataType::Dictionary(_, _) => "map",
+        DataType::Decimal(_, _) => "decimal",
+    }
+}
+
+fn index_type(len: usize) -> DataType {
+    if len <= usize::from(u8::MAX) {
+        DataType::Int8
+    } else if len <= usize::from(u16::MAX) {
+        DataType::Int16
+    } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+        DataType::Int32
+    } else {
+        DataType::Int64
+    }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+    let mut props = BTreeMap::new();
+    match &schema {
+        AvroSchema::Record {
+            doc: Some(ref doc), ..
+        }
+        | AvroSchema::Enum {
+            doc: Some(ref doc), ..
+        } => {
+            props.insert("avro::doc".to_string(), doc.clone());
+        }
+        _ => {}
+    }
+    match &schema {
+        AvroSchema::Record {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        }
+        | AvroSchema::Enum {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        }
+        | AvroSchema::Fixed {
+            name:
+                Name {
+                    aliases: Some(aliases),
+                    namespace,
+                    ..
+                },
+            ..
+        } => {
+            let aliases: Vec<String> = aliases
+                .iter()
+                .map(|alias| aliased(alias, namespace.as_deref(), None))
+                .collect();
+            props.insert(
+                "avro::aliases".to_string(),
+                format!("[{}]", aliases.join(",")),
+            );
+        }
+        _ => {}
+    }
+    props
+}
+
+#[allow(dead_code)]
+fn get_metadata(
+    _schema: AvroSchema,
+    props: BTreeMap<String, String>,
+) -> BTreeMap<String, String> {
+    let mut metadata: BTreeMap<String, String> = Default::default();
+    metadata.extend(props);
+    metadata
+}
+
+/// Returns the fully qualified name for a field
+pub fn aliased(
+    name: &str,
+    namespace: Option<&str>,
+    default_namespace: Option<&str>,
+) -> String {
+    if name.contains('.') {
+        name.to_string()
+    } else {
+        let namespace = namespace.as_ref().copied().or(default_namespace);
+
+        match namespace {
+            Some(ref namespace) => format!("{}.{}", namespace, name),
+            None => name.to_string(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::{aliased, external_props, to_arrow_schema};
+    use crate::arrow::datatypes::DataType::{Binary, Float32, Float64, Timestamp, Utf8};
+    use crate::arrow::datatypes::TimeUnit::Microsecond;
+    use crate::arrow::datatypes::{Field, Schema};
+    use arrow::datatypes::DataType::{Boolean, Int32, Int64};
+    use avro_rs::schema::Name;
+    use avro_rs::Schema as AvroSchema;
+
+    #[test]
+    fn test_alias() {
+        assert_eq!(aliased("foo.bar", None, None), "foo.bar");
+        assert_eq!(aliased("bar", Some("foo"), None), "foo.bar");
+        assert_eq!(aliased("bar", Some("foo"), Some("cat")), "foo.bar");
+        assert_eq!(aliased("bar", None, Some("cat")), "cat.bar");
+    }
+
+    #[test]
+    fn test_external_props() {
+        let record_schema = AvroSchema::Record {
+            name: Name {
+                name: "record".to_string(),
+                namespace: None,
+                aliases: Some(vec!["fooalias".to_string(), "baralias".to_string()]),
+            },
+            doc: Some("record documentation".to_string()),
+            fields: vec![],
+            lookup: Default::default(),
+        };
+        let props = external_props(&record_schema);
+        assert_eq!(
+            props.get("avro::doc"),
+            Some(&"record documentation".to_string())
+        );
+        assert_eq!(
+            props.get("avro::aliases"),
+            Some(&"[fooalias,baralias]".to_string())
+        );
+        let enum_schema = AvroSchema::Enum {
+            name: Name {
+                name: "enum".to_string(),
+                namespace: None,
+                aliases: Some(vec!["fooenum".to_string(), "barenum".to_string()]),
+            },
+            doc: Some("enum documentation".to_string()),
+            symbols: vec![],
+        };
+        let props = external_props(&enum_schema);
+        assert_eq!(
+            props.get("avro::doc"),
+            Some(&"enum documentation".to_string())
+        );
+        assert_eq!(
+            props.get("avro::aliases"),
+            Some(&"[fooenum,barenum]".to_string())
+        );
+        let fixed_schema = AvroSchema::Fixed {
+            name: Name {
+                name: "fixed".to_string(),
+                namespace: None,
+                aliases: Some(vec!["foofixed".to_string(), "barfixed".to_string()]),
+            },
+            size: 1,
+        };
+        let props = external_props(&fixed_schema);
+        assert_eq!(
+            props.get("avro::aliases"),
+            Some(&"[foofixed,barfixed]".to_string())
+        );
+    }
+
+    #[test]
+    fn test_invalid_avro_schema() {}
+
+    #[test]
+    fn test_plain_types_schema() {
+        let schema = AvroSchema::parse_str(
+            r#"
+            {
+              "type" : "record",
+              "name" : "topLevelRecord",
+              "fields" : [ {
+                "name" : "id",
+                "type" : [ "int", "null" ]
+              }, {
+                "name" : "bool_col",
+                "type" : [ "boolean", "null" ]
+              }, {
+                "name" : "tinyint_col",
+                "type" : [ "int", "null" ]
+              }, {
+                "name" : "smallint_col",
+                "type" : [ "int", "null" ]
+              }, {
+                "name" : "int_col",
+                "type" : [ "int", "null" ]
+              }, {
+                "name" : "bigint_col",
+                "type" : [ "long", "null" ]
+              }, {
+                "name" : "float_col",
+                "type" : [ "float", "null" ]
+              }, {
+                "name" : "double_col",
+                "type" : [ "double", "null" ]
+              }, {
+                "name" : "date_string_col",
+                "type" : [ "bytes", "null" ]
+              }, {
+                "name" : "string_col",
+                "type" : [ "bytes", "null" ]
+              }, {
+                "name" : "timestamp_col",
+                "type" : [ {
+                  "type" : "long",
+                  "logicalType" : "timestamp-micros"
+                }, "null" ]
+              } ]
+            }"#,
+        );
+        assert!(schema.is_ok(), "{:?}", schema);
+        let arrow_schema = to_arrow_schema(&schema.unwrap());
+        assert!(arrow_schema.is_ok(), "{:?}", arrow_schema);
+        let expected = Schema::new(vec![
+            Field::new("id", Int32, true),
+            Field::new("bool_col", Boolean, true),
+            Field::new("tinyint_col", Int32, true),
+            Field::new("smallint_col", Int32, true),
+            Field::new("int_col", Int32, true),
+            Field::new("bigint_col", Int64, true),
+            Field::new("float_col", Float32, true),
+            Field::new("double_col", Float64, true),
+            Field::new("date_string_col", Binary, true),
+            Field::new("string_col", Binary, true),
+            Field::new("timestamp_col", Timestamp(Microsecond, None), true),
+        ]);
+        assert_eq!(arrow_schema.unwrap(), expected);
+    }
+
+    #[test]
+    fn test_non_record_schema() {
+        let arrow_schema = to_arrow_schema(&AvroSchema::String);
+        assert!(arrow_schema.is_ok(), "{:?}", arrow_schema);
+        assert_eq!(
+            arrow_schema.unwrap(),
+            Schema::new(vec![Field::new("", Utf8, false)])
+        );
+    }
+}
diff --git a/datafusion/src/datasource/avro.rs b/datafusion/src/datasource/avro.rs
new file mode 100644
index 0000000..ee0fabf
--- /dev/null
+++ b/datafusion/src/datasource/avro.rs
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Line-delimited Avro data source
+//!
+//! This data source allows Line-delimited Avro records or files to be used as input for queries.
+//!
+
+use std::{
+    any::Any,
+    io::{Read, Seek},
+    sync::{Arc, Mutex},
+};
+
+use arrow::datatypes::SchemaRef;
+
+use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
+use crate::{
+    datasource::{Source, TableProvider},
+    error::{DataFusionError, Result},
+    physical_plan::{common, ExecutionPlan},
+};
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents a  line-delimited Avro file with a provided schema
+pub struct AvroFile {
+    source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
+    schema: SchemaRef,
+    file_extension: String,
+}
+
+impl AvroFile {
+    /// Attempt to initialize a `AvroFile` from a path. The schema can be read automatically.
+    pub fn try_new(path: &str, options: AvroReadOptions) -> Result<Self> {
+        let schema = if let Some(schema) = options.schema {
+            schema
+        } else {
+            let filenames =
+                common::build_checked_file_list(path, options.file_extension)?;
+            Arc::new(AvroExec::try_read_schema(&filenames)?)
+        };
+
+        Ok(Self {
+            source: Source::Path(path.to_string()),
+            schema,
+            file_extension: options.file_extension.to_string(),
+        })
+    }
+
+    /// Attempt to initialize a `AvroFile` from a reader. The schema MUST be provided in options
+    pub fn try_new_from_reader<R: Read + Seek + Send + Sync + 'static>(
+        reader: R,
+        options: AvroReadOptions,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "Schema must be provided to CsvRead".to_string(),
+                ));
+            }
+        };
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+        })
+    }
+
+    /// Attempt to initialize an AvroFile from a reader impls Seek. The schema can be read automatically.
+    pub fn try_new_from_reader_schema<R: Read + Seek + Send + Sync + 'static>(
+        mut reader: R,
+        options: AvroReadOptions,
+    ) -> Result<Self> {
+        let schema = {
+            if let Some(schema) = options.schema {
+                schema
+            } else {
+                Arc::new(crate::avro_to_arrow::read_avro_schema_from_reader(
+                    &mut reader,
+                )?)
+            }
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+        })
+    }
+
+    /// Get the path for Avro file(s) represented by this AvroFile instance
+    pub fn path(&self) -> &str {
+        match &self.source {
+            Source::Reader(_) => "",
+            Source::Path(path) => path,
+        }
+    }
+
+    /// Get the file extension for the Avro file(s) represented by this AvroFile instance
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+}
+
+impl TableProvider for AvroFile {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+        _filters: &[crate::logical_plan::Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let opts = AvroReadOptions {
+            schema: Some(self.schema.clone()),
+            file_extension: self.file_extension.as_str(),
+        };
+        let batch_size = limit
+            .map(|l| std::cmp::min(l, batch_size))
+            .unwrap_or(batch_size);
+
+        let exec = match &self.source {
+            Source::Reader(maybe_reader) => {
+                if let Some(rdr) = maybe_reader.lock().unwrap().take() {
+                    AvroExec::try_new_from_reader(
+                        rdr,
+                        opts,
+                        projection.clone(),
+                        batch_size,
+                        limit,
+                    )?
+                } else {
+                    return Err(DataFusionError::Execution(
+                        "You can only read once if the data comes from a reader"
+                            .to_string(),
+                    ));
+                }
+            }
+            Source::Path(p) => {
+                AvroExec::try_from_path(p, opts, projection.clone(), batch_size, limit)?
+            }
+        };
+        Ok(Arc::new(exec))
+    }
+}
+
+#[cfg(test)]
+#[cfg(feature = "avro")]
+mod tests {
+    use arrow::array::{
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        TimestampMicrosecondArray,
+    };
+    use arrow::record_batch::RecordBatch;
+    use futures::StreamExt;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn read_small_batches() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = None;
+        let exec = table.scan(&projection, 2, &[], None)?;
+        let stream = exec.execute(0).await?;
+
+        let _ = stream
+            .map(|batch| {
+                let batch = batch.unwrap();
+                assert_eq!(11, batch.num_columns());
+                assert_eq!(2, batch.num_rows());
+            })
+            .fold(0, |acc, _| async move { acc + 1i32 })
+            .await;
+
+        Ok(())
+    }
+
+    #[cfg(feature = "avro")]
+    #[tokio::test]
+    async fn read_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+
+        let x: Vec<String> = table
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect();
+        let y = x.join("\n");
+        assert_eq!(
+            "id: Int32\n\
+             bool_col: Boolean\n\
+             tinyint_col: Int32\n\
+             smallint_col: Int32\n\
+             int_col: Int32\n\
+             bigint_col: Int64\n\
+             float_col: Float32\n\
+             double_col: Float64\n\
+             date_string_col: Binary\n\
+             string_col: Binary\n\
+             timestamp_col: Timestamp(Microsecond, None)",
+            y
+        );
+
+        let projection = None;
+        let batch = get_first_batch(table, &projection).await?;
+        let expected =  vec![
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+            "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+            "| 4  | true     | 0           | 0            | 0       | 0          | 0         | 0          | 30332f30312f3039 | 30         | 2009-03-01 00:00:00 |",
+            "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01 00:01:00 |",
+            "| 6  | true     | 0           | 0            | 0       | 0          | 0         | 0          | 30342f30312f3039 | 30         | 2009-04-01 00:00:00 |",
+            "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01 00:01:00 |",
+            "| 2  | true     | 0           | 0            | 0       | 0          | 0         | 0          | 30322f30312f3039 | 30         | 2009-02-01 00:00:00 |",
+            "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01 00:01:00 |",
+            "| 0  | true     | 0           | 0            | 0       | 0          | 0         | 0          | 30312f30312f3039 | 30         | 2009-01-01 00:00:00 |",
+            "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01 00:01:00 |",
+            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+        ];
+
+        crate::assert_batches_eq!(expected, &[batch]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_bool_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = Some(vec![1]);
+        let batch = get_first_batch(table, &projection).await?;
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .unwrap();
+        let mut values: Vec<bool> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!(
+            "[true, false, true, false, true, false, true, false]",
+            format!("{:?}", values)
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_i32_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = Some(vec![0]);
+        let batch = get_first_batch(table, &projection).await?;
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let mut values: Vec<i32> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_i96_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = Some(vec![10]);
+        let batch = get_first_batch(table, &projection).await?;
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+        let mut values: Vec<i64> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!("[1235865600000000, 1235865660000000, 1238544000000000, 1238544060000000, 1233446400000000, 1233446460000000, 1230768000000000, 1230768060000000]", format!("{:?}", values));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_f32_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = Some(vec![6]);
+        let batch = get_first_batch(table, &projection).await?;
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .unwrap();
+        let mut values: Vec<f32> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!(
+            "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
+            format!("{:?}", values)
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_f64_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = Some(vec![7]);
+        let batch = get_first_batch(table, &projection).await?;
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap();
+        let mut values: Vec<f64> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!(
+            "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
+            format!("{:?}", values)
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_binary_alltypes_plain_avro() -> Result<()> {
+        let table = load_table("alltypes_plain.avro")?;
+        let projection = Some(vec![9]);
+        let batch = get_first_batch(table, &projection).await?;
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        let mut values: Vec<&str> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(std::str::from_utf8(array.value(i)).unwrap());
+        }
+
+        assert_eq!(
+            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
+            format!("{:?}", values)
+        );
+
+        Ok(())
+    }
+
+    fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/{}", testdata, name);
+        let table = AvroFile::try_new(&filename, AvroReadOptions::default())?;
+        Ok(Arc::new(table))
+    }
+
+    async fn get_first_batch(
+        table: Arc<dyn TableProvider>,
+        projection: &Option<Vec<usize>>,
+    ) -> Result<RecordBatch> {
+        let exec = table.scan(projection, 1024, &[], None)?;
+        let mut it = exec.execute(0).await?;
+        it.next()
+            .await
+            .expect("should have received at least one batch")
+            .map_err(|e| e.into())
+    }
+}
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 53ba517..cfa9003 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -17,6 +17,7 @@
 
 //! DataFusion data sources
 
+pub mod avro;
 pub mod csv;
 pub mod datasource;
 pub mod empty;
diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs
index 903faea..6b6bb13 100644
--- a/datafusion/src/error.rs
+++ b/datafusion/src/error.rs
@@ -23,6 +23,8 @@ use std::io;
 use std::result;
 
 use arrow::error::ArrowError;
+#[cfg(feature = "avro")]
+use avro_rs::Error as AvroError;
 use parquet::errors::ParquetError;
 use sqlparser::parser::ParserError;
 
@@ -37,6 +39,9 @@ pub enum DataFusionError {
     ArrowError(ArrowError),
     /// Wraps an error from the Parquet crate
     ParquetError(ParquetError),
+    /// Wraps an error from the Avro crate
+    #[cfg(feature = "avro")]
+    AvroError(AvroError),
     /// Error associated to I/O operations and associated traits.
     IoError(io::Error),
     /// Error returned when SQL is syntactically incorrect.
@@ -83,6 +88,13 @@ impl From<ParquetError> for DataFusionError {
     }
 }
 
+#[cfg(feature = "avro")]
+impl From<AvroError> for DataFusionError {
+    fn from(e: AvroError) -> Self {
+        DataFusionError::AvroError(e)
+    }
+}
+
 impl From<ParserError> for DataFusionError {
     fn from(e: ParserError) -> Self {
         DataFusionError::SQL(e)
@@ -96,6 +108,10 @@ impl Display for DataFusionError {
             DataFusionError::ParquetError(ref desc) => {
                 write!(f, "Parquet error: {}", desc)
             }
+            #[cfg(feature = "avro")]
+            DataFusionError::AvroError(ref desc) => {
+                write!(f, "Avro error: {}", desc)
+            }
             DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc),
             DataFusionError::SQL(ref desc) => {
                 write!(f, "SQL error: {:?}", desc)
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 82947aa..5327c58 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -67,6 +67,8 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
 use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use crate::physical_optimizer::repartition::Repartition;
 
+use crate::datasource::avro::AvroFile;
+use crate::physical_plan::avro::AvroReadOptions;
 use crate::physical_plan::csv::CsvReadOptions;
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udf::ScalarUDF;
@@ -197,6 +199,11 @@ impl ExecutionContext {
                     let plan = LogicalPlanBuilder::empty(false).build()?;
                     Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
                 }
+                FileType::Avro => {
+                    self.register_avro(name, location, AvroReadOptions::default())?;
+                    let plan = LogicalPlanBuilder::empty(false).build()?;
+                    Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
+                }
                 _ => Err(DataFusionError::NotImplemented(format!(
                     "Unsupported file type {:?}.",
                     file_type
@@ -271,6 +278,19 @@ impl ExecutionContext {
             .insert(f.name.clone(), Arc::new(f));
     }
 
+    /// Creates a DataFrame for reading an Avro data source.
+
+    pub fn read_avro(
+        &mut self,
+        filename: impl Into<String>,
+        options: AvroReadOptions,
+    ) -> Result<Arc<dyn DataFrame>> {
+        Ok(Arc::new(DataFrameImpl::new(
+            self.state.clone(),
+            &LogicalPlanBuilder::scan_avro(filename, options, None)?.build()?,
+        )))
+    }
+
     /// Creates a DataFrame for reading a CSV data source.
     pub fn read_csv(
         &mut self,
@@ -334,6 +354,18 @@ impl ExecutionContext {
         Ok(())
     }
 
+    /// Registers an Avro data source so that it can be referenced from SQL statements
+    /// executed against this context.
+    pub fn register_avro(
+        &mut self,
+        name: &str,
+        filename: &str,
+        options: AvroReadOptions,
+    ) -> Result<()> {
+        self.register_table(name, Arc::new(AvroFile::try_new(filename, options)?))?;
+        Ok(())
+    }
+
     /// Registers a named catalog using a custom `CatalogProvider` so that
     /// it can be referenced from SQL statements executed against this
     /// context.
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index eac9b5f..adaca11 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -212,6 +212,7 @@
 
 extern crate sqlparser;
 
+pub mod avro_to_arrow;
 pub mod catalog;
 pub mod dataframe;
 pub mod datasource;
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index f31dd37..5555939 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -36,10 +36,12 @@ use crate::{
 
 use super::dfschema::ToDFSchema;
 use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
+use crate::datasource::avro::AvroFile;
 use crate::logical_plan::{
     columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
     DFSchemaRef, Partitioning,
 };
+use crate::physical_plan::avro::AvroReadOptions;
 
 /// Default table name for unnamed table
 pub const UNNAMED_TABLE: &str = "?table?";
@@ -154,6 +156,27 @@ impl LogicalPlanBuilder {
         Self::scan(table_name, provider, projection)
     }
 
+    /// Scan an Avro data source
+    pub fn scan_avro(
+        path: impl Into<String>,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Self> {
+        let path = path.into();
+        Self::scan_avro_with_name(path.clone(), options, projection, path)
+    }
+
+    /// Scan an Avro data source and register it with a given table name
+    pub fn scan_avro_with_name(
+        path: impl Into<String>,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        table_name: impl Into<String>,
+    ) -> Result<Self> {
+        let provider = Arc::new(AvroFile::try_new(&path.into(), options)?);
+        Self::scan(table_name, provider, projection)
+    }
+
     /// Scan an empty data source, mainly used in tests
     pub fn scan_empty(
         name: Option<&str>,
diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs
index c067b5f..1ef8ac7 100644
--- a/datafusion/src/logical_plan/dfschema.rs
+++ b/datafusion/src/logical_plan/dfschema.rs
@@ -167,7 +167,6 @@ impl DFSchema {
                 (None, Some(_)) | (None, None) => field.name() == name,
             })
             .map(|(idx, _)| idx);
-
         match matches.next() {
             None => Err(DataFusionError::Plan(format!(
                 "No field named '{}.{}'. Valid fields are {}.",
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index ec017d0..eb46099 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -1911,17 +1911,13 @@ mod tests {
     impl ExprRewriter for FooBarRewriter {
         fn mutate(&mut self, expr: Expr) -> Result<Expr> {
             match expr {
-                Expr::Literal(scalar) => {
-                    if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
-                        let utf8_val = if utf8_val == "foo" {
-                            "bar".to_string()
-                        } else {
-                            utf8_val
-                        };
-                        Ok(lit(utf8_val))
+                Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => {
+                    let utf8_val = if utf8_val == "foo" {
+                        "bar".to_string()
                     } else {
-                        Ok(Expr::Literal(scalar))
-                    }
+                        utf8_val
+                    };
+                    Ok(lit(utf8_val))
                 }
                 // otherwise, return the expression unchanged
                 expr => Ok(expr),
diff --git a/datafusion/src/physical_plan/avro.rs b/datafusion/src/physical_plan/avro.rs
new file mode 100644
index 0000000..3f0b007
--- /dev/null
+++ b/datafusion/src/physical_plan/avro.rs
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited Avro files
+#[cfg(feature = "avro")]
+use super::RecordBatchStream;
+use super::{common, source::Source, ExecutionPlan, Partitioning};
+use crate::avro_to_arrow::read_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{DisplayFormatType, Statistics};
+use arrow::datatypes::{Schema, SchemaRef};
+#[cfg(feature = "avro")]
+use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use async_trait::async_trait;
+#[cfg(feature = "avro")]
+use futures::Stream;
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::Read,
+    sync::{Arc, Mutex},
+};
+#[cfg(feature = "avro")]
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+/// Line-delimited Avro read options
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".avro".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: ".avro",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl AvroExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_from_path(
+        path: &str,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(AvroExec::try_read_schema(filenames.as_slice())?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            projected_schema,
+            file_extension,
+            projection,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: AvroReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned CSV files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the avro file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Read schema for given Avro dataset
+    pub fn try_read_schema(filenames: &[String]) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        for filename in filenames {
+            let mut file = File::open(filename)?;
+            let schema = read_avro_schema_from_reader(&mut file)?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+            Ok(Arc::new(Self {
+                source: Source::PartitionedFiles {
+                    filenames: filenames.clone(),
+                    path: path.clone(),
+                },
+                schema: self.schema.clone(),
+                projection: self.projection.clone(),
+                projected_schema: self.projected_schema.clone(),
+                batch_size: self.batch_size,
+                limit: self.limit,
+                file_extension: self.file_extension.clone(),
+            }))
+        } else {
+            Err(DataFusionError::Internal(
+                "AvroExec with reader source cannot be used with `with_new_children`"
+                    .to_string(),
+            ))
+        }
+    }
+
+    #[cfg(not(feature = "avro"))]
+    async fn execute(
+        &self,
+        _partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        Err(DataFusionError::NotImplemented(
+            "Cannot execute avro plan without avro feature enabled".to_string(),
+        ))
+    }
+
+    #[cfg(feature = "avro")]
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        let mut builder = crate::avro_to_arrow::ReaderBuilder::new()
+            .with_schema(self.schema.clone())
+            .with_batch_size(self.batch_size);
+        if let Some(proj) = &self.projection {
+            builder = builder.with_projection(
+                proj.iter()
+                    .map(|col_idx| self.schema.field(*col_idx).name())
+                    .cloned()
+                    .collect(),
+            );
+        }
+        match &self.source {
+            Source::PartitionedFiles { filenames, .. } => {
+                let file = File::open(&filenames[partition])?;
+
+                Ok(Box::pin(AvroStream::new(builder.build(file)?, self.limit)))
+            }
+            Source::Reader(rdr) => {
+                if partition != 0 {
+                    Err(DataFusionError::Internal(
+                        "Only partition 0 is valid when Avro comes from a reader"
+                            .to_string(),
+                    ))
+                } else if let Some(rdr) = rdr.lock().unwrap().take() {
+                    Ok(Box::pin(AvroStream::new(builder.build(rdr)?, self.limit)))
+                } else {
+                    Err(DataFusionError::Execution(
+                        "Error reading Avro: Data can only be read a single time when the source is a reader"
+                            .to_string(),
+                    ))
+                }
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "AvroExec: source={}, batch_size={}, limit={:?}",
+                    self.source, self.batch_size, self.limit
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+#[cfg(feature = "avro")]
+struct AvroStream<'a, R: Read> {
+    reader: crate::avro_to_arrow::Reader<'a, R>,
+    remain: Option<usize>,
+}
+
+#[cfg(feature = "avro")]
+impl<'a, R: Read> AvroStream<'a, R> {
+    fn new(reader: crate::avro_to_arrow::Reader<'a, R>, limit: Option<usize>) -> Self {
+        Self {
+            reader,
+            remain: limit,
+        }
+    }
+}
+
+#[cfg(feature = "avro")]
+impl<R: Read + Unpin> Stream for AvroStream<'_, R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if let Some(remain) = self.remain.as_mut() {
+            if *remain < 1 {
+                return Poll::Ready(None);
+            }
+        }
+
+        Poll::Ready(match self.reader.next() {
+            Ok(Some(item)) => {
+                if let Some(remain) = self.remain.as_mut() {
+                    if *remain >= item.num_rows() {
+                        *remain -= item.num_rows();
+                        Some(Ok(item))
+                    } else {
+                        let len = *remain;
+                        *remain = 0;
+                        Some(Ok(RecordBatch::try_new(
+                            item.schema(),
+                            item.columns()
+                                .iter()
+                                .map(|column| column.slice(0, len))
+                                .collect(),
+                        )?))
+                    }
+                } else {
+                    Some(Ok(item))
+                }
+            }
+            Ok(None) => None,
+            Err(err) => Some(Err(err)),
+        })
+    }
+}
+
+#[cfg(feature = "avro")]
+impl<R: Read + Unpin> RecordBatchStream for AvroStream<'_, R> {
+    fn schema(&self) -> SchemaRef {
+        self.reader.schema()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    #[cfg(feature = "avro")]
+    async fn test() -> Result<()> {
+        use futures::StreamExt;
+
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+        let avro_exec = AvroExec::try_from_path(
+            &filename,
+            AvroReadOptions::default(),
+            Some(vec![0, 1, 2]),
+            1024,
+            None,
+        )?;
+        assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
+
+        let mut results = avro_exec.execute(0).await?;
+        let batch = results.next().await.unwrap()?;
+
+        assert_eq!(8, batch.num_rows());
+        assert_eq!(3, batch.num_columns());
+
+        let schema = batch.schema();
+        let field_names: Vec<&str> =
+            schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names);
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[cfg(not(feature = "avro"))]
+    async fn test() -> Result<()> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+        let avro_exec = AvroExec::try_from_path(
+            &filename,
+            AvroReadOptions::default(),
+            Some(vec![0, 1, 2]),
+            1024,
+            None,
+        );
+        assert!(matches!(
+            avro_exec,
+            Err(DataFusionError::NotImplemented(msg))
+            if msg == *"cannot read avro schema without the 'avro' feature enabled"
+        ));
+
+        Ok(())
+    }
+}
diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs
index d0b7a07..3be9e72 100644
--- a/datafusion/src/physical_plan/common.rs
+++ b/datafusion/src/physical_plan/common.rs
@@ -108,6 +108,20 @@ pub(crate) fn combine_batches(
 }
 
 /// Recursively builds a list of files in a directory with a given extension
+pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
+    let mut filenames: Vec<String> = Vec::new();
+    build_file_list_recurse(dir, &mut filenames, ext)?;
+    if filenames.is_empty() {
+        return Err(DataFusionError::Plan(format!(
+            "No files found at {path} with file extension {file_extension}",
+            path = dir,
+            file_extension = ext
+        )));
+    }
+    Ok(filenames)
+}
+
+/// Recursively builds a list of files in a directory with a given extension
 pub fn build_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
     let mut filenames: Vec<String> = Vec::new();
     build_file_list_recurse(dir, &mut filenames, ext)?;
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs
index 39ae70d..a776c42f 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -236,14 +236,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();
 
     Ok(match array {
-        ColumnarValue::Scalar(scalar) => {
-            if let ScalarValue::TimestampNanosecond(v) = scalar {
-                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?))
-            } else {
-                return Err(DataFusionError::Execution(
-                    "array of `date_trunc` must be non-null scalar Utf8".to_string(),
-                ));
-            }
+        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v)) => {
+            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?))
         }
         ColumnarValue::Array(array) => {
             let array = array
@@ -257,6 +251,11 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
 
             ColumnarValue::Array(Arc::new(array))
         }
+        _ => {
+            return Err(DataFusionError::Execution(
+                "array of `date_trunc` must be non-null scalar Utf8".to_string(),
+            ));
+        }
     })
 }
 
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index af86887..3701e90 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -603,6 +603,7 @@ pub trait Accumulator: Send + Sync + Debug {
 pub mod aggregates;
 pub mod analyze;
 pub mod array_expressions;
+pub mod avro;
 pub mod coalesce_batches;
 pub mod coalesce_partitions;
 pub mod common;
diff --git a/datafusion/src/physical_plan/source.rs b/datafusion/src/physical_plan/source.rs
index 012405a..32fa9c3 100644
--- a/datafusion/src/physical_plan/source.rs
+++ b/datafusion/src/physical_plan/source.rs
@@ -46,7 +46,7 @@ impl<R> std::fmt::Debug for Source<R> {
         Ok(())
     }
 }
-impl std::fmt::Display for Source {
+impl<R> std::fmt::Display for Source<R> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Source::PartitionedFiles { path, filenames } => {
diff --git a/datafusion/src/physical_plan/string_expressions.rs b/datafusion/src/physical_plan/string_expressions.rs
index 09e19c4..7cbebce 100644
--- a/datafusion/src/physical_plan/string_expressions.rs
+++ b/datafusion/src/physical_plan/string_expressions.rs
@@ -290,6 +290,7 @@ pub fn concat(args: &[ColumnarValue]) -> Result<ColumnarValue> {
             .map(|index| {
                 let mut owned_string: String = "".to_owned();
                 for arg in args {
+                    #[allow(clippy::collapsible_match)]
                     match arg {
                         ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) => {
                             if let Some(value) = maybe_value {
diff --git a/datafusion/src/sql/parser.rs b/datafusion/src/sql/parser.rs
index bb2f9e6..864801c 100644
--- a/datafusion/src/sql/parser.rs
+++ b/datafusion/src/sql/parser.rs
@@ -43,6 +43,8 @@ pub enum FileType {
     Parquet,
     /// Comma separated values
     CSV,
+    /// Avro binary records
+    Avro,
 }
 
 impl FromStr for FileType {
@@ -53,8 +55,9 @@ impl FromStr for FileType {
             "PARQUET" => Ok(Self::Parquet),
             "NDJSON" => Ok(Self::NdJson),
             "CSV" => Ok(Self::CSV),
+            "AVRO" => Ok(Self::Avro),
             other => Err(ParserError::ParserError(format!(
-                "expect one of PARQUET, NDJSON, or CSV, found: {}",
+                "expect one of PARQUET, AVRO, NDJSON, or CSV, found: {}",
                 other
             ))),
         }
@@ -390,10 +393,21 @@ mod tests {
         });
         expect_parse_ok(sql, expected)?;
 
+        // positive case: it is ok for avro files not to have columns specified
+        let sql = "CREATE EXTERNAL TABLE t STORED AS AVRO LOCATION 'foo.avro'";
+        let expected = Statement::CreateExternalTable(CreateExternalTable {
+            name: "t".into(),
+            columns: vec![],
+            file_type: FileType::Avro,
+            has_header: false,
+            location: "foo.avro".into(),
+        });
+        expect_parse_ok(sql, expected)?;
+
         // Error cases: Invalid type
         let sql =
             "CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'";
-        expect_parse_error(sql, "expect one of PARQUET, NDJSON, or CSV");
+        expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV");
 
         Ok(())
     }
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index e613ff3..50c36dd 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -212,6 +212,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 }
             }
             FileType::NdJson => {}
+            FileType::Avro => {}
         };
 
         let schema = self.build_schema(columns)?;
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 90173be..8916339 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -27,20 +27,16 @@ use chrono::Duration;
 extern crate arrow;
 extern crate datafusion;
 
-use arrow::{array::*, datatypes::TimeUnit};
-use arrow::{datatypes::Int32Type, datatypes::Int64Type, record_batch::RecordBatch};
 use arrow::{
-    datatypes::{
-        ArrowNativeType, ArrowPrimitiveType, ArrowTimestampType, DataType, Field, Schema,
-        SchemaRef, TimestampMicrosecondType, TimestampMillisecondType,
-        TimestampNanosecondType, TimestampSecondType,
-    },
+    array::*, datatypes::*, record_batch::RecordBatch,
     util::display::array_value_to_string,
 };
 
 use datafusion::assert_batches_eq;
 use datafusion::assert_batches_sorted_eq;
 use datafusion::logical_plan::LogicalPlan;
+#[cfg(feature = "avro")]
+use datafusion::physical_plan::avro::AvroReadOptions;
 use datafusion::physical_plan::metrics::MetricValue;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::ExecutionPlanVisitor;
@@ -2960,6 +2956,17 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
     .unwrap();
 }
 
+#[cfg(feature = "avro")]
+fn register_alltypes_avro(ctx: &mut ExecutionContext) {
+    let testdata = datafusion::test_util::arrow_test_data();
+    ctx.register_avro(
+        "alltypes_plain",
+        &format!("{}/avro/alltypes_plain.avro", testdata),
+        AvroReadOptions::default(),
+    )
+    .unwrap();
+}
+
 /// Execute query and return result set as 2-d table of Vecs
 /// `result[row][column]`
 async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> Vec<RecordBatch> {
@@ -4685,3 +4692,137 @@ async fn test_regexp_is_match() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query() {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_avro(&mut ctx);
+    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
+    // so we need an explicit cast
+    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+-----------------------------------------+",
+        "| id | CAST(alltypes_plain.string_col AS Utf8) |",
+        "+----+-----------------------------------------+",
+        "| 4  | 0                                       |",
+        "| 5  | 1                                       |",
+        "| 6  | 0                                       |",
+        "| 7  | 1                                       |",
+        "| 2  | 0                                       |",
+        "| 3  | 1                                       |",
+        "| 0  | 0                                       |",
+        "| 1  | 1                                       |",
+        "+----+-----------------------------------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query_multiple_files() {
+    let tempdir = tempfile::tempdir().unwrap();
+    let table_path = tempdir.path();
+    let testdata = datafusion::test_util::arrow_test_data();
+    let alltypes_plain_file = format!("{}/avro/alltypes_plain.avro", testdata);
+    std::fs::copy(
+        &alltypes_plain_file,
+        format!("{}/alltypes_plain1.avro", table_path.display()),
+    )
+    .unwrap();
+    std::fs::copy(
+        &alltypes_plain_file,
+        format!("{}/alltypes_plain2.avro", table_path.display()),
+    )
+    .unwrap();
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_avro(
+        "alltypes_plain",
+        table_path.display().to_string().as_str(),
+        AvroReadOptions::default(),
+    )
+    .unwrap();
+    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
+    // so we need an explicit cast
+    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+-----------------------------------------+",
+        "| id | CAST(alltypes_plain.string_col AS Utf8) |",
+        "+----+-----------------------------------------+",
+        "| 4  | 0                                       |",
+        "| 5  | 1                                       |",
+        "| 6  | 0                                       |",
+        "| 7  | 1                                       |",
+        "| 2  | 0                                       |",
+        "| 3  | 1                                       |",
+        "| 0  | 0                                       |",
+        "| 1  | 1                                       |",
+        "| 4  | 0                                       |",
+        "| 5  | 1                                       |",
+        "| 6  | 0                                       |",
+        "| 7  | 1                                       |",
+        "| 2  | 0                                       |",
+        "| 3  | 1                                       |",
+        "| 0  | 0                                       |",
+        "| 1  | 1                                       |",
+        "+----+-----------------------------------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_single_nan_schema() {
+    let mut ctx = ExecutionContext::new();
+    let testdata = datafusion::test_util::arrow_test_data();
+    ctx.register_avro(
+        "single_nan",
+        &format!("{}/avro/single_nan.avro", testdata),
+        AvroReadOptions::default(),
+    )
+    .unwrap();
+    let sql = "SELECT mycol FROM single_nan";
+    let plan = ctx.create_logical_plan(sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let results = collect(plan).await.unwrap();
+    for batch in results {
+        assert_eq!(1, batch.num_rows());
+        assert_eq!(1, batch.num_columns());
+    }
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_explain() {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_avro(&mut ctx);
+
+    let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
+    let actual = execute(&mut ctx, sql).await;
+    let actual = normalize_vec_for_explain(actual);
+    let expected = vec![
+        vec![
+            "logical_plan",
+            "Projection: #COUNT(UInt8(1))\
+            \n  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
+            \n    TableScan: alltypes_plain projection=Some([0])",
+        ],
+        vec![
+            "physical_plan",
+            "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
+            \n  HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n    CoalescePartitionsExec\
+            \n      HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+            \n          AvroExec: source=Path(ARROW_TEST_DATA/avro/alltypes_plain.avro: [ARROW_TEST_DATA/avro/alltypes_plain.avro]), batch_size=8192, limit=None\
+            \n",
+        ],
+    ];
+    assert_eq!(expected, actual);
+}
diff --git a/testing b/testing
index b658b08..a8f7be3 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit b658b087767b041b2081766814655b4dd5a9a439
+Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20