Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/15 14:30:40 UTC
[arrow-datafusion] branch master updated: Avro Table Provider (#910)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6402200 Avro Table Provider (#910)
6402200 is described below
commit 6402200c3ff1ca9732e85ca39fc89ae5f30cd965
Author: Guillaume Balaine <ig...@gmail.com>
AuthorDate: Wed Sep 15 16:30:36 2021 +0200
Avro Table Provider (#910)
* Add avro as a datasource, file and table provider
* wip
* Added support for composite identifiers for struct type.
* Fixed build.
* cheat and add unions to valid composite column types
* Implement the AvroArrayReader
* Add binary types
* Enable Avro as a FileType
* Enable registering an avro table in the sql parsing
* Change package name for datafusion/avro
* Implement Avro datasource tests and fix avro_rs::Value resolution to Arrow types
* Test for AvroExec::try_from_path
* external table avro test
* Basic schema conversion tests
* Complete test for avro_to_arrow_reader on alltypes_dictionary
* fix_stable: .rewind is 'unstable'
* Fix license files and remove the unused avro-converter crate
* fix example test in avro_to_arrow
* add avro_sql test to default workflow
* Address clippy lints
* Enable avro as a valid datasource for client execution
* Add avro to available logical plan nodes
* Add ToTimestampMillis as a scalar function in protos
* Allow Avro in PhysicalPlan nodes
* Remove remaining confusing references to 'json' in avro mod
* rename 'parquet' words in avro test and examples
* Handle Union of nested lists in arrow reader
* test timestamp arrays
* remove debug statement
* Make avro optional
* Remove debug statement
* Remove GetField usage (see #628)
* Fix docstring in parser tests
* Test batch output rather than just rows individually
* Remove 'csv' from error strings in physical_plan::avro
* Avro sample sql and explain queries tests in sql.rs
* Activate avro feature for cargo tests in github workflow
* Add a test for avro registering multiple files in a single table
* Switch to Result instead of Option for resolve_string
* Address missing clippy warning should_implement_trait in avro_to_arrow/reader
* Add fmt display implementation for AvroExec
* ci: fix cargo sql run example, use datafusion/avro feature instead of 'avro'
* license: missing license file for avro_to_arrow/schema.rs
* only run avro datasource tests if features have 'avro'
* refactor: rename infer_avro_schema_from_reader to read_avro_schema_from_reader
* Pass None as props to avro schema_to_field_with_props until further notice
* Change schema inference to FixedSizeBinary(16) for Uuid
* schema: prefix metadata coming from avro with 'avro'
* make num traits optional and part of the avro feature flag
* Fix avro schema tests regarding external props
* split avro physical plan test feature wise and add a non-implemented test
* submodule: switch back to apache/arrow-testing
* fix_test: columns are now prefixed in the plan
* avro_test: fix clippy warning cmp-owned
* avro: move statistics to the physical plan
* Increase min stack size for cargo tests
Co-authored-by: Jorge C. Leitao <jo...@gmail.com>
---
.github/workflows/rust.yml | 5 +-
ballista/rust/client/src/context.rs | 40 +
ballista/rust/core/proto/ballista.proto | 24 +
.../rust/core/src/serde/logical_plan/from_proto.rs | 29 +
ballista/rust/core/src/serde/logical_plan/mod.rs | 8 +-
.../rust/core/src/serde/logical_plan/to_proto.rs | 30 +-
.../core/src/serde/physical_plan/from_proto.rs | 17 +
.../rust/core/src/serde/physical_plan/to_proto.rs | 23 +
datafusion-examples/Cargo.toml | 4 +
datafusion-examples/examples/avro_sql.rs | 49 +
datafusion/Cargo.toml | 5 +-
datafusion/src/avro_to_arrow/arrow_array_reader.rs | 1090 ++++++++++++++++++++
datafusion/src/avro_to_arrow/mod.rs | 47 +
datafusion/src/avro_to_arrow/reader.rs | 281 +++++
datafusion/src/avro_to_arrow/schema.rs | 464 +++++++++
datafusion/src/datasource/avro.rs | 424 ++++++++
datafusion/src/datasource/mod.rs | 1 +
datafusion/src/error.rs | 16 +
datafusion/src/execution/context.rs | 32 +
datafusion/src/lib.rs | 1 +
datafusion/src/logical_plan/builder.rs | 23 +
datafusion/src/logical_plan/dfschema.rs | 1 -
datafusion/src/logical_plan/expr.rs | 16 +-
datafusion/src/physical_plan/avro.rs | 457 ++++++++
datafusion/src/physical_plan/common.rs | 14 +
.../src/physical_plan/datetime_expressions.rs | 15 +-
datafusion/src/physical_plan/mod.rs | 1 +
datafusion/src/physical_plan/source.rs | 2 +-
datafusion/src/physical_plan/string_expressions.rs | 1 +
datafusion/src/sql/parser.rs | 18 +-
datafusion/src/sql/planner.rs | 1 +
datafusion/tests/sql.rs | 155 ++-
testing | 2 +-
33 files changed, 3258 insertions(+), 38 deletions(-)
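Taken together, the commit wires Avro in end to end: schema and array readers
(avro_to_arrow), a TableProvider (datasource/avro.rs), a physical operator
(physical_plan/avro.rs), SQL and Ballista plumbing, plus an example. Based on
the avro_sql example added below, end-user usage looks roughly like this (a
sketch; it assumes the new `avro` feature is enabled, and the file path is
illustrative):

    use datafusion::error::Result;
    use datafusion::physical_plan::avro::AvroReadOptions;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // create a local execution context
        let mut ctx = ExecutionContext::new();

        // register an Avro file as a table that SQL can reference
        ctx.register_avro(
            "alltypes_plain",
            "/path/to/alltypes_plain.avro", // illustrative path
            AvroReadOptions::default(),
        )?;

        // query it with ordinary SQL and collect the Arrow record batches
        let df = ctx.sql("SELECT int_col, double_col FROM alltypes_plain WHERE id > 1")?;
        let results = df.collect().await?;
        datafusion::arrow::util::pretty::print_batches(&results)?;
        Ok(())
    }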
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index a75af64..d62b996 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -105,13 +105,14 @@ jobs:
run: |
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
- # run tests on all workspace members with default feature list
- cargo test
+ # run tests on all workspace members with default feature list + avro
+ RUST_MIN_STACK=10485760 cargo test --features=avro
# test datafusion examples
cd datafusion-examples
cargo test --no-default-features
cargo run --example csv_sql
cargo run --example parquet_sql
+ cargo run --example avro_sql --features=datafusion/avro
env:
CARGO_HOME: "/github/home/.cargo"
CARGO_TARGET_DIR: "/github/home/target"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index ee2f656..3671f34 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -32,6 +32,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::sql::parser::FileType;
@@ -125,6 +126,30 @@ impl BallistaContext {
})
}
+ /// Create a DataFrame representing an Avro table scan
+ pub fn read_avro(
+ &self,
+ path: &str,
+ options: AvroReadOptions,
+ ) -> Result<Arc<dyn DataFrame>> {
+ // convert to absolute path because the executor likely has a different working directory
+ let path = PathBuf::from(path);
+ let path = fs::canonicalize(&path)?;
+
+ // use local DataFusion context for now but later this might call the scheduler
+ let mut ctx = {
+ let guard = self.state.lock().unwrap();
+ create_df_ctx_with_ballista_query_planner(
+ &guard.scheduler_host,
+ guard.scheduler_port,
+ guard.config(),
+ )
+ };
+ let df = ctx.read_avro(path.to_str().unwrap(), options)?;
+ Ok(df)
+ }
+
/// Create a DataFrame representing a Parquet table scan
pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
@@ -193,6 +218,17 @@ impl BallistaContext {
self.register_table(name, df.as_ref())
}
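+ /// Register an Avro file as a named table so it can be referenced from SQL statements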
+ pub fn register_avro(
+ &self,
+ name: &str,
+ path: &str,
+ options: AvroReadOptions,
+ ) -> Result<()> {
+ let df = self.read_avro(path, options)?;
+ self.register_table(name, df.as_ref())?;
+ Ok(())
+ }
+
/// Create a DataFrame from a SQL statement
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
@@ -240,6 +276,10 @@ impl BallistaContext {
self.register_parquet(name, location)?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
+ FileType::Avro => {
+ self.register_avro(name, location, AvroReadOptions::default())?;
+ Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
+ }
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index dd9978f..3fc291e 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -152,6 +152,7 @@ enum ScalarFunction {
SHA384 = 32;
SHA512 = 33;
LN = 34;
+ TOTIMESTAMPMILLIS = 35;
}
message ScalarFunctionNode {
@@ -253,6 +254,7 @@ message LogicalPlanNode {
WindowNode window = 13;
AnalyzeNode analyze = 14;
CrossJoinNode cross_join = 15;
+ AvroTableScanNode avro_scan = 16;
}
}
@@ -296,6 +298,15 @@ message ParquetTableScanNode {
repeated LogicalExprNode filters = 4;
}
+message AvroTableScanNode {
+ string table_name = 1;
+ string path = 2;
+ string file_extension = 3;
+ ProjectionColumns projection = 4;
+ Schema schema = 5;
+ repeated LogicalExprNode filters = 6;
+}
+
message ProjectionNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
@@ -340,6 +351,7 @@ enum FileType{
NdJson = 0;
Parquet = 1;
CSV = 2;
+ Avro = 3;
}
message AnalyzeNode {
@@ -456,6 +468,7 @@ message PhysicalPlanNode {
WindowAggExecNode window = 17;
ShuffleWriterExecNode shuffle_writer = 18;
CrossJoinExecNode cross_join = 19;
+ AvroScanExecNode avro_scan = 20;
}
}
@@ -609,6 +622,17 @@ message CsvScanExecNode {
repeated string filename = 8;
}
+message AvroScanExecNode {
+ string path = 1;
+ repeated uint32 projection = 2;
+ Schema schema = 3;
+ string file_extension = 4;
+ uint32 batch_size = 5;
+
+ // partition filenames
+ repeated string filename = 8;
+}
+
enum PartitionMode {
COLLECT_LEFT = 0;
PARTITIONED = 1;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 38de341..8ffdb65 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -32,6 +32,7 @@ use datafusion::logical_plan::{
LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
+use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
use datafusion::scalar::ScalarValue;
@@ -171,6 +172,32 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.build()
.map_err(|e| e.into())
}
+ LogicalPlanType::AvroScan(scan) => {
+ let schema: Schema = convert_required!(scan.schema)?;
+ let options = AvroReadOptions {
+ schema: Some(Arc::new(schema.clone())),
+ file_extension: &scan.file_extension,
+ };
+
+ let mut projection = None;
+ if let Some(columns) = &scan.projection {
+ let column_indices = columns
+ .columns
+ .iter()
+ .map(|name| schema.index_of(name))
+ .collect::<Result<Vec<usize>, _>>()?;
+ projection = Some(column_indices);
+ }
+
+ LogicalPlanBuilder::scan_avro_with_name(
+ &scan.path,
+ options,
+ projection,
+ &scan.table_name,
+ )?
+ .build()
+ .map_err(|e| e.into())
+ }
LogicalPlanType::Sort(sort) => {
let input: LogicalPlan = convert_box_required!(sort.input)?;
let sort_expr: Vec<Expr> = sort
@@ -1193,6 +1220,7 @@ impl TryFrom<i32> for protobuf::FileType {
_x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
_x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
_x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
+ _x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
invalid => Err(BallistaError::General(format!(
"Attempted to convert invalid i32 to protobuf::Filetype: {}",
invalid
@@ -1209,6 +1237,7 @@ impl Into<datafusion::sql::parser::FileType> for protobuf::FileType {
protobuf::FileType::NdJson => FileType::NdJson,
protobuf::FileType::Parquet => FileType::Parquet,
protobuf::FileType::Csv => FileType::CSV,
+ protobuf::FileType::Avro => FileType::Avro,
}
}
}
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index dbaac1d..ada3c85 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -643,8 +643,12 @@ mod roundtrip_tests {
let df_schema_ref = schema.to_dfschema_ref()?;
- let filetypes: [FileType; 3] =
- [FileType::NdJson, FileType::Parquet, FileType::CSV];
+ let filetypes: [FileType; 4] = [
+ FileType::NdJson,
+ FileType::Parquet,
+ FileType::CSV,
+ FileType::Avro,
+ ];
for file in filetypes.iter() {
let create_table_node = LogicalPlan::CreateExternalTable {
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 10bc63e..e195e82 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -25,6 +25,7 @@ use crate::serde::{protobuf, BallistaError};
use datafusion::arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
};
+use datafusion::datasource::avro::AvroFile;
use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
use datafusion::logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -793,6 +794,19 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
},
)),
})
+ } else if let Some(avro) = source.downcast_ref::<AvroFile>() {
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::AvroScan(
+ protobuf::AvroTableScanNode {
+ table_name: table_name.to_owned(),
+ path: avro.path().to_owned(),
+ projection,
+ schema: Some(schema),
+ file_extension: avro.file_extension().to_string(),
+ filters,
+ },
+ )),
+ })
} else {
Err(BallistaError::General(format!(
"logical plan to_proto unsupported table provider {:?}",
@@ -974,6 +988,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
FileType::NdJson => protobuf::FileType::NdJson,
FileType::Parquet => protobuf::FileType::Parquet,
FileType::CSV => protobuf::FileType::Csv,
+ FileType::Avro => protobuf::FileType::Avro,
};
Ok(protobuf::LogicalPlanNode {
@@ -1098,7 +1113,13 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
)
}
};
- let arg = &args[0];
+ let arg_expr: Option<Box<protobuf::LogicalExprNode>> = if !args.is_empty()
+ {
+ let arg = &args[0];
+ Some(Box::new(arg.try_into()?))
+ } else {
+ None
+ };
let partition_by = partition_by
.iter()
.map(|e| e.try_into())
@@ -1111,7 +1132,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
protobuf::window_expr_node::WindowFrame::Frame(window_frame.into())
});
let window_expr = Box::new(protobuf::WindowExprNode {
- expr: Some(Box::new(arg.try_into()?)),
+ expr: arg_expr,
window_function: Some(window_function),
partition_by,
order_by,
@@ -1284,7 +1305,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
Expr::Wildcard => Ok(protobuf::LogicalExprNode {
expr_type: Some(protobuf::logical_expr_node::ExprType::Wildcard(true)),
}),
- Expr::TryCast { .. } => unimplemented!(),
+ _ => unimplemented!(),
}
}
}
@@ -1473,6 +1494,9 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256),
BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384),
BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512),
+ BuiltinScalarFunction::ToTimestampMillis => {
+ Ok(protobuf::ScalarFunction::Totimestampmillis)
+ }
_ => Err(BallistaError::General(format!(
"logical_plan::to_proto() unsupported scalar function {:?}",
self
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 3cd8cf3..0d23372 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -43,6 +43,7 @@ use datafusion::logical_plan::{
window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
+use datafusion::physical_plan::avro::{AvroExec, AvroReadOptions};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
@@ -153,6 +154,21 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
None,
)))
}
+ PhysicalPlanType::AvroScan(scan) => {
+ let schema = Arc::new(convert_required!(scan.schema)?);
+ let options = AvroReadOptions {
+ schema: Some(schema),
+ file_extension: &scan.file_extension,
+ };
+ let projection = scan.projection.iter().map(|i| *i as usize).collect();
+ Ok(Arc::new(AvroExec::try_from_path(
+ &scan.path,
+ options,
+ Some(projection),
+ scan.batch_size as usize,
+ None,
+ )?))
+ }
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(coalesce_batches.input)?;
@@ -544,6 +560,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Sha384 => BuiltinScalarFunction::SHA384,
ScalarFunction::Sha512 => BuiltinScalarFunction::SHA512,
ScalarFunction::Ln => BuiltinScalarFunction::Ln,
+ ScalarFunction::Totimestampmillis => BuiltinScalarFunction::ToTimestampMillis,
}
}
}
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index e7d4ac6..22a49cb 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -62,6 +62,7 @@ use crate::execution_plans::{
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{protobuf, BallistaError};
+use datafusion::physical_plan::avro::AvroExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
use datafusion::physical_plan::repartition::RepartitionExec;
@@ -285,6 +286,28 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
},
)),
})
+ } else if let Some(exec) = plan.downcast_ref::<AvroExec>() {
+ Ok(protobuf::PhysicalPlanNode {
+ physical_plan_type: Some(PhysicalPlanType::AvroScan(
+ protobuf::AvroScanExecNode {
+ path: exec.path().to_owned(),
+ filename: exec.filenames().to_vec(),
+ projection: exec
+ .projection()
+ .ok_or_else(|| {
+ BallistaError::General(
+ "projection in AvroExec doesn't exist.".to_owned(),
+ )
+ })?
+ .iter()
+ .map(|n| *n as u32)
+ .collect(),
+ file_extension: exec.file_extension().to_owned(),
+ schema: Some(exec.file_schema().as_ref().into()),
+ batch_size: exec.batch_size() as u32,
+ },
+ )),
+ })
} else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() {
let mut partition = vec![];
for location in &exec.partition {
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 9b859c6..113cd5b 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -27,6 +27,10 @@ keywords = [ "arrow", "query", "sql" ]
edition = "2018"
publish = false
+[[example]]
+name = "avro_sql"
+path = "examples/avro_sql.rs"
+required-features = ["datafusion/avro"]
[dev-dependencies]
arrow-flight = { version = "^5.3" }
diff --git a/datafusion-examples/examples/avro_sql.rs b/datafusion-examples/examples/avro_sql.rs
new file mode 100644
index 0000000..e9676a0
--- /dev/null
+++ b/datafusion-examples/examples/avro_sql.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::arrow::util::pretty;
+
+use datafusion::error::Result;
+use datafusion::physical_plan::avro::AvroReadOptions;
+use datafusion::prelude::*;
+
+/// This example demonstrates executing a simple query against an Arrow data source (Avro) and
+/// fetching results
+#[tokio::main]
+async fn main() -> Result<()> {
+ // create local execution context
+ let mut ctx = ExecutionContext::new();
+
+ let testdata = datafusion::arrow::util::test_util::arrow_test_data();
+
+ // register avro file with the execution context
+ let avro_file = &format!("{}/avro/alltypes_plain.avro", testdata);
+ ctx.register_avro("alltypes_plain", avro_file, AvroReadOptions::default())?;
+
+ // execute the query
+ let df = ctx.sql(
+ "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
+ FROM alltypes_plain \
+ WHERE id > 1 AND tinyint_col < double_col",
+ )?;
+ let results = df.collect().await?;
+
+ // print the results
+ pretty::print_batches(&results)?;
+
+ Ok(())
+}
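The DataFrame reading path added by this commit can also be used without
registering a named table first. A minimal sketch, assuming
ExecutionContext::read_avro mirrors the BallistaContext::read_avro wrapper
shown earlier (path again illustrative):

    use datafusion::physical_plan::avro::AvroReadOptions;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let mut ctx = ExecutionContext::new();
        // read_avro returns a DataFrame directly
        let df = ctx.read_avro("/path/to/alltypes_plain.avro", AvroReadOptions::default())?;
        let batches = df.collect().await?;
        println!("read {} record batch(es)", batches.len());
        Ok(())
    }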
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index f30db02..c1998be 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -44,7 +44,8 @@ regex_expressions = ["regex", "lazy_static"]
unicode_expressions = ["unicode-segmentation"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
-
+# Used to enable the avro format
+avro = ["avro-rs", "num-traits"]
[dependencies]
ahash = "0.7"
@@ -69,6 +70,8 @@ regex = { version = "^1.4.3", optional = true }
lazy_static = { version = "^1.4.0", optional = true }
smallvec = { version = "1.6", features = ["union"] }
rand = "0.8"
+avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+num-traits = { version = "0.2", optional = true }
[dev-dependencies]
criterion = "0.3"
diff --git a/datafusion/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
new file mode 100644
index 0000000..cc8ed8e
--- /dev/null
+++ b/datafusion/src/avro_to_arrow/arrow_array_reader.rs
@@ -0,0 +1,1090 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro to Arrow array readers
+
+use crate::arrow::array::{
+ make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef,
+ BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait,
+ PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
+ StringDictionaryBuilder,
+};
+use crate::arrow::buffer::{Buffer, MutableBuffer};
+use crate::arrow::datatypes::{
+ ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
+ Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
+ Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+ Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
+ TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
+ UInt8Type,
+};
+use crate::arrow::error::ArrowError;
+use crate::arrow::record_batch::RecordBatch;
+use crate::arrow::util::bit_util;
+use crate::error::{DataFusionError, Result};
+use arrow::array::{BinaryArray, GenericListArray};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError::SchemaError;
+use arrow::error::Result as ArrowResult;
+use avro_rs::{
+ schema::{Schema as AvroSchema, SchemaKind},
+ types::Value,
+ AvroResult, Error as AvroError, Reader as AvroReader,
+};
+use num_traits::NumCast;
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+type RecordSlice<'a> = &'a [Vec<(String, Value)>];
+
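+/// Reads Avro records from a `Read` source and converts them into Arrow `RecordBatch`es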
+pub struct AvroArrowArrayReader<'a, R: Read> {
+ reader: AvroReader<'a, R>,
+ schema: SchemaRef,
+ projection: Option<Vec<String>>,
+ schema_lookup: HashMap<String, usize>,
+}
+
+impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
+ pub fn try_new(
+ reader: R,
+ schema: SchemaRef,
+ projection: Option<Vec<String>>,
+ ) -> Result<Self> {
+ let reader = AvroReader::new(reader)?;
+ let writer_schema = reader.writer_schema().clone();
+ let schema_lookup = Self::schema_lookup(writer_schema)?;
+ Ok(Self {
+ reader,
+ schema,
+ projection,
+ schema_lookup,
+ })
+ }
+
+ pub fn schema_lookup(schema: AvroSchema) -> Result<HashMap<String, usize>> {
+ match schema {
+ AvroSchema::Record {
+ lookup: ref schema_lookup,
+ ..
+ } => Ok(schema_lookup.clone()),
+ _ => Err(DataFusionError::ArrowError(SchemaError(
+ "expected avro schema to be a record".to_string(),
+ ))),
+ }
+ }
+
+ /// Read the next batch of records
+ #[allow(clippy::should_implement_trait)]
+ pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult<Option<RecordBatch>> {
+ let mut rows = Vec::with_capacity(batch_size);
+ for value in self.reader.by_ref().take(batch_size) {
+ let v = value.map_err(|e| {
+ ArrowError::ParseError(format!("Failed to parse avro value: {:?}", e))
+ })?;
+ match v {
+ Value::Record(v) => {
+ rows.push(v);
+ }
+ other => {
+ return Err(ArrowError::ParseError(format!(
+ "Row needs to be of type object, got: {:?}",
+ other
+ )))
+ }
+ }
+ }
+ if rows.is_empty() {
+ // reached end of file
+ return Ok(None);
+ }
+ let rows = &rows[..];
+ let projection = self.projection.clone().unwrap_or_else(Vec::new);
+ let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
+ let projected_fields: Vec<Field> = if projection.is_empty() {
+ self.schema.fields().to_vec()
+ } else {
+ projection
+ .iter()
+ .map(|name| self.schema.column_with_name(name))
+ .flatten()
+ .map(|(_, field)| field.clone())
+ .collect()
+ };
+ let projected_schema = Arc::new(Schema::new(projected_fields));
+ arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+ }
+
+ fn build_boolean_array(
+ &self,
+ rows: RecordSlice,
+ col_name: &str,
+ ) -> ArrowResult<ArrayRef> {
+ let mut builder = BooleanBuilder::new(rows.len());
+ for row in rows {
+ if let Some(value) = self.field_lookup(col_name, row) {
+ if let Some(boolean) = resolve_boolean(&value) {
+ builder.append_value(boolean)?
+ } else {
+ builder.append_null()?;
+ }
+ } else {
+ builder.append_null()?;
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }
+
+ #[allow(clippy::unnecessary_wraps)]
+ fn build_primitive_array<T: ArrowPrimitiveType + Resolver>(
+ &self,
+ rows: RecordSlice,
+ col_name: &str,
+ ) -> ArrowResult<ArrayRef>
+ where
+ T: ArrowNumericType,
+ T::Native: num_traits::cast::NumCast,
+ {
+ Ok(Arc::new(
+ rows.iter()
+ .map(|row| {
+ self.field_lookup(col_name, row)
+ .and_then(|value| resolve_item::<T>(&value))
+ })
+ .collect::<PrimitiveArray<T>>(),
+ ))
+ }
+
+ #[inline(always)]
+ #[allow(clippy::unnecessary_wraps)]
+ fn build_string_dictionary_builder<T>(
+ &self,
+ row_len: usize,
+ ) -> ArrowResult<StringDictionaryBuilder<T>>
+ where
+ T: ArrowPrimitiveType + ArrowDictionaryKeyType,
+ {
+ let key_builder = PrimitiveBuilder::<T>::new(row_len);
+ let values_builder = StringBuilder::new(row_len * 5);
+ Ok(StringDictionaryBuilder::new(key_builder, values_builder))
+ }
+
+ fn build_wrapped_list_array(
+ &self,
+ rows: RecordSlice,
+ col_name: &str,
+ key_type: &DataType,
+ ) -> ArrowResult<ArrayRef> {
+ match *key_type {
+ DataType::Int8 => {
+ let dtype = DataType::Dictionary(
+ Box::new(DataType::Int8),
+ Box::new(DataType::Utf8),
+ );
+ self.list_array_string_array_builder::<Int8Type>(&dtype, col_name, rows)
+ }
+ DataType::Int16 => {
+ let dtype = DataType::Dictionary(
+ Box::new(DataType::Int16),
+ Box::new(DataType::Utf8),
+ );
+ self.list_array_string_array_builder::<Int16Type>(&dtype, col_name, rows)
+ }
+ DataType::Int32 => {
+ let dtype = DataType::Dictionary(
+ Box::new(DataType::Int32),
+ Box::new(DataType::Utf8),
+ );
+ self.list_array_string_array_builder::<Int32Type>(&dtype, col_name, rows)
+ }
+ DataType::Int64 => {
+ let dtype = DataType::Dictionary(
+ Box::new(DataType::Int64),
+ Box::new(DataType::Utf8),
+ );
+ self.list_array_string_array_builder::<Int64Type>(&dtype, col_name, rows)
+ }
+ DataType::UInt8 => {
+ let dtype = DataType::Dictionary(
+ Box::new(DataType::UInt8),
+ Box::new(DataType::Utf8),
+ );
+ self.list_array_string_array_builder::<UInt8Type>(&dtype, col_name, rows)
+ }
+ DataType::UInt16 => {
+ let dtype = DataType::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(DataType::Utf8),
+ );
+ self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
+ }
+ DataType::UInt32 => {
+ let dtype = DataType::Dictionary(
+ Box::new(DataType::UInt32),
+ Box::new(DataType::Utf8),
+ );
+ self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
+ }
+ DataType::UInt64 => {
+ let dtype = DataType::Dictionary(
+ Box::new(DataType::UInt64),
+ Box::new(DataType::Utf8),
+ );
+ self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
+ }
+ ref e => Err(SchemaError(format!(
+ "Data type is currently not supported for dictionaries in list : {:?}",
+ e
+ ))),
+ }
+ }
+
+ #[inline(always)]
+ fn list_array_string_array_builder<D>(
+ &self,
+ data_type: &DataType,
+ col_name: &str,
+ rows: RecordSlice,
+ ) -> ArrowResult<ArrayRef>
+ where
+ D: ArrowPrimitiveType + ArrowDictionaryKeyType,
+ {
+ let mut builder: Box<dyn ArrayBuilder> = match data_type {
+ DataType::Utf8 => {
+ let values_builder = StringBuilder::new(rows.len() * 5);
+ Box::new(ListBuilder::new(values_builder))
+ }
+ DataType::Dictionary(_, _) => {
+ let values_builder =
+ self.build_string_dictionary_builder::<D>(rows.len() * 5)?;
+ Box::new(ListBuilder::new(values_builder))
+ }
+ e => {
+ return Err(SchemaError(format!(
+ "Nested list data builder type is not supported: {:?}",
+ e
+ )))
+ }
+ };
+
+ for row in rows {
+ if let Some(value) = self.field_lookup(col_name, row) {
+ // value can be an array or a scalar
+ let vals: Vec<Option<String>> = if let Value::String(v) = value {
+ vec![Some(v.to_string())]
+ } else if let Value::Array(n) = value {
+ n.into_iter()
+ .map(|v| resolve_string(&v))
+ .collect::<ArrowResult<Vec<String>>>()?
+ .into_iter()
+ .map(Some)
+ .collect::<Vec<Option<String>>>()
+ } else if let Value::Null = value {
+ vec![None]
+ } else if !matches!(value, Value::Record(_)) {
+ vec![Some(resolve_string(&value)?)]
+ } else {
+ return Err(SchemaError(
+ "Only scalars are currently supported in Avro arrays".to_string(),
+ ));
+ };
+
+ // TODO: ARROW-10335: APIs of dictionary arrays and others are different. Unify
+ // them.
+ match data_type {
+ DataType::Utf8 => {
+ let builder = builder
+ .as_any_mut()
+ .downcast_mut::<ListBuilder<StringBuilder>>()
+ .ok_or_else(||ArrowError::SchemaError(
+ "Cast failed for ListBuilder<StringBuilder> during nested data parsing".to_string(),
+ ))?;
+ for val in vals {
+ if let Some(v) = val {
+ builder.values().append_value(&v)?
+ } else {
+ builder.values().append_null()?
+ };
+ }
+
+ // Append to the list
+ builder.append(true)?;
+ }
+ DataType::Dictionary(_, _) => {
+ let builder = builder.as_any_mut().downcast_mut::<ListBuilder<StringDictionaryBuilder<D>>>().ok_or_else(||ArrowError::SchemaError(
+ "Cast failed for ListBuilder<StringDictionaryBuilder> during nested data parsing".to_string(),
+ ))?;
+ for val in vals {
+ if let Some(v) = val {
+ let _ = builder.values().append(&v)?;
+ } else {
+ builder.values().append_null()?
+ };
+ }
+
+ // Append to the list
+ builder.append(true)?;
+ }
+ e => {
+ return Err(SchemaError(format!(
+ "Nested list data builder type is not supported: {:?}",
+ e
+ )))
+ }
+ }
+ }
+ }
+
+ Ok(builder.finish() as ArrayRef)
+ }
+
+ #[inline(always)]
+ fn build_dictionary_array<T>(
+ &self,
+ rows: RecordSlice,
+ col_name: &str,
+ ) -> ArrowResult<ArrayRef>
+ where
+ T::Native: num_traits::cast::NumCast,
+ T: ArrowPrimitiveType + ArrowDictionaryKeyType,
+ {
+ let mut builder: StringDictionaryBuilder<T> =
+ self.build_string_dictionary_builder(rows.len())?;
+ for row in rows {
+ if let Some(value) = self.field_lookup(col_name, row) {
+ if let Ok(str_v) = resolve_string(&value) {
+ builder.append(str_v).map(drop)?
+ } else {
+ builder.append_null()?
+ }
+ } else {
+ builder.append_null()?
+ }
+ }
+ Ok(Arc::new(builder.finish()) as ArrayRef)
+ }
+
+ #[inline(always)]
+ fn build_string_dictionary_array(
+ &self,
+ rows: RecordSlice,
+ col_name: &str,
+ key_type: &DataType,
+ value_type: &DataType,
+ ) -> ArrowResult<ArrayRef> {
+ if let DataType::Utf8 = *value_type {
+ match *key_type {
+ DataType::Int8 => self.build_dictionary_array::<Int8Type>(rows, col_name),
+ DataType::Int16 => {
+ self.build_dictionary_array::<Int16Type>(rows, col_name)
+ }
+ DataType::Int32 => {
+ self.build_dictionary_array::<Int32Type>(rows, col_name)
+ }
+ DataType::Int64 => {
+ self.build_dictionary_array::<Int64Type>(rows, col_name)
+ }
+ DataType::UInt8 => {
+ self.build_dictionary_array::<UInt8Type>(rows, col_name)
+ }
+ DataType::UInt16 => {
+ self.build_dictionary_array::<UInt16Type>(rows, col_name)
+ }
+ DataType::UInt32 => {
+ self.build_dictionary_array::<UInt32Type>(rows, col_name)
+ }
+ DataType::UInt64 => {
+ self.build_dictionary_array::<UInt64Type>(rows, col_name)
+ }
+ _ => Err(ArrowError::SchemaError(
+ "unsupported dictionary key type".to_string(),
+ )),
+ }
+ } else {
+ Err(ArrowError::SchemaError(
+ "dictionary types other than UTF-8 not yet supported".to_string(),
+ ))
+ }
+ }
+
+ /// Build a nested GenericListArray from a list of unnested `Value`s
+ fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
+ &self,
+ rows: &[Value],
+ list_field: &Field,
+ ) -> ArrowResult<ArrayRef> {
+ // build list offsets
+ let mut cur_offset = OffsetSize::zero();
+ let list_len = rows.len();
+ let num_list_bytes = bit_util::ceil(list_len, 8);
+ let mut offsets = Vec::with_capacity(list_len + 1);
+ let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes);
+ let list_nulls = list_nulls.as_slice_mut();
+ offsets.push(cur_offset);
+ rows.iter().enumerate().for_each(|(i, v)| {
+ // TODO: unboxing Union(Array(Union(...))) should probably be done earlier
+ let v = maybe_resolve_union(v);
+ if let Value::Array(a) = v {
+ cur_offset += OffsetSize::from_usize(a.len()).unwrap();
+ bit_util::set_bit(list_nulls, i);
+ } else if let Value::Null = v {
+ // value is null, not incremented
+ } else {
+ cur_offset += OffsetSize::one();
+ }
+ offsets.push(cur_offset);
+ });
+ let valid_len = cur_offset.to_usize().unwrap();
+ let array_data = match list_field.data_type() {
+ DataType::Null => NullArray::new(valid_len).data().clone(),
+ DataType::Boolean => {
+ let num_bytes = bit_util::ceil(valid_len, 8);
+ let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes);
+ let mut bool_nulls =
+ MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
+ let mut curr_index = 0;
+ rows.iter().for_each(|v| {
+ if let Value::Array(vs) = v {
+ vs.iter().for_each(|value| {
+ if let Value::Boolean(child) = value {
+ // if valid boolean, append value
+ if *child {
+ bit_util::set_bit(
+ bool_values.as_slice_mut(),
+ curr_index,
+ );
+ }
+ } else {
+ // null slot
+ bit_util::unset_bit(
+ bool_nulls.as_slice_mut(),
+ curr_index,
+ );
+ }
+ curr_index += 1;
+ });
+ }
+ });
+ ArrayData::builder(list_field.data_type().clone())
+ .len(valid_len)
+ .add_buffer(bool_values.into())
+ .null_bit_buffer(bool_nulls.into())
+ .build()
+ }
+ DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
+ DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
+ DataType::Int32 => self.read_primitive_list_values::<Int32Type>(rows),
+ DataType::Int64 => self.read_primitive_list_values::<Int64Type>(rows),
+ DataType::UInt8 => self.read_primitive_list_values::<UInt8Type>(rows),
+ DataType::UInt16 => self.read_primitive_list_values::<UInt16Type>(rows),
+ DataType::UInt32 => self.read_primitive_list_values::<UInt32Type>(rows),
+ DataType::UInt64 => self.read_primitive_list_values::<UInt64Type>(rows),
+ DataType::Float16 => {
+ return Err(ArrowError::SchemaError("Float16 not supported".to_string()))
+ }
+ DataType::Float32 => self.read_primitive_list_values::<Float32Type>(rows),
+ DataType::Float64 => self.read_primitive_list_values::<Float64Type>(rows),
+ DataType::Timestamp(_, _)
+ | DataType::Date32
+ | DataType::Date64
+ | DataType::Time32(_)
+ | DataType::Time64(_) => {
+ return Err(ArrowError::SchemaError(
+ "Temporal types are not yet supported, see ARROW-4803".to_string(),
+ ))
+ }
+ DataType::Utf8 => flatten_string_values(rows)
+ .into_iter()
+ .collect::<StringArray>()
+ .data()
+ .clone(),
+ DataType::LargeUtf8 => flatten_string_values(rows)
+ .into_iter()
+ .collect::<LargeStringArray>()
+ .data()
+ .clone(),
+ DataType::List(field) => {
+ let child =
+ self.build_nested_list_array::<i32>(&flatten_values(rows), field)?;
+ child.data().clone()
+ }
+ DataType::LargeList(field) => {
+ let child =
+ self.build_nested_list_array::<i64>(&flatten_values(rows), field)?;
+ child.data().clone()
+ }
+ DataType::Struct(fields) => {
+ // extract list values, with non-lists converted to Value::Null
+ let array_item_count = rows
+ .iter()
+ .map(|row| match row {
+ Value::Array(values) => values.len(),
+ _ => 1,
+ })
+ .sum();
+ let num_bytes = bit_util::ceil(array_item_count, 8);
+ let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
+ let mut struct_index = 0;
+ let rows: Vec<Vec<(String, Value)>> = rows
+ .iter()
+ .map(|row| {
+ if let Value::Array(values) = row {
+ values.iter().for_each(|_| {
+ bit_util::set_bit(
+ null_buffer.as_slice_mut(),
+ struct_index,
+ );
+ struct_index += 1;
+ });
+ values
+ .iter()
+ .map(|v| ("".to_string(), v.clone()))
+ .collect::<Vec<(String, Value)>>()
+ } else {
+ struct_index += 1;
+ vec![("null".to_string(), Value::Null)]
+ }
+ })
+ .collect();
+ let arrays =
+ self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?;
+ let data_type = DataType::Struct(fields.clone());
+ let buf = null_buffer.into();
+ ArrayDataBuilder::new(data_type)
+ .len(rows.len())
+ .null_bit_buffer(buf)
+ .child_data(arrays.into_iter().map(|a| a.data().clone()).collect())
+ .build()
+ }
+ datatype => {
+ return Err(ArrowError::SchemaError(format!(
+ "Nested list of {:?} not supported",
+ datatype
+ )));
+ }
+ };
+ // build list
+ let list_data = ArrayData::builder(DataType::List(Box::new(list_field.clone())))
+ .len(list_len)
+ .add_buffer(Buffer::from_slice_ref(&offsets))
+ .add_child_data(array_data)
+ .null_bit_buffer(list_nulls.into())
+ .build();
+ Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
+ }
+
+ /// Builds the child values of a `StructArray`, stopping short of constructing the StructArray itself.
+ /// The StructArray is not constructed here because some callers only want the child arrays.
+ ///
+ /// *Note*: The function is recursive, and will read nested structs.
+ ///
+ /// If `projection` is empty, then all values are returned. The first level of projection
+ /// occurs at the `RecordBatch` level. No further projection currently occurs, but would be
+ /// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`.
+ fn build_struct_array(
+ &self,
+ rows: RecordSlice,
+ struct_fields: &[Field],
+ projection: &[String],
+ ) -> ArrowResult<Vec<ArrayRef>> {
+ let arrays: ArrowResult<Vec<ArrayRef>> = struct_fields
+ .iter()
+ .filter(|field| projection.is_empty() || projection.contains(field.name()))
+ .map(|field| {
+ match field.data_type() {
+ DataType::Null => {
+ Ok(Arc::new(NullArray::new(rows.len())) as ArrayRef)
+ }
+ DataType::Boolean => self.build_boolean_array(rows, field.name()),
+ DataType::Float64 => {
+ self.build_primitive_array::<Float64Type>(rows, field.name())
+ }
+ DataType::Float32 => {
+ self.build_primitive_array::<Float32Type>(rows, field.name())
+ }
+ DataType::Int64 => {
+ self.build_primitive_array::<Int64Type>(rows, field.name())
+ }
+ DataType::Int32 => {
+ self.build_primitive_array::<Int32Type>(rows, field.name())
+ }
+ DataType::Int16 => {
+ self.build_primitive_array::<Int16Type>(rows, field.name())
+ }
+ DataType::Int8 => {
+ self.build_primitive_array::<Int8Type>(rows, field.name())
+ }
+ DataType::UInt64 => {
+ self.build_primitive_array::<UInt64Type>(rows, field.name())
+ }
+ DataType::UInt32 => {
+ self.build_primitive_array::<UInt32Type>(rows, field.name())
+ }
+ DataType::UInt16 => {
+ self.build_primitive_array::<UInt16Type>(rows, field.name())
+ }
+ DataType::UInt8 => {
+ self.build_primitive_array::<UInt8Type>(rows, field.name())
+ }
+ // TODO: this is incomplete
+ DataType::Timestamp(unit, _) => match unit {
+ TimeUnit::Second => self
+ .build_primitive_array::<TimestampSecondType>(
+ rows,
+ field.name(),
+ ),
+ TimeUnit::Microsecond => self
+ .build_primitive_array::<TimestampMicrosecondType>(
+ rows,
+ field.name(),
+ ),
+ TimeUnit::Millisecond => self
+ .build_primitive_array::<TimestampMillisecondType>(
+ rows,
+ field.name(),
+ ),
+ TimeUnit::Nanosecond => self
+ .build_primitive_array::<TimestampNanosecondType>(
+ rows,
+ field.name(),
+ ),
+ },
+ DataType::Date64 => {
+ self.build_primitive_array::<Date64Type>(rows, field.name())
+ }
+ DataType::Date32 => {
+ self.build_primitive_array::<Date32Type>(rows, field.name())
+ }
+ DataType::Time64(unit) => match unit {
+ TimeUnit::Microsecond => self
+ .build_primitive_array::<Time64MicrosecondType>(
+ rows,
+ field.name(),
+ ),
+ TimeUnit::Nanosecond => self
+ .build_primitive_array::<Time64NanosecondType>(
+ rows,
+ field.name(),
+ ),
+ t => Err(ArrowError::SchemaError(format!(
+ "TimeUnit {:?} not supported with Time64",
+ t
+ ))),
+ },
+ DataType::Time32(unit) => match unit {
+ TimeUnit::Second => self
+ .build_primitive_array::<Time32SecondType>(
+ rows,
+ field.name(),
+ ),
+ TimeUnit::Millisecond => self
+ .build_primitive_array::<Time32MillisecondType>(
+ rows,
+ field.name(),
+ ),
+ t => Err(ArrowError::SchemaError(format!(
+ "TimeUnit {:?} not supported with Time32",
+ t
+ ))),
+ },
+ DataType::Utf8 | DataType::LargeUtf8 => Ok(Arc::new(
+ rows.iter()
+ .map(|row| {
+ let maybe_value = self.field_lookup(field.name(), row);
+ maybe_value
+ .map(|value| resolve_string(&value))
+ .transpose()
+ })
+ .collect::<ArrowResult<StringArray>>()?,
+ )
+ as ArrayRef),
+ DataType::Binary | DataType::LargeBinary => Ok(Arc::new(
+ rows.iter()
+ .map(|row| {
+ let maybe_value = self.field_lookup(field.name(), row);
+ maybe_value.and_then(resolve_bytes)
+ })
+ .collect::<BinaryArray>(),
+ )
+ as ArrayRef),
+ DataType::List(ref list_field) => {
+ match list_field.data_type() {
+ DataType::Dictionary(ref key_ty, _) => {
+ self.build_wrapped_list_array(rows, field.name(), key_ty)
+ }
+ _ => {
+ // extract rows by name
+ let extracted_rows = rows
+ .iter()
+ .map(|row| {
+ self.field_lookup(field.name(), row)
+ .unwrap_or(Value::Null)
+ })
+ .collect::<Vec<Value>>();
+ self.build_nested_list_array::<i32>(
+ extracted_rows.as_slice(),
+ list_field,
+ )
+ }
+ }
+ }
+ DataType::Dictionary(ref key_ty, ref val_ty) => self
+ .build_string_dictionary_array(
+ rows,
+ field.name(),
+ key_ty,
+ val_ty,
+ ),
+ DataType::Struct(fields) => {
+ let len = rows.len();
+ let num_bytes = bit_util::ceil(len, 8);
+ let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
+ let struct_rows = rows
+ .iter()
+ .enumerate()
+ .map(|(i, row)| (i, self.field_lookup(field.name(), row)))
+ .map(|(i, v)| match v {
+ // we want the field as an object, if it's not, we treat as null
+ Some(Value::Record(ref value)) => {
+ bit_util::set_bit(null_buffer.as_slice_mut(), i);
+ value.clone()
+ }
+ _ => vec![],
+ })
+ .collect::<Vec<Vec<(String, Value)>>>();
+ let arrays =
+ self.build_struct_array(struct_rows.as_slice(), fields, &[])?;
+ // construct a struct array's data in order to set null buffer
+ let data_type = DataType::Struct(fields.clone());
+ let data = ArrayDataBuilder::new(data_type)
+ .len(len)
+ .null_bit_buffer(null_buffer.into())
+ .child_data(
+ arrays.into_iter().map(|a| a.data().clone()).collect(),
+ )
+ .build();
+ Ok(make_array(data))
+ }
+ _ => Err(ArrowError::SchemaError(format!(
+ "type {:?} not supported",
+ field.data_type()
+ ))),
+ }
+ })
+ .collect();
+ arrays
+ }
+
+ /// Read the primitive list's values into ArrayData
+ fn read_primitive_list_values<T>(&self, rows: &[Value]) -> ArrayData
+ where
+ T: ArrowPrimitiveType + ArrowNumericType,
+ T::Native: num_traits::cast::NumCast,
+ {
+ let values = rows
+ .iter()
+ .flat_map(|row| {
+ let row = maybe_resolve_union(row);
+ if let Value::Array(values) = row {
+ values
+ .iter()
+ .map(resolve_item::<T>)
+ .collect::<Vec<Option<T::Native>>>()
+ } else if let Some(f) = resolve_item::<T>(row) {
+ vec![Some(f)]
+ } else {
+ vec![]
+ }
+ })
+ .collect::<Vec<Option<T::Native>>>();
+ let array = values.iter().collect::<PrimitiveArray<T>>();
+ array.data().clone()
+ }
+
+ fn field_lookup(&self, name: &str, row: &[(String, Value)]) -> Option<Value> {
+ self.schema_lookup
+ .get(name)
+ .and_then(|i| row.get(*i))
+ .map(|o| o.1.clone())
+ }
+}
+
+/// Flattens a list of Avro values by flattening lists and treating all other values as
+/// single-value lists.
+/// This is used to read into nested lists (list of list, list of struct) and non-dictionary lists.
+#[inline]
+fn flatten_values(values: &[Value]) -> Vec<Value> {
+ values
+ .iter()
+ .flat_map(|row| {
+ if let Value::Array(values) = row {
+ values.clone()
+ } else if let Value::Null = row {
+ vec![Value::Null]
+ } else {
+ // we interpret a scalar as a single-value list to minimise data loss
+ vec![row.clone()]
+ }
+ })
+ .collect()
+}
+
+/// Flattens a list into string values, dropping Value::Null in the process.
+/// This is useful for interpreting any Avro array as a string array.
+/// See `resolve_string`.
+#[inline]
+fn flatten_string_values(values: &[Value]) -> Vec<Option<String>> {
+ values
+ .iter()
+ .flat_map(|row| {
+ if let Value::Array(values) = row {
+ values
+ .iter()
+ .map(|s| resolve_string(s).ok())
+ .collect::<Vec<Option<_>>>()
+ } else if let Value::Null = row {
+ vec![]
+ } else {
+ vec![resolve_string(row).ok()]
+ }
+ })
+ .collect::<Vec<Option<_>>>()
+}
+
+/// Reads an Avro value as a string, regardless of its type.
+/// This is useful if the expected datatype is a string, in which case we preserve
+/// all the values regardless of their type.
+fn resolve_string(v: &Value) -> ArrowResult<String> {
+ let v = if let Value::Union(b) = v { b } else { v };
+ match v {
+ Value::String(s) => Ok(s.clone()),
+ Value::Bytes(bytes) => {
+ String::from_utf8(bytes.to_vec()).map_err(AvroError::ConvertToUtf8)
+ }
+ other => Err(AvroError::GetString(other.into())),
+ }
+ .map_err(|e| SchemaError(format!("expected resolvable string : {}", e)))
+}
+
+fn resolve_u8(v: Value) -> AvroResult<u8> {
+ let int = v.resolve(&AvroSchema::Int)?;
+ if let Value::Int(n) = int {
+ if n >= 0 && n <= std::convert::From::from(u8::MAX) {
+ return Ok(n as u8);
+ }
+ }
+
+ Err(AvroError::GetU8(int.into()))
+}
+
+fn resolve_bytes(v: Value) -> Option<Vec<u8>> {
+ let v = if let Value::Union(b) = v { *b } else { v };
+ match v {
+ Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
+ Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
+ Value::Array(items) => Ok(Value::Bytes(
+ items
+ .into_iter()
+ .map(resolve_u8)
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .ok()?,
+ )),
+ other => Err(AvroError::GetBytes(other.into())),
+ }
+ .ok()
+ .and_then(|v| match v {
+ Value::Bytes(s) => Some(s),
+ _ => None,
+ })
+}
+
+fn resolve_boolean(value: &Value) -> Option<bool> {
+ let v = if let Value::Union(b) = value {
+ b
+ } else {
+ value
+ };
+ match v {
+ Value::Boolean(boolean) => Some(*boolean),
+ _ => None,
+ }
+}
+
+trait Resolver: ArrowPrimitiveType {
+ fn resolve(value: &Value) -> Option<Self::Native>;
+}
+
+fn resolve_item<T: Resolver>(value: &Value) -> Option<T::Native> {
+ T::resolve(value)
+}
+
+fn maybe_resolve_union(value: &Value) -> &Value {
+ if SchemaKind::from(value) == SchemaKind::Union {
+ // Pull out the Union, and attempt to resolve against it.
+ match value {
+ Value::Union(b) => b,
+ _ => unreachable!(),
+ }
+ } else {
+ value
+ }
+}
+
+impl<N> Resolver for N
+where
+ N: ArrowNumericType,
+ N::Native: num_traits::cast::NumCast,
+{
+ fn resolve(value: &Value) -> Option<Self::Native> {
+ let value = maybe_resolve_union(value);
+ match value {
+ Value::Int(i) | Value::TimeMillis(i) | Value::Date(i) => NumCast::from(*i),
+ Value::Long(l)
+ | Value::TimeMicros(l)
+ | Value::TimestampMillis(l)
+ | Value::TimestampMicros(l) => NumCast::from(*l),
+ Value::Float(f) => NumCast::from(*f),
+ Value::Double(f) => NumCast::from(*f),
+ Value::Duration(_d) => unimplemented!(), // shenanigans type
+ Value::Null => None,
+ _ => unreachable!(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::arrow::array::Array;
+ use crate::arrow::datatypes::{Field, TimeUnit};
+ use crate::avro_to_arrow::{Reader, ReaderBuilder};
+ use arrow::array::{Int32Array, Int64Array, ListArray, TimestampMicrosecondArray};
+ use arrow::datatypes::DataType;
+ use std::fs::File;
+
+ fn build_reader(name: &str, batch_size: usize) -> Reader<File> {
+ let testdata = crate::test_util::arrow_test_data();
+ let filename = format!("{}/avro/{}", testdata, name);
+ let builder = ReaderBuilder::new()
+ .read_schema()
+ .with_batch_size(batch_size);
+ builder.build(File::open(filename).unwrap()).unwrap()
+ }
+
+ // TODO: Fixed, Enum, Dictionary
+
+ #[test]
+ fn test_time_avro_milliseconds() {
+ let mut reader = build_reader("alltypes_plain.avro", 10);
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(11, batch.num_columns());
+ assert_eq!(8, batch.num_rows());
+
+ let schema = reader.schema();
+ let batch_schema = batch.schema();
+ assert_eq!(schema, batch_schema);
+
+ let timestamp_col = schema.column_with_name("timestamp_col").unwrap();
+ assert_eq!(
+ &DataType::Timestamp(TimeUnit::Microsecond, None),
+ timestamp_col.1.data_type()
+ );
+ let timestamp_array = batch
+ .column(timestamp_col.0)
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap();
+ for i in 0..timestamp_array.len() {
+ assert!(timestamp_array.is_valid(i));
+ }
+ assert_eq!(1235865600000000, timestamp_array.value(0));
+ assert_eq!(1235865660000000, timestamp_array.value(1));
+ assert_eq!(1238544000000000, timestamp_array.value(2));
+ assert_eq!(1238544060000000, timestamp_array.value(3));
+ assert_eq!(1233446400000000, timestamp_array.value(4));
+ assert_eq!(1233446460000000, timestamp_array.value(5));
+ assert_eq!(1230768000000000, timestamp_array.value(6));
+ assert_eq!(1230768060000000, timestamp_array.value(7));
+ }
+
+ #[test]
+ fn test_avro_read_list() {
+ let mut reader = build_reader("list_columns.avro", 3);
+ let schema = reader.schema();
+ let (col_id_index, _) = schema.column_with_name("int64_list").unwrap();
+ let batch = reader.next().unwrap().unwrap();
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.num_rows(), 3);
+ let a_array = batch
+ .column(col_id_index)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ assert_eq!(
+ *a_array.data_type(),
+ DataType::List(Box::new(Field::new("bigint", DataType::Int64, true)))
+ );
+ let array = a_array.value(0);
+ assert_eq!(*array.data_type(), DataType::Int64);
+
+ assert_eq!(
+ 6,
+ array
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap()
+ .iter()
+ .flatten()
+ .sum::<i64>()
+ );
+ }
+ #[test]
+ fn test_avro_read_nested_list() {
+ let mut reader = build_reader("nested_lists.snappy.avro", 3);
+ let batch = reader.next().unwrap().unwrap();
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.num_rows(), 3);
+ }
+
+ #[test]
+ fn test_avro_iterator() {
+ let reader = build_reader("alltypes_plain.avro", 5);
+ let schema = reader.schema();
+ let (col_id_index, _) = schema.column_with_name("id").unwrap();
+
+ let mut sum_num_rows = 0;
+ let mut num_batches = 0;
+ let mut sum_id = 0;
+ for batch in reader {
+ let batch = batch.unwrap();
+ assert_eq!(11, batch.num_columns());
+ sum_num_rows += batch.num_rows();
+ num_batches += 1;
+ let batch_schema = batch.schema();
+ assert_eq!(schema, batch_schema);
+ let a_array = batch
+ .column(col_id_index)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ sum_id += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i32>();
+ }
+ assert_eq!(8, sum_num_rows);
+ assert_eq!(2, num_batches);
+ assert_eq!(28, sum_id);
+ }
+}
diff --git a/datafusion/src/avro_to_arrow/mod.rs b/datafusion/src/avro_to_arrow/mod.rs
new file mode 100644
index 0000000..531b109
--- /dev/null
+++ b/datafusion/src/avro_to_arrow/mod.rs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains utilities to manipulate avro metadata.
+
+#[cfg(feature = "avro")]
+mod arrow_array_reader;
+#[cfg(feature = "avro")]
+mod reader;
+#[cfg(feature = "avro")]
+mod schema;
+
+use crate::arrow::datatypes::Schema;
+use crate::error::Result;
+#[cfg(feature = "avro")]
+pub use reader::{Reader, ReaderBuilder};
+use std::io::{Read, Seek};
+
+#[cfg(feature = "avro")]
+/// Read Avro schema given a reader
+pub fn read_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
+ let avro_reader = avro_rs::Reader::new(reader)?;
+ let schema = avro_reader.writer_schema();
+ schema::to_arrow_schema(schema)
+}
+
+#[cfg(not(feature = "avro"))]
+/// Read Avro schema given a reader (requires the avro feature)
+pub fn read_avro_schema_from_reader<R: Read + Seek>(_: &mut R) -> Result<Schema> {
+ Err(crate::error::DataFusionError::NotImplemented(
+ "cannot read avro schema without the 'avro' feature enabled".to_string(),
+ ))
+}
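As a quick sketch of this module's entry point: an Avro container file stores
the writer schema in its header, so the Arrow schema can be obtained without
scanning any records. The following assumes the module is exported from the
crate root (as the lib.rs change in the diffstat suggests) and that the `avro`
feature is enabled; the path is illustrative:

    use std::fs::File;

    fn main() -> datafusion::error::Result<()> {
        // any Read + Seek source works; a File is the common case
        let mut file = File::open("/path/to/alltypes_plain.avro")?;
        let schema = datafusion::avro_to_arrow::read_avro_schema_from_reader(&mut file)?;
        println!("{:?}", schema);
        Ok(())
    }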
diff --git a/datafusion/src/avro_to_arrow/reader.rs b/datafusion/src/avro_to_arrow/reader.rs
new file mode 100644
index 0000000..8baad14
--- /dev/null
+++ b/datafusion/src/avro_to_arrow/reader.rs
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::arrow_array_reader::AvroArrowArrayReader;
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::Result;
+use arrow::error::Result as ArrowResult;
+use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+/// Avro file reader builder
+#[derive(Debug)]
+pub struct ReaderBuilder {
+ /// Optional schema for the Avro file
+ ///
+ /// If the schema is not supplied, the reader will try to read it from the Avro file header.
+ schema: Option<SchemaRef>,
+ /// Batch size (number of records to load each time)
+ ///
+ /// The default batch size when using the `ReaderBuilder` is 1024 records
+ batch_size: usize,
+ /// Optional projection for which columns to load (by column name)
+ projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+ fn default() -> Self {
+ Self {
+ schema: None,
+ batch_size: 1024,
+ projection: None,
+ }
+ }
+}
+
+impl ReaderBuilder {
+ /// Create a new builder for configuring Avro parsing options.
+ ///
+ /// To convert a builder into a reader, call `ReaderBuilder::build`
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use std::fs::File;
+ ///
+ /// fn example() -> datafusion::avro_to_arrow::Reader<'static, File> {
+ /// let file = File::open("test/data/basic.avro").unwrap();
+ ///
+ /// // create a builder that reads the schema embedded in the file
+ /// // and decodes batches of 100 records at a time
+ /// let builder = datafusion::avro_to_arrow::ReaderBuilder::new()
+ /// .read_schema()
+ /// .with_batch_size(100);
+ ///
+ /// builder.build(file).unwrap()
+ /// }
+ /// ```
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Set the Avro file's schema
+ pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+ self.schema = Some(schema);
+ self
+ }
+
+ /// Set the Avro reader to read the schema embedded in the file
+ pub fn read_schema(mut self) -> Self {
+ // remove any schema that is set
+ self.schema = None;
+ self
+ }
+
+ /// Set the batch size (number of records to load at one time)
+ pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+ self.batch_size = batch_size;
+ self
+ }
+
+ /// Set the reader's column projection
+ pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+ self.projection = Some(projection);
+ self
+ }
+
+ /// Create a new `Reader` from the `ReaderBuilder`
+ pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>>
+ where
+ R: Read + Seek,
+ {
+ let mut source = source;
+
+ // check if schema should be inferred
+ let schema = match self.schema {
+ Some(schema) => schema,
+ None => Arc::new(super::read_avro_schema_from_reader(&mut source)?),
+ };
+ source.seek(SeekFrom::Start(0))?;
+ Reader::try_new(source, schema, self.batch_size, self.projection)
+ }
+}
+
+/// Avro file record reader
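+///
+/// `Reader` implements `Iterator<Item = ArrowResult<RecordBatch>>`, so batches can
+/// be consumed with a plain `for` loop. A sketch (hypothetical path, `avro` feature
+/// assumed):
+///
+/// ```no_run
+/// use std::fs::File;
+///
+/// let file = File::open("data.avro").unwrap();
+/// let reader = datafusion::avro_to_arrow::ReaderBuilder::new()
+/// .read_schema()
+/// .build(file)
+/// .unwrap();
+/// for batch in reader {
+/// println!("read {} rows", batch.unwrap().num_rows());
+/// }
+/// ```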
+pub struct Reader<'a, R: Read> {
+ array_reader: AvroArrowArrayReader<'a, R>,
+ schema: SchemaRef,
+ batch_size: usize,
+}
+
+impl<'a, R: Read> Reader<'a, R> {
+ /// Create a new Avro Reader from any value that implements the `Read` trait.
+ ///
+ /// If reading a `File` and you want to customize the reader, for example to
+ /// read the embedded schema or change the batch size, use `ReaderBuilder` instead.
+ pub fn try_new(
+ reader: R,
+ schema: SchemaRef,
+ batch_size: usize,
+ projection: Option<Vec<String>>,
+ ) -> Result<Self> {
+ Ok(Self {
+ array_reader: AvroArrowArrayReader::try_new(
+ reader,
+ schema.clone(),
+ projection,
+ )?,
+ schema,
+ batch_size,
+ })
+ }
+
+ /// Returns the schema of the reader, useful for getting the schema without reading
+ /// record batches
+ pub fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// Returns the next batch of results (defined by `self.batch_size`), or `None` if there
+ /// are no more results
+ #[allow(clippy::should_implement_trait)]
+ pub fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
+ self.array_reader.next_batch(self.batch_size)
+ }
+}
+
+impl<'a, R: Read> Iterator for Reader<'a, R> {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.next().transpose()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::arrow::array::*;
+ use crate::arrow::datatypes::{DataType, Field};
+ use arrow::datatypes::TimeUnit;
+ use std::fs::File;
+
+ fn build_reader(name: &str) -> Reader<File> {
+ let testdata = crate::test_util::arrow_test_data();
+ let filename = format!("{}/avro/{}", testdata, name);
+ let builder = ReaderBuilder::new().read_schema().with_batch_size(64);
+ builder.build(File::open(filename).unwrap()).unwrap()
+ }
+
+ fn get_col<'a, T: 'static>(
+ batch: &'a RecordBatch,
+ col: (usize, &Field),
+ ) -> Option<&'a T> {
+ batch.column(col.0).as_any().downcast_ref::<T>()
+ }
+
+ #[test]
+ fn test_avro_basic() {
+ let mut reader = build_reader("alltypes_dictionary.avro");
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(11, batch.num_columns());
+ assert_eq!(2, batch.num_rows());
+
+ let schema = reader.schema();
+ let batch_schema = batch.schema();
+ assert_eq!(schema, batch_schema);
+
+ let id = schema.column_with_name("id").unwrap();
+ assert_eq!(0, id.0);
+ assert_eq!(&DataType::Int32, id.1.data_type());
+ let col = get_col::<Int32Array>(&batch, id).unwrap();
+ assert_eq!(0, col.value(0));
+ assert_eq!(1, col.value(1));
+ let bool_col = schema.column_with_name("bool_col").unwrap();
+ assert_eq!(1, bool_col.0);
+ assert_eq!(&DataType::Boolean, bool_col.1.data_type());
+ let col = get_col::<BooleanArray>(&batch, bool_col).unwrap();
+ assert!(col.value(0));
+ assert!(!col.value(1));
+ let tinyint_col = schema.column_with_name("tinyint_col").unwrap();
+ assert_eq!(2, tinyint_col.0);
+ assert_eq!(&DataType::Int32, tinyint_col.1.data_type());
+ let col = get_col::<Int32Array>(&batch, tinyint_col).unwrap();
+ assert_eq!(0, col.value(0));
+ assert_eq!(1, col.value(1));
+ let smallint_col = schema.column_with_name("smallint_col").unwrap();
+ assert_eq!(3, smallint_col.0);
+ assert_eq!(&DataType::Int32, smallint_col.1.data_type());
+ let col = get_col::<Int32Array>(&batch, smallint_col).unwrap();
+ assert_eq!(0, col.value(0));
+ assert_eq!(1, col.value(1));
+ let int_col = schema.column_with_name("int_col").unwrap();
+ assert_eq!(4, int_col.0);
+ assert_eq!(&DataType::Int32, int_col.1.data_type());
+ let col = get_col::<Int32Array>(&batch, int_col).unwrap();
+ assert_eq!(0, col.value(0));
+ assert_eq!(1, col.value(1));
+ let bigint_col = schema.column_with_name("bigint_col").unwrap();
+ assert_eq!(5, bigint_col.0);
+ let col = get_col::<Int64Array>(&batch, bigint_col).unwrap();
+ assert_eq!(0, col.value(0));
+ assert_eq!(10, col.value(1));
+ assert_eq!(&DataType::Int64, bigint_col.1.data_type());
+ let float_col = schema.column_with_name("float_col").unwrap();
+ assert_eq!(6, float_col.0);
+ assert_eq!(&DataType::Float32, float_col.1.data_type());
+ let col = get_col::<Float32Array>(&batch, float_col).unwrap();
+ assert_eq!(0.0, col.value(0));
+ assert_eq!(1.1, col.value(1));
+ let double_col = schema.column_with_name("double_col").unwrap();
+ assert_eq!(7, double_col.0);
+ assert_eq!(&DataType::Float64, double_col.1.data_type());
+ let col = get_col::<Float64Array>(&batch, double_col).unwrap();
+ assert_eq!(0.0, col.value(0));
+ assert_eq!(10.1, col.value(1));
+ let date_string_col = schema.column_with_name("date_string_col").unwrap();
+ assert_eq!(8, date_string_col.0);
+ assert_eq!(&DataType::Binary, date_string_col.1.data_type());
+ let col = get_col::<BinaryArray>(&batch, date_string_col).unwrap();
+ assert_eq!("01/01/09".as_bytes(), col.value(0));
+ assert_eq!("01/01/09".as_bytes(), col.value(1));
+ let string_col = schema.column_with_name("string_col").unwrap();
+ assert_eq!(9, string_col.0);
+ assert_eq!(&DataType::Binary, string_col.1.data_type());
+ let col = get_col::<BinaryArray>(&batch, string_col).unwrap();
+ assert_eq!("0".as_bytes(), col.value(0));
+ assert_eq!("1".as_bytes(), col.value(1));
+ let timestamp_col = schema.column_with_name("timestamp_col").unwrap();
+ assert_eq!(10, timestamp_col.0);
+ assert_eq!(
+ &DataType::Timestamp(TimeUnit::Microsecond, None),
+ timestamp_col.1.data_type()
+ );
+ let col = get_col::<TimestampMicrosecondArray>(&batch, timestamp_col).unwrap();
+ assert_eq!(1230768000000000, col.value(0));
+ assert_eq!(1230768060000000, col.value(1));
+ }
+}
diff --git a/datafusion/src/avro_to_arrow/schema.rs b/datafusion/src/avro_to_arrow/schema.rs
new file mode 100644
index 0000000..c2927f0
--- /dev/null
+++ b/datafusion/src/avro_to_arrow/schema.rs
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::Field;
+use avro_rs::schema::Name;
+use avro_rs::types::Value;
+use avro_rs::Schema as AvroSchema;
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+
+/// Converts an Avro schema to an Arrow schema
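+///
+/// A sketch of the mapping (marked `ignore` because this module is crate-private;
+/// the record schema below is hypothetical):
+///
+/// ```ignore
+/// let avro = avro_rs::Schema::parse_str(
+/// r#"{"type": "record", "name": "r", "fields": [{"name": "id", "type": "long"}]}"#,
+/// )?;
+/// let arrow = to_arrow_schema(&avro)?;
+/// // `arrow` contains a single non-nullable Int64 field named "id"
+/// ```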
+pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result<Schema> {
+ let mut schema_fields = vec![];
+ match avro_schema {
+ AvroSchema::Record { fields, .. } => {
+ for field in fields {
+ schema_fields.push(schema_to_field_with_props(
+ &field.schema,
+ Some(&field.name),
+ false,
+ Some(&external_props(&field.schema)),
+ )?)
+ }
+ }
+ schema => schema_fields.push(schema_to_field(schema, Some(""), false)?),
+ }
+
+ let schema = Schema::new(schema_fields);
+ Ok(schema)
+}
+
+fn schema_to_field(
+ schema: &avro_rs::Schema,
+ name: Option<&str>,
+ nullable: bool,
+) -> Result<Field> {
+ schema_to_field_with_props(schema, name, nullable, None)
+}
+
+fn schema_to_field_with_props(
+ schema: &AvroSchema,
+ name: Option<&str>,
+ nullable: bool,
+ props: Option<&BTreeMap<String, String>>,
+) -> Result<Field> {
+ let mut nullable = nullable;
+ let field_type: DataType = match schema {
+ AvroSchema::Null => DataType::Null,
+ AvroSchema::Boolean => DataType::Boolean,
+ AvroSchema::Int => DataType::Int32,
+ AvroSchema::Long => DataType::Int64,
+ AvroSchema::Float => DataType::Float32,
+ AvroSchema::Double => DataType::Float64,
+ AvroSchema::Bytes => DataType::Binary,
+ AvroSchema::String => DataType::Utf8,
+ AvroSchema::Array(item_schema) => DataType::List(Box::new(
+ schema_to_field_with_props(item_schema, None, false, None)?,
+ )),
+ AvroSchema::Map(value_schema) => {
+ let value_field =
+ schema_to_field_with_props(value_schema, Some("value"), false, None)?;
+ DataType::Dictionary(
+ Box::new(DataType::Utf8),
+ Box::new(value_field.data_type().clone()),
+ )
+ }
+ AvroSchema::Union(us) => {
+ // If there are only two variants and one of them is null, set the other type as the field data type
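+ // e.g. the two-variant union ["null", "int"] becomes a nullable Int32 field,
+ // whereas any other union shape falls through to DataType::Union below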
+ let has_nullable = us.find_schema(&Value::Null).is_some();
+ let sub_schemas = us.variants();
+ if has_nullable && sub_schemas.len() == 2 {
+ nullable = true;
+ if let Some(schema) = sub_schemas
+ .iter()
+ .find(|&schema| !matches!(schema, AvroSchema::Null))
+ {
+ schema_to_field_with_props(schema, None, has_nullable, None)?
+ .data_type()
+ .clone()
+ } else {
+ return Err(DataFusionError::AvroError(
+ avro_rs::Error::GetUnionDuplicate,
+ ));
+ }
+ } else {
+ let fields = sub_schemas
+ .iter()
+ .map(|s| schema_to_field_with_props(s, None, has_nullable, None))
+ .collect::<Result<Vec<Field>>>()?;
+ DataType::Union(fields)
+ }
+ }
+ AvroSchema::Record { name, fields, .. } => {
+ let fields: Result<Vec<Field>> = fields
+ .iter()
+ .map(|field| {
+ let mut props = BTreeMap::new();
+ if let Some(doc) = &field.doc {
+ props.insert("avro::doc".to_string(), doc.clone());
+ }
+ /*if let Some(aliases) = fields.aliases {
+ props.insert("aliases", aliases);
+ }*/
+ schema_to_field_with_props(
+ &field.schema,
+ Some(&format!("{}.{}", name.fullname(None), field.name)),
+ false,
+ Some(&props),
+ )
+ })
+ .collect();
+ DataType::Struct(fields?)
+ }
+ AvroSchema::Enum { symbols, name, .. } => {
+ return Ok(Field::new_dict(
+ &name.fullname(None),
+ index_type(symbols.len()),
+ false,
+ 0,
+ false,
+ ))
+ }
+ AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
+ AvroSchema::Decimal {
+ precision, scale, ..
+ } => DataType::Decimal(*precision, *scale),
+ AvroSchema::Uuid => DataType::FixedSizeBinary(16),
+ AvroSchema::Date => DataType::Date32,
+ AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
+ AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
+ AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
+ AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
+ AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
+ };
+
+ let data_type = field_type.clone();
+ let name = name.unwrap_or_else(|| default_field_name(&data_type));
+
+ let mut field = Field::new(name, field_type, nullable);
+ field.set_metadata(props.cloned());
+ Ok(field)
+}
+
+fn default_field_name(dt: &DataType) -> &str {
+ match dt {
+ DataType::Null => "null",
+ DataType::Boolean => "bit",
+ DataType::Int8 => "tinyint",
+ DataType::Int16 => "smallint",
+ DataType::Int32 => "int",
+ DataType::Int64 => "bigint",
+ DataType::UInt8 => "uint1",
+ DataType::UInt16 => "uint2",
+ DataType::UInt32 => "uint4",
+ DataType::UInt64 => "uint8",
+ DataType::Float16 => "float2",
+ DataType::Float32 => "float4",
+ DataType::Float64 => "float8",
+ DataType::Date32 => "dateday",
+ DataType::Date64 => "datemilli",
+ DataType::Time32(tu) | DataType::Time64(tu) => match tu {
+ TimeUnit::Second => "timesec",
+ TimeUnit::Millisecond => "timemilli",
+ TimeUnit::Microsecond => "timemicro",
+ TimeUnit::Nanosecond => "timenano",
+ },
+ DataType::Timestamp(tu, tz) => {
+ if tz.is_some() {
+ match tu {
+ TimeUnit::Second => "timestampsectz",
+ TimeUnit::Millisecond => "timestampmillitz",
+ TimeUnit::Microsecond => "timestampmicrotz",
+ TimeUnit::Nanosecond => "timestampnanotz",
+ }
+ } else {
+ match tu {
+ TimeUnit::Second => "timestampsec",
+ TimeUnit::Millisecond => "timestampmilli",
+ TimeUnit::Microsecond => "timestampmicro",
+ TimeUnit::Nanosecond => "timestampnano",
+ }
+ }
+ }
+ DataType::Duration(_) => "duration",
+ DataType::Interval(unit) => match unit {
+ IntervalUnit::YearMonth => "intervalyear",
+ IntervalUnit::DayTime => "intervalday",
+ },
+ DataType::Binary => "varbinary",
+ DataType::FixedSizeBinary(_) => "fixedsizebinary",
+ DataType::LargeBinary => "largevarbinary",
+ DataType::Utf8 => "varchar",
+ DataType::LargeUtf8 => "largevarchar",
+ DataType::List(_) => "list",
+ DataType::FixedSizeList(_, _) => "fixed_size_list",
+ DataType::LargeList(_) => "largelist",
+ DataType::Struct(_) => "struct",
+ DataType::Union(_) => "union",
+ DataType::Dictionary(_, _) => "map",
+ DataType::Decimal(_, _) => "decimal",
+ }
+}
+
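+/// Pick the smallest signed integer type able to index a dictionary of `len`
+/// entries: Int8 for lengths up to `u8::MAX`, Int16 up to `u16::MAX`, and so on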
+fn index_type(len: usize) -> DataType {
+ if len <= usize::from(u8::MAX) {
+ DataType::Int8
+ } else if len <= usize::from(u16::MAX) {
+ DataType::Int16
+ } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) {
+ DataType::Int32
+ } else {
+ DataType::Int64
+ }
+}
+
+fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
+ let mut props = BTreeMap::new();
+ match &schema {
+ AvroSchema::Record {
+ doc: Some(ref doc), ..
+ }
+ | AvroSchema::Enum {
+ doc: Some(ref doc), ..
+ } => {
+ props.insert("avro::doc".to_string(), doc.clone());
+ }
+ _ => {}
+ }
+ match &schema {
+ AvroSchema::Record {
+ name:
+ Name {
+ aliases: Some(aliases),
+ namespace,
+ ..
+ },
+ ..
+ }
+ | AvroSchema::Enum {
+ name:
+ Name {
+ aliases: Some(aliases),
+ namespace,
+ ..
+ },
+ ..
+ }
+ | AvroSchema::Fixed {
+ name:
+ Name {
+ aliases: Some(aliases),
+ namespace,
+ ..
+ },
+ ..
+ } => {
+ let aliases: Vec<String> = aliases
+ .iter()
+ .map(|alias| aliased(alias, namespace.as_deref(), None))
+ .collect();
+ props.insert(
+ "avro::aliases".to_string(),
+ format!("[{}]", aliases.join(",")),
+ );
+ }
+ _ => {}
+ }
+ props
+}
+
+#[allow(dead_code)]
+fn get_metadata(
+ _schema: AvroSchema,
+ props: BTreeMap<String, String>,
+) -> BTreeMap<String, String> {
+ let mut metadata: BTreeMap<String, String> = Default::default();
+ metadata.extend(props);
+ metadata
+}
+
+/// Returns the fully qualified name for a field, prefixing it with its namespace
+/// (or the default namespace) unless the name already contains a dot
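+///
+/// For example, `aliased("bar", Some("foo"), None)` yields `"foo.bar"`, while an
+/// already-qualified name like `"foo.bar"` is returned unchanged.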
+pub fn aliased(
+ name: &str,
+ namespace: Option<&str>,
+ default_namespace: Option<&str>,
+) -> String {
+ if name.contains('.') {
+ name.to_string()
+ } else {
+ let namespace = namespace.or(default_namespace);
+
+ match namespace {
+ Some(ref namespace) => format!("{}.{}", namespace, name),
+ None => name.to_string(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::{aliased, external_props, to_arrow_schema};
+ use crate::arrow::datatypes::DataType::{Binary, Float32, Float64, Timestamp, Utf8};
+ use crate::arrow::datatypes::TimeUnit::Microsecond;
+ use crate::arrow::datatypes::{Field, Schema};
+ use arrow::datatypes::DataType::{Boolean, Int32, Int64};
+ use avro_rs::schema::Name;
+ use avro_rs::Schema as AvroSchema;
+
+ #[test]
+ fn test_alias() {
+ assert_eq!(aliased("foo.bar", None, None), "foo.bar");
+ assert_eq!(aliased("bar", Some("foo"), None), "foo.bar");
+ assert_eq!(aliased("bar", Some("foo"), Some("cat")), "foo.bar");
+ assert_eq!(aliased("bar", None, Some("cat")), "cat.bar");
+ }
+
+ #[test]
+ fn test_external_props() {
+ let record_schema = AvroSchema::Record {
+ name: Name {
+ name: "record".to_string(),
+ namespace: None,
+ aliases: Some(vec!["fooalias".to_string(), "baralias".to_string()]),
+ },
+ doc: Some("record documentation".to_string()),
+ fields: vec![],
+ lookup: Default::default(),
+ };
+ let props = external_props(&record_schema);
+ assert_eq!(
+ props.get("avro::doc"),
+ Some(&"record documentation".to_string())
+ );
+ assert_eq!(
+ props.get("avro::aliases"),
+ Some(&"[fooalias,baralias]".to_string())
+ );
+ let enum_schema = AvroSchema::Enum {
+ name: Name {
+ name: "enum".to_string(),
+ namespace: None,
+ aliases: Some(vec!["fooenum".to_string(), "barenum".to_string()]),
+ },
+ doc: Some("enum documentation".to_string()),
+ symbols: vec![],
+ };
+ let props = external_props(&enum_schema);
+ assert_eq!(
+ props.get("avro::doc"),
+ Some(&"enum documentation".to_string())
+ );
+ assert_eq!(
+ props.get("avro::aliases"),
+ Some(&"[fooenum,barenum]".to_string())
+ );
+ let fixed_schema = AvroSchema::Fixed {
+ name: Name {
+ name: "fixed".to_string(),
+ namespace: None,
+ aliases: Some(vec!["foofixed".to_string(), "barfixed".to_string()]),
+ },
+ size: 1,
+ };
+ let props = external_props(&fixed_schema);
+ assert_eq!(
+ props.get("avro::aliases"),
+ Some(&"[foofixed,barfixed]".to_string())
+ );
+ }
+
+ #[test]
+ fn test_invalid_avro_schema() {}
+
+ #[test]
+ fn test_plain_types_schema() {
+ let schema = AvroSchema::parse_str(
+ r#"
+ {
+ "type" : "record",
+ "name" : "topLevelRecord",
+ "fields" : [ {
+ "name" : "id",
+ "type" : [ "int", "null" ]
+ }, {
+ "name" : "bool_col",
+ "type" : [ "boolean", "null" ]
+ }, {
+ "name" : "tinyint_col",
+ "type" : [ "int", "null" ]
+ }, {
+ "name" : "smallint_col",
+ "type" : [ "int", "null" ]
+ }, {
+ "name" : "int_col",
+ "type" : [ "int", "null" ]
+ }, {
+ "name" : "bigint_col",
+ "type" : [ "long", "null" ]
+ }, {
+ "name" : "float_col",
+ "type" : [ "float", "null" ]
+ }, {
+ "name" : "double_col",
+ "type" : [ "double", "null" ]
+ }, {
+ "name" : "date_string_col",
+ "type" : [ "bytes", "null" ]
+ }, {
+ "name" : "string_col",
+ "type" : [ "bytes", "null" ]
+ }, {
+ "name" : "timestamp_col",
+ "type" : [ {
+ "type" : "long",
+ "logicalType" : "timestamp-micros"
+ }, "null" ]
+ } ]
+ }"#,
+ );
+ assert!(schema.is_ok(), "{:?}", schema);
+ let arrow_schema = to_arrow_schema(&schema.unwrap());
+ assert!(arrow_schema.is_ok(), "{:?}", arrow_schema);
+ let expected = Schema::new(vec![
+ Field::new("id", Int32, true),
+ Field::new("bool_col", Boolean, true),
+ Field::new("tinyint_col", Int32, true),
+ Field::new("smallint_col", Int32, true),
+ Field::new("int_col", Int32, true),
+ Field::new("bigint_col", Int64, true),
+ Field::new("float_col", Float32, true),
+ Field::new("double_col", Float64, true),
+ Field::new("date_string_col", Binary, true),
+ Field::new("string_col", Binary, true),
+ Field::new("timestamp_col", Timestamp(Microsecond, None), true),
+ ]);
+ assert_eq!(arrow_schema.unwrap(), expected);
+ }
+
+ #[test]
+ fn test_non_record_schema() {
+ let arrow_schema = to_arrow_schema(&AvroSchema::String);
+ assert!(arrow_schema.is_ok(), "{:?}", arrow_schema);
+ assert_eq!(
+ arrow_schema.unwrap(),
+ Schema::new(vec![Field::new("", Utf8, false)])
+ );
+ }
+}
diff --git a/datafusion/src/datasource/avro.rs b/datafusion/src/datasource/avro.rs
new file mode 100644
index 0000000..ee0fabf
--- /dev/null
+++ b/datafusion/src/datasource/avro.rs
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Avro data source
+//!
+//! This data source allows Avro files (or directories of Avro files) to be used as input for queries.
+//!
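+//! A minimal usage sketch (hypothetical path; the `avro` feature is assumed so the
+//! schema can be read from the file):
+//!
+//! ```no_run
+//! use datafusion::datasource::avro::AvroFile;
+//! use datafusion::datasource::TableProvider;
+//! use datafusion::physical_plan::avro::AvroReadOptions;
+//!
+//! let table = AvroFile::try_new("data.avro", AvroReadOptions::default()).unwrap();
+//! println!("{:?}", table.schema());
+//! ```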
+
+use std::{
+ any::Any,
+ io::{Read, Seek},
+ sync::{Arc, Mutex},
+};
+
+use arrow::datatypes::SchemaRef;
+
+use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
+use crate::{
+ datasource::{Source, TableProvider},
+ error::{DataFusionError, Result},
+ physical_plan::{common, ExecutionPlan},
+};
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents an Avro file (or a directory of Avro files) with a provided schema
+pub struct AvroFile {
+ source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
+ schema: SchemaRef,
+ file_extension: String,
+}
+
+impl AvroFile {
+ /// Attempt to initialize an `AvroFile` from a path. The schema can be read automatically.
+ pub fn try_new(path: &str, options: AvroReadOptions) -> Result<Self> {
+ let schema = if let Some(schema) = options.schema {
+ schema
+ } else {
+ let filenames =
+ common::build_checked_file_list(path, options.file_extension)?;
+ Arc::new(AvroExec::try_read_schema(&filenames)?)
+ };
+
+ Ok(Self {
+ source: Source::Path(path.to_string()),
+ schema,
+ file_extension: options.file_extension.to_string(),
+ })
+ }
+
+ /// Attempt to initialize an `AvroFile` from a reader. The schema MUST be provided in options
+ pub fn try_new_from_reader<R: Read + Seek + Send + Sync + 'static>(
+ reader: R,
+ options: AvroReadOptions,
+ ) -> Result<Self> {
+ let schema = match options.schema {
+ Some(s) => s,
+ None => {
+ return Err(DataFusionError::Execution(
+ "Schema must be provided to CsvRead".to_string(),
+ ));
+ }
+ };
+ Ok(Self {
+ source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+ schema,
+ file_extension: String::new(),
+ })
+ }
+
+ /// Attempt to initialize an `AvroFile` from a reader that implements `Seek`. The schema can be read automatically.
+ pub fn try_new_from_reader_schema<R: Read + Seek + Send + Sync + 'static>(
+ mut reader: R,
+ options: AvroReadOptions,
+ ) -> Result<Self> {
+ let schema = {
+ if let Some(schema) = options.schema {
+ schema
+ } else {
+ Arc::new(crate::avro_to_arrow::read_avro_schema_from_reader(
+ &mut reader,
+ )?)
+ }
+ };
+
+ Ok(Self {
+ source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+ schema,
+ file_extension: String::new(),
+ })
+ }
+
+ /// Get the path for Avro file(s) represented by this AvroFile instance
+ pub fn path(&self) -> &str {
+ match &self.source {
+ Source::Reader(_) => "",
+ Source::Path(path) => path,
+ }
+ }
+
+ /// Get the file extension for the Avro file(s) represented by this AvroFile instance
+ pub fn file_extension(&self) -> &str {
+ &self.file_extension
+ }
+}
+
+impl TableProvider for AvroFile {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn scan(
+ &self,
+ projection: &Option<Vec<usize>>,
+ batch_size: usize,
+ _filters: &[crate::logical_plan::Expr],
+ limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let opts = AvroReadOptions {
+ schema: Some(self.schema.clone()),
+ file_extension: self.file_extension.as_str(),
+ };
+ let batch_size = limit
+ .map(|l| std::cmp::min(l, batch_size))
+ .unwrap_or(batch_size);
+
+ let exec = match &self.source {
+ Source::Reader(maybe_reader) => {
+ if let Some(rdr) = maybe_reader.lock().unwrap().take() {
+ AvroExec::try_new_from_reader(
+ rdr,
+ opts,
+ projection.clone(),
+ batch_size,
+ limit,
+ )?
+ } else {
+ return Err(DataFusionError::Execution(
+ "You can only read once if the data comes from a reader"
+ .to_string(),
+ ));
+ }
+ }
+ Source::Path(p) => {
+ AvroExec::try_from_path(p, opts, projection.clone(), batch_size, limit)?
+ }
+ };
+ Ok(Arc::new(exec))
+ }
+}
+
+#[cfg(test)]
+#[cfg(feature = "avro")]
+mod tests {
+ use arrow::array::{
+ BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+ TimestampMicrosecondArray,
+ };
+ use arrow::record_batch::RecordBatch;
+ use futures::StreamExt;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn read_small_batches() -> Result<()> {
+ let table = load_table("alltypes_plain.avro")?;
+ let projection = None;
+ let exec = table.scan(&projection, 2, &[], None)?;
+ let stream = exec.execute(0).await?;
+
+ let _ = stream
+ .map(|batch| {
+ let batch = batch.unwrap();
+ assert_eq!(11, batch.num_columns());
+ assert_eq!(2, batch.num_rows());
+ })
+ .fold(0, |acc, _| async move { acc + 1i32 })
+ .await;
+
+ Ok(())
+ }
+
+ #[cfg(feature = "avro")]
+ #[tokio::test]
+ async fn read_alltypes_plain_avro() -> Result<()> {
+ let table = load_table("alltypes_plain.avro")?;
+
+ let x: Vec<String> = table
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+ .collect();
+ let y = x.join("\n");
+ assert_eq!(
+ "id: Int32\n\
+ bool_col: Boolean\n\
+ tinyint_col: Int32\n\
+ smallint_col: Int32\n\
+ int_col: Int32\n\
+ bigint_col: Int64\n\
+ float_col: Float32\n\
+ double_col: Float64\n\
+ date_string_col: Binary\n\
+ string_col: Binary\n\
+ timestamp_col: Timestamp(Microsecond, None)",
+ y
+ );
+
+ let projection = None;
+ let batch = get_first_batch(table, &projection).await?;
+ let expected = vec![
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ "| 4 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30332f30312f3039 | 30 | 2009-03-01 00:00:00 |",
+ "| 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01 00:01:00 |",
+ "| 6 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30342f30312f3039 | 30 | 2009-04-01 00:00:00 |",
+ "| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01 00:01:00 |",
+ "| 2 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30322f30312f3039 | 30 | 2009-02-01 00:00:00 |",
+ "| 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01 00:01:00 |",
+ "| 0 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30312f30312f3039 | 30 | 2009-01-01 00:00:00 |",
+ "| 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01 00:01:00 |",
+ "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ ];
+
+ crate::assert_batches_eq!(expected, &[batch]);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn read_bool_alltypes_plain_avro() -> Result<()> {
+ let table = load_table("alltypes_plain.avro")?;
+ let projection = Some(vec![1]);
+ let batch = get_first_batch(table, &projection).await?;
+
+ assert_eq!(1, batch.num_columns());
+ assert_eq!(8, batch.num_rows());
+
+ let array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap();
+ let mut values: Vec<bool> = vec![];
+ for i in 0..batch.num_rows() {
+ values.push(array.value(i));
+ }
+
+ assert_eq!(
+ "[true, false, true, false, true, false, true, false]",
+ format!("{:?}", values)
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn read_i32_alltypes_plain_avro() -> Result<()> {
+ let table = load_table("alltypes_plain.avro")?;
+ let projection = Some(vec![0]);
+ let batch = get_first_batch(table, &projection).await?;
+
+ assert_eq!(1, batch.num_columns());
+ assert_eq!(8, batch.num_rows());
+
+ let array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let mut values: Vec<i32> = vec![];
+ for i in 0..batch.num_rows() {
+ values.push(array.value(i));
+ }
+
+ assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn read_i96_alltypes_plain_avro() -> Result<()> {
+ let table = load_table("alltypes_plain.avro")?;
+ let projection = Some(vec![10]);
+ let batch = get_first_batch(table, &projection).await?;
+ assert_eq!(1, batch.num_columns());
+ assert_eq!(8, batch.num_rows());
+
+ let array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap();
+ let mut values: Vec<i64> = vec![];
+ for i in 0..batch.num_rows() {
+ values.push(array.value(i));
+ }
+
+ assert_eq!("[1235865600000000, 1235865660000000, 1238544000000000, 1238544060000000, 1233446400000000, 1233446460000000, 1230768000000000, 1230768060000000]", format!("{:?}", values));
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn read_f32_alltypes_plain_avro() -> Result<()> {
+ let table = load_table("alltypes_plain.avro")?;
+ let projection = Some(vec![6]);
+ let batch = get_first_batch(table, &projection).await?;
+
+ assert_eq!(1, batch.num_columns());
+ assert_eq!(8, batch.num_rows());
+
+ let array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Float32Array>()
+ .unwrap();
+ let mut values: Vec<f32> = vec![];
+ for i in 0..batch.num_rows() {
+ values.push(array.value(i));
+ }
+
+ assert_eq!(
+ "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
+ format!("{:?}", values)
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn read_f64_alltypes_plain_avro() -> Result<()> {
+ let table = load_table("alltypes_plain.avro")?;
+ let projection = Some(vec![7]);
+ let batch = get_first_batch(table, &projection).await?;
+
+ assert_eq!(1, batch.num_columns());
+ assert_eq!(8, batch.num_rows());
+
+ let array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+ let mut values: Vec<f64> = vec![];
+ for i in 0..batch.num_rows() {
+ values.push(array.value(i));
+ }
+
+ assert_eq!(
+ "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
+ format!("{:?}", values)
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn read_binary_alltypes_plain_avro() -> Result<()> {
+ let table = load_table("alltypes_plain.avro")?;
+ let projection = Some(vec![9]);
+ let batch = get_first_batch(table, &projection).await?;
+
+ assert_eq!(1, batch.num_columns());
+ assert_eq!(8, batch.num_rows());
+
+ let array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .unwrap();
+ let mut values: Vec<&str> = vec![];
+ for i in 0..batch.num_rows() {
+ values.push(std::str::from_utf8(array.value(i)).unwrap());
+ }
+
+ assert_eq!(
+ "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
+ format!("{:?}", values)
+ );
+
+ Ok(())
+ }
+
+ fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
+ let testdata = crate::test_util::arrow_test_data();
+ let filename = format!("{}/avro/{}", testdata, name);
+ let table = AvroFile::try_new(&filename, AvroReadOptions::default())?;
+ Ok(Arc::new(table))
+ }
+
+ async fn get_first_batch(
+ table: Arc<dyn TableProvider>,
+ projection: &Option<Vec<usize>>,
+ ) -> Result<RecordBatch> {
+ let exec = table.scan(projection, 1024, &[], None)?;
+ let mut it = exec.execute(0).await?;
+ it.next()
+ .await
+ .expect("should have received at least one batch")
+ .map_err(|e| e.into())
+ }
+}
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 53ba517..cfa9003 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -17,6 +17,7 @@
//! DataFusion data sources
+pub mod avro;
pub mod csv;
pub mod datasource;
pub mod empty;
diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs
index 903faea..6b6bb13 100644
--- a/datafusion/src/error.rs
+++ b/datafusion/src/error.rs
@@ -23,6 +23,8 @@ use std::io;
use std::result;
use arrow::error::ArrowError;
+#[cfg(feature = "avro")]
+use avro_rs::Error as AvroError;
use parquet::errors::ParquetError;
use sqlparser::parser::ParserError;
@@ -37,6 +39,9 @@ pub enum DataFusionError {
ArrowError(ArrowError),
/// Wraps an error from the Parquet crate
ParquetError(ParquetError),
+ /// Wraps an error from the Avro crate
+ #[cfg(feature = "avro")]
+ AvroError(AvroError),
/// Error associated to I/O operations and associated traits.
IoError(io::Error),
/// Error returned when SQL is syntactically incorrect.
@@ -83,6 +88,13 @@ impl From<ParquetError> for DataFusionError {
}
}
+#[cfg(feature = "avro")]
+impl From<AvroError> for DataFusionError {
+ fn from(e: AvroError) -> Self {
+ DataFusionError::AvroError(e)
+ }
+}
+
impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e)
@@ -96,6 +108,10 @@ impl Display for DataFusionError {
DataFusionError::ParquetError(ref desc) => {
write!(f, "Parquet error: {}", desc)
}
+ #[cfg(feature = "avro")]
+ DataFusionError::AvroError(ref desc) => {
+ write!(f, "Avro error: {}", desc)
+ }
DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc),
DataFusionError::SQL(ref desc) => {
write!(f, "SQL error: {:?}", desc)
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 82947aa..5327c58 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -67,6 +67,8 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;
+use crate::datasource::avro::AvroFile;
+use crate::physical_plan::avro::AvroReadOptions;
use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
@@ -197,6 +199,11 @@ impl ExecutionContext {
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
+ FileType::Avro => {
+ self.register_avro(name, location, AvroReadOptions::default())?;
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
+ }
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
@@ -271,6 +278,19 @@ impl ExecutionContext {
.insert(f.name.clone(), Arc::new(f));
}
+ /// Creates a DataFrame for reading an Avro data source.
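+ ///
+ /// A sketch (hypothetical path; the `avro` feature is assumed):
+ ///
+ /// ```ignore
+ /// let mut ctx = ExecutionContext::new();
+ /// let df = ctx.read_avro("data.avro", AvroReadOptions::default())?;
+ /// ```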
+ pub fn read_avro(
+ &mut self,
+ filename: impl Into<String>,
+ options: AvroReadOptions,
+ ) -> Result<Arc<dyn DataFrame>> {
+ Ok(Arc::new(DataFrameImpl::new(
+ self.state.clone(),
+ &LogicalPlanBuilder::scan_avro(filename, options, None)?.build()?,
+ )))
+ }
+
/// Creates a DataFrame for reading a CSV data source.
pub fn read_csv(
&mut self,
@@ -334,6 +354,18 @@ impl ExecutionContext {
Ok(())
}
+ /// Registers an Avro data source so that it can be referenced from SQL statements
+ /// executed against this context.
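+ ///
+ /// A sketch (hypothetical path and table name; the `avro` feature is assumed):
+ ///
+ /// ```ignore
+ /// ctx.register_avro("alltypes", "data.avro", AvroReadOptions::default())?;
+ /// let df = ctx.sql("SELECT id FROM alltypes")?;
+ /// ```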
+ pub fn register_avro(
+ &mut self,
+ name: &str,
+ filename: &str,
+ options: AvroReadOptions,
+ ) -> Result<()> {
+ self.register_table(name, Arc::new(AvroFile::try_new(filename, options)?))?;
+ Ok(())
+ }
+
/// Registers a named catalog using a custom `CatalogProvider` so that
/// it can be referenced from SQL statements executed against this
/// context.
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index eac9b5f..adaca11 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -212,6 +212,7 @@
extern crate sqlparser;
+pub mod avro_to_arrow;
pub mod catalog;
pub mod dataframe;
pub mod datasource;
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index f31dd37..5555939 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -36,10 +36,12 @@ use crate::{
use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
+use crate::datasource::avro::AvroFile;
use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
DFSchemaRef, Partitioning,
};
+use crate::physical_plan::avro::AvroReadOptions;
/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";
@@ -154,6 +156,27 @@ impl LogicalPlanBuilder {
Self::scan(table_name, provider, projection)
}
+ /// Scan an Avro data source
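+ ///
+ /// A sketch (hypothetical path):
+ ///
+ /// ```ignore
+ /// let plan = LogicalPlanBuilder::scan_avro("data.avro", AvroReadOptions::default(), None)?
+ /// .build()?;
+ /// ```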
+ pub fn scan_avro(
+ path: impl Into<String>,
+ options: AvroReadOptions,
+ projection: Option<Vec<usize>>,
+ ) -> Result<Self> {
+ let path = path.into();
+ Self::scan_avro_with_name(path.clone(), options, projection, path)
+ }
+
+ /// Scan an Avro data source and register it with a given table name
+ pub fn scan_avro_with_name(
+ path: impl Into<String>,
+ options: AvroReadOptions,
+ projection: Option<Vec<usize>>,
+ table_name: impl Into<String>,
+ ) -> Result<Self> {
+ let provider = Arc::new(AvroFile::try_new(&path.into(), options)?);
+ Self::scan(table_name, provider, projection)
+ }
+
/// Scan an empty data source, mainly used in tests
pub fn scan_empty(
name: Option<&str>,
diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs
index c067b5f..1ef8ac7 100644
--- a/datafusion/src/logical_plan/dfschema.rs
+++ b/datafusion/src/logical_plan/dfschema.rs
@@ -167,7 +167,6 @@ impl DFSchema {
(None, Some(_)) | (None, None) => field.name() == name,
})
.map(|(idx, _)| idx);
-
match matches.next() {
None => Err(DataFusionError::Plan(format!(
"No field named '{}.{}'. Valid fields are {}.",
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index ec017d0..eb46099 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -1911,17 +1911,13 @@ mod tests {
impl ExprRewriter for FooBarRewriter {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
match expr {
- Expr::Literal(scalar) => {
- if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
- let utf8_val = if utf8_val == "foo" {
- "bar".to_string()
- } else {
- utf8_val
- };
- Ok(lit(utf8_val))
+ Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => {
+ let utf8_val = if utf8_val == "foo" {
+ "bar".to_string()
} else {
- Ok(Expr::Literal(scalar))
- }
+ utf8_val
+ };
+ Ok(lit(utf8_val))
}
// otherwise, return the expression unchanged
expr => Ok(expr),
diff --git a/datafusion/src/physical_plan/avro.rs b/datafusion/src/physical_plan/avro.rs
new file mode 100644
index 0000000..3f0b007
--- /dev/null
+++ b/datafusion/src/physical_plan/avro.rs
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading Avro files
+#[cfg(feature = "avro")]
+use super::RecordBatchStream;
+use super::{common, source::Source, ExecutionPlan, Partitioning};
+use crate::avro_to_arrow::read_avro_schema_from_reader;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{DisplayFormatType, Statistics};
+use arrow::datatypes::{Schema, SchemaRef};
+#[cfg(feature = "avro")]
+use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use async_trait::async_trait;
+#[cfg(feature = "avro")]
+use futures::Stream;
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+ io::Read,
+ sync::{Arc, Mutex},
+};
+#[cfg(feature = "avro")]
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+/// Avro read options
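+///
+/// A sketch of filling the options explicitly (when `schema` is `None`, the schema
+/// is read from the files themselves):
+///
+/// ```ignore
+/// let options = AvroReadOptions {
+/// schema: None,
+/// file_extension: ".avro",
+/// };
+/// ```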
+#[derive(Clone)]
+pub struct AvroReadOptions<'a> {
+ /// The data source schema.
+ pub schema: Option<SchemaRef>,
+
+ /// File extension; only files with this extension are selected for data input.
+ /// Defaults to ".avro".
+ pub file_extension: &'a str,
+}
+
+impl<'a> Default for AvroReadOptions<'a> {
+ fn default() -> Self {
+ Self {
+ schema: None,
+ file_extension: ".avro",
+ }
+ }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Execution plan for scanning an Avro data source
+#[derive(Debug)]
+pub struct AvroExec {
+ source: Source<Box<dyn SeekRead + Send + Sync>>,
+ schema: SchemaRef,
+ projection: Option<Vec<usize>>,
+ projected_schema: SchemaRef,
+ file_extension: String,
+ batch_size: usize,
+ limit: Option<usize>,
+}
+
+impl AvroExec {
+ /// Create a new execution plan for reading from a path
+ pub fn try_from_path(
+ path: &str,
+ options: AvroReadOptions,
+ projection: Option<Vec<usize>>,
+ batch_size: usize,
+ limit: Option<usize>,
+ ) -> Result<Self> {
+ let file_extension = options.file_extension.to_string();
+
+ let filenames = common::build_file_list(path, &file_extension)?;
+
+ if filenames.is_empty() {
+ return Err(DataFusionError::Execution(format!(
+ "No files found at {path} with file extension {file_extension}",
+ path = path,
+ file_extension = file_extension.as_str()
+ )));
+ }
+
+ let schema = match options.schema {
+ Some(s) => s,
+ None => Arc::new(AvroExec::try_read_schema(filenames.as_slice())?),
+ };
+
+ let projected_schema = match &projection {
+ None => schema.clone(),
+ Some(p) => Arc::new(Schema::new(
+ p.iter().map(|i| schema.field(*i).clone()).collect(),
+ )),
+ };
+
+ Ok(Self {
+ source: Source::PartitionedFiles {
+ path: path.to_string(),
+ filenames,
+ },
+ schema,
+ projected_schema,
+ file_extension,
+ projection,
+ batch_size,
+ limit,
+ })
+ }
+
+ /// Create a new execution plan for reading from a reader
+ pub fn try_new_from_reader(
+ reader: impl Read + Seek + Send + Sync + 'static,
+ options: AvroReadOptions,
+ projection: Option<Vec<usize>>,
+ batch_size: usize,
+ limit: Option<usize>,
+ ) -> Result<Self> {
+ let schema = match options.schema {
+ Some(s) => s,
+ None => {
+ return Err(DataFusionError::Execution(
+ "The schema must be provided in options when reading from a reader"
+ .to_string(),
+ ));
+ }
+ };
+
+ let projected_schema = match &projection {
+ None => schema.clone(),
+ Some(p) => Arc::new(Schema::new(
+ p.iter().map(|i| schema.field(*i).clone()).collect(),
+ )),
+ };
+
+ Ok(Self {
+ source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+ schema,
+ file_extension: String::new(),
+ projection,
+ projected_schema,
+ batch_size,
+ limit,
+ })
+ }
+
+ /// Path to directory containing partitioned Avro files with the same schema
+ pub fn path(&self) -> &str {
+ self.source.path()
+ }
+
+ /// The individual files under path
+ pub fn filenames(&self) -> &[String] {
+ self.source.filenames()
+ }
+
+ /// File extension
+ pub fn file_extension(&self) -> &str {
+ &self.file_extension
+ }
+
+ /// Get the schema of the avro file
+ pub fn file_schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// Optional projection for which columns to load
+ pub fn projection(&self) -> Option<&Vec<usize>> {
+ self.projection.as_ref()
+ }
+
+ /// Batch size
+ pub fn batch_size(&self) -> usize {
+ self.batch_size
+ }
+
+ /// Limit
+ pub fn limit(&self) -> Option<usize> {
+ self.limit
+ }
+
+ /// Read the schemas of the given Avro files and merge them into a single schema
+ pub fn try_read_schema(filenames: &[String]) -> Result<Schema> {
+ let mut schemas = Vec::new();
+ for filename in filenames {
+ let mut file = File::open(filename)?;
+ let schema = read_avro_schema_from_reader(&mut file)?;
+ schemas.push(schema);
+ }
+
+ Ok(Schema::try_merge(schemas)?)
+ }
+}
+
+#[async_trait]
+impl ExecutionPlan for AvroExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.projected_schema.clone()
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(match &self.source {
+ Source::PartitionedFiles { filenames, .. } => filenames.len(),
+ Source::Reader(_) => 1,
+ })
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ Vec::new()
+ }
+
+ fn with_new_children(
+ &self,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if !children.is_empty() {
+ Err(DataFusionError::Internal(format!(
+ "Children cannot be replaced in {:?}",
+ self
+ )))
+ } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+ Ok(Arc::new(Self {
+ source: Source::PartitionedFiles {
+ filenames: filenames.clone(),
+ path: path.clone(),
+ },
+ schema: self.schema.clone(),
+ projection: self.projection.clone(),
+ projected_schema: self.projected_schema.clone(),
+ batch_size: self.batch_size,
+ limit: self.limit,
+ file_extension: self.file_extension.clone(),
+ }))
+ } else {
+ Err(DataFusionError::Internal(
+ "AvroExec with reader source cannot be used with `with_new_children`"
+ .to_string(),
+ ))
+ }
+ }
+
+ #[cfg(not(feature = "avro"))]
+ async fn execute(
+ &self,
+ _partition: usize,
+ ) -> Result<super::SendableRecordBatchStream> {
+ Err(DataFusionError::NotImplemented(
+ "Cannot execute avro plan without avro feature enabled".to_string(),
+ ))
+ }
+
+ #[cfg(feature = "avro")]
+ async fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<super::SendableRecordBatchStream> {
+ let mut builder = crate::avro_to_arrow::ReaderBuilder::new()
+ .with_schema(self.schema.clone())
+ .with_batch_size(self.batch_size);
+ if let Some(proj) = &self.projection {
+ builder = builder.with_projection(
+ proj.iter()
+ .map(|col_idx| self.schema.field(*col_idx).name())
+ .cloned()
+ .collect(),
+ );
+ }
+ match &self.source {
+ Source::PartitionedFiles { filenames, .. } => {
+ let file = File::open(&filenames[partition])?;
+
+ Ok(Box::pin(AvroStream::new(builder.build(file)?, self.limit)))
+ }
+ Source::Reader(rdr) => {
+ if partition != 0 {
+ Err(DataFusionError::Internal(
+ "Only partition 0 is valid when Avro comes from a reader"
+ .to_string(),
+ ))
+ } else if let Some(rdr) = rdr.lock().unwrap().take() {
+ Ok(Box::pin(AvroStream::new(builder.build(rdr)?, self.limit)))
+ } else {
+ Err(DataFusionError::Execution(
+ "Error reading Avro: Data can only be read a single time when the source is a reader"
+ .to_string(),
+ ))
+ }
+ }
+ }
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(
+ f,
+ "AvroExec: source={}, batch_size={}, limit={:?}",
+ self.source, self.batch_size, self.limit
+ )
+ }
+ }
+ }
+
+ fn statistics(&self) -> Statistics {
+ Statistics::default()
+ }
+}
+
+#[cfg(feature = "avro")]
+struct AvroStream<'a, R: Read> {
+ reader: crate::avro_to_arrow::Reader<'a, R>,
+ remain: Option<usize>,
+}
+
+#[cfg(feature = "avro")]
+impl<'a, R: Read> AvroStream<'a, R> {
+ fn new(reader: crate::avro_to_arrow::Reader<'a, R>, limit: Option<usize>) -> Self {
+ Self {
+ reader,
+ remain: limit,
+ }
+ }
+}
+
+#[cfg(feature = "avro")]
+impl<R: Read + Unpin> Stream for AvroStream<'_, R> {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ if let Some(remain) = self.remain.as_mut() {
+ if *remain < 1 {
+ return Poll::Ready(None);
+ }
+ }
+
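+ // Enforce the row limit, if any: batches within the remaining budget pass
+ // through unchanged, and the batch that crosses it is sliced down to size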
+ Poll::Ready(match self.reader.next() {
+ Ok(Some(item)) => {
+ if let Some(remain) = self.remain.as_mut() {
+ if *remain >= item.num_rows() {
+ *remain -= item.num_rows();
+ Some(Ok(item))
+ } else {
+ let len = *remain;
+ *remain = 0;
+ Some(Ok(RecordBatch::try_new(
+ item.schema(),
+ item.columns()
+ .iter()
+ .map(|column| column.slice(0, len))
+ .collect(),
+ )?))
+ }
+ } else {
+ Some(Ok(item))
+ }
+ }
+ Ok(None) => None,
+ Err(err) => Some(Err(err)),
+ })
+ }
+}
+
+#[cfg(feature = "avro")]
+impl<R: Read + Unpin> RecordBatchStream for AvroStream<'_, R> {
+ fn schema(&self) -> SchemaRef {
+ self.reader.schema()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ #[cfg(feature = "avro")]
+ async fn test() -> Result<()> {
+ use futures::StreamExt;
+
+ let testdata = crate::test_util::arrow_test_data();
+ let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+ let avro_exec = AvroExec::try_from_path(
+ &filename,
+ AvroReadOptions::default(),
+ Some(vec![0, 1, 2]),
+ 1024,
+ None,
+ )?;
+ assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
+
+ let mut results = avro_exec.execute(0).await?;
+ let batch = results.next().await.unwrap()?;
+
+ assert_eq!(8, batch.num_rows());
+ assert_eq!(3, batch.num_columns());
+
+ let schema = batch.schema();
+ let field_names: Vec<&str> =
+ schema.fields().iter().map(|f| f.name().as_str()).collect();
+ assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names);
+
+ let batch = results.next().await;
+ assert!(batch.is_none());
+
+ let batch = results.next().await;
+ assert!(batch.is_none());
+
+ let batch = results.next().await;
+ assert!(batch.is_none());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[cfg(not(feature = "avro"))]
+ async fn test() -> Result<()> {
+ let testdata = crate::test_util::arrow_test_data();
+ let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+ let avro_exec = AvroExec::try_from_path(
+ &filename,
+ AvroReadOptions::default(),
+ Some(vec![0, 1, 2]),
+ 1024,
+ None,
+ );
+ assert!(matches!(
+ avro_exec,
+ Err(DataFusionError::NotImplemented(msg))
+ if msg == *"cannot read avro schema without the 'avro' feature enabled"
+ ));
+
+ Ok(())
+ }
+}
diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs
index d0b7a07..3be9e72 100644
--- a/datafusion/src/physical_plan/common.rs
+++ b/datafusion/src/physical_plan/common.rs
@@ -108,6 +108,20 @@ pub(crate) fn combine_batches(
}
/// Recursively builds a list of files in a directory with a given extension
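+/// Returns a `DataFusionError::Plan` error when no matching files are found.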
+pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
+ let mut filenames: Vec<String> = Vec::new();
+ build_file_list_recurse(dir, &mut filenames, ext)?;
+ if filenames.is_empty() {
+ return Err(DataFusionError::Plan(format!(
+ "No files found at {path} with file extension {file_extension}",
+ path = dir,
+ file_extension = ext
+ )));
+ }
+ Ok(filenames)
+}
+
+/// Recursively builds a list of files in a directory with a given extension
pub fn build_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
let mut filenames: Vec<String> = Vec::new();
build_file_list_recurse(dir, &mut filenames, ext)?;
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs
index 39ae70d..a776c42f 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -236,14 +236,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();
Ok(match array {
- ColumnarValue::Scalar(scalar) => {
- if let ScalarValue::TimestampNanosecond(v) = scalar {
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?))
- } else {
- return Err(DataFusionError::Execution(
- "array of `date_trunc` must be non-null scalar Utf8".to_string(),
- ));
- }
+ ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v)) => {
+ ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?))
}
ColumnarValue::Array(array) => {
let array = array
@@ -257,6 +251,11 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
ColumnarValue::Array(Arc::new(array))
}
+ _ => {
+ return Err(DataFusionError::Execution(
+ "array of `date_trunc` must be non-null scalar Utf8".to_string(),
+ ));
+ }
})
}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index af86887..3701e90 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -603,6 +603,7 @@ pub trait Accumulator: Send + Sync + Debug {
pub mod aggregates;
pub mod analyze;
pub mod array_expressions;
+pub mod avro;
pub mod coalesce_batches;
pub mod coalesce_partitions;
pub mod common;
diff --git a/datafusion/src/physical_plan/source.rs b/datafusion/src/physical_plan/source.rs
index 012405a..32fa9c3 100644
--- a/datafusion/src/physical_plan/source.rs
+++ b/datafusion/src/physical_plan/source.rs
@@ -46,7 +46,7 @@ impl<R> std::fmt::Debug for Source<R> {
Ok(())
}
}
-impl std::fmt::Display for Source {
+impl<R> std::fmt::Display for Source<R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Source::PartitionedFiles { path, filenames } => {
diff --git a/datafusion/src/physical_plan/string_expressions.rs b/datafusion/src/physical_plan/string_expressions.rs
index 09e19c4..7cbebce 100644
--- a/datafusion/src/physical_plan/string_expressions.rs
+++ b/datafusion/src/physical_plan/string_expressions.rs
@@ -290,6 +290,7 @@ pub fn concat(args: &[ColumnarValue]) -> Result<ColumnarValue> {
.map(|index| {
let mut owned_string: String = "".to_owned();
for arg in args {
+ #[allow(clippy::collapsible_match)]
match arg {
ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) => {
if let Some(value) = maybe_value {
diff --git a/datafusion/src/sql/parser.rs b/datafusion/src/sql/parser.rs
index bb2f9e6..864801c 100644
--- a/datafusion/src/sql/parser.rs
+++ b/datafusion/src/sql/parser.rs
@@ -43,6 +43,8 @@ pub enum FileType {
Parquet,
/// Comma separated values
CSV,
+ /// Avro binary records
+ Avro,
}
impl FromStr for FileType {
@@ -53,8 +55,9 @@ impl FromStr for FileType {
"PARQUET" => Ok(Self::Parquet),
"NDJSON" => Ok(Self::NdJson),
"CSV" => Ok(Self::CSV),
+ "AVRO" => Ok(Self::Avro),
other => Err(ParserError::ParserError(format!(
- "expect one of PARQUET, NDJSON, or CSV, found: {}",
+ "expect one of PARQUET, AVRO, NDJSON, or CSV, found: {}",
other
))),
}
@@ -390,10 +393,21 @@ mod tests {
});
expect_parse_ok(sql, expected)?;
+ // positive case: it is ok for avro files not to have columns specified
+ let sql = "CREATE EXTERNAL TABLE t STORED AS AVRO LOCATION 'foo.avro'";
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![],
+ file_type: FileType::Avro,
+ has_header: false,
+ location: "foo.avro".into(),
+ });
+ expect_parse_ok(sql, expected)?;
+
// Error cases: Invalid type
let sql =
"CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'";
- expect_parse_error(sql, "expect one of PARQUET, NDJSON, or CSV");
+ expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV");
Ok(())
}
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index e613ff3..50c36dd 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -212,6 +212,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
FileType::NdJson => {}
+ FileType::Avro => {}
};
let schema = self.build_schema(columns)?;
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 90173be..8916339 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -27,20 +27,16 @@ use chrono::Duration;
extern crate arrow;
extern crate datafusion;
-use arrow::{array::*, datatypes::TimeUnit};
-use arrow::{datatypes::Int32Type, datatypes::Int64Type, record_batch::RecordBatch};
use arrow::{
- datatypes::{
- ArrowNativeType, ArrowPrimitiveType, ArrowTimestampType, DataType, Field, Schema,
- SchemaRef, TimestampMicrosecondType, TimestampMillisecondType,
- TimestampNanosecondType, TimestampSecondType,
- },
+ array::*, datatypes::*, record_batch::RecordBatch,
util::display::array_value_to_string,
};
use datafusion::assert_batches_eq;
use datafusion::assert_batches_sorted_eq;
use datafusion::logical_plan::LogicalPlan;
+#[cfg(feature = "avro")]
+use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::ExecutionPlanVisitor;
@@ -2960,6 +2956,17 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
.unwrap();
}
+#[cfg(feature = "avro")]
+fn register_alltypes_avro(ctx: &mut ExecutionContext) {
+ let testdata = datafusion::test_util::arrow_test_data();
+ ctx.register_avro(
+ "alltypes_plain",
+ &format!("{}/avro/alltypes_plain.avro", testdata),
+ AvroReadOptions::default(),
+ )
+ .unwrap();
+}
+
/// Execute query and return result set as 2-d table of Vecs
/// `result[row][column]`
async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> Vec<RecordBatch> {
@@ -4685,3 +4692,137 @@ async fn test_regexp_is_match() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query() {
+ let mut ctx = ExecutionContext::new();
+ register_alltypes_avro(&mut ctx);
+ // NOTE: string_col is actually a binary column and does not have the UTF8 logical type,
+ // so an explicit cast is needed to read it as a string
+ let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----+-----------------------------------------+",
+ "| id | CAST(alltypes_plain.string_col AS Utf8) |",
+ "+----+-----------------------------------------+",
+ "| 4 | 0 |",
+ "| 5 | 1 |",
+ "| 6 | 0 |",
+ "| 7 | 1 |",
+ "| 2 | 0 |",
+ "| 3 | 1 |",
+ "| 0 | 0 |",
+ "| 1 | 1 |",
+ "+----+-----------------------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_query_multiple_files() {
+ let tempdir = tempfile::tempdir().unwrap();
+ let table_path = tempdir.path();
+ let testdata = datafusion::test_util::arrow_test_data();
+ let alltypes_plain_file = format!("{}/avro/alltypes_plain.avro", testdata);
+ std::fs::copy(
+ &alltypes_plain_file,
+ format!("{}/alltypes_plain1.avro", table_path.display()),
+ )
+ .unwrap();
+ std::fs::copy(
+ &alltypes_plain_file,
+ format!("{}/alltypes_plain2.avro", table_path.display()),
+ )
+ .unwrap();
+
+ let mut ctx = ExecutionContext::new();
+ ctx.register_avro(
+ "alltypes_plain",
+ table_path.display().to_string().as_str(),
+ AvroReadOptions::default(),
+ )
+ .unwrap();
+ // NOTE: string_col is actually a binary column and does not have the UTF8 logical type,
+ // so an explicit cast is needed to read it as a string
+ let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----+-----------------------------------------+",
+ "| id | CAST(alltypes_plain.string_col AS Utf8) |",
+ "+----+-----------------------------------------+",
+ "| 4 | 0 |",
+ "| 5 | 1 |",
+ "| 6 | 0 |",
+ "| 7 | 1 |",
+ "| 2 | 0 |",
+ "| 3 | 1 |",
+ "| 0 | 0 |",
+ "| 1 | 1 |",
+ "| 4 | 0 |",
+ "| 5 | 1 |",
+ "| 6 | 0 |",
+ "| 7 | 1 |",
+ "| 2 | 0 |",
+ "| 3 | 1 |",
+ "| 0 | 0 |",
+ "| 1 | 1 |",
+ "+----+-----------------------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &actual);
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_single_nan_schema() {
+ let mut ctx = ExecutionContext::new();
+ let testdata = datafusion::test_util::arrow_test_data();
+ ctx.register_avro(
+ "single_nan",
+ &format!("{}/avro/single_nan.avro", testdata),
+ AvroReadOptions::default(),
+ )
+ .unwrap();
+ let sql = "SELECT mycol FROM single_nan";
+ let plan = ctx.create_logical_plan(sql).unwrap();
+ let plan = ctx.optimize(&plan).unwrap();
+ let plan = ctx.create_physical_plan(&plan).unwrap();
+ let results = collect(plan).await.unwrap();
+ for batch in results {
+ assert_eq!(1, batch.num_rows());
+ assert_eq!(1, batch.num_columns());
+ }
+}
+
+#[cfg(feature = "avro")]
+#[tokio::test]
+async fn avro_explain() {
+ let mut ctx = ExecutionContext::new();
+ register_alltypes_avro(&mut ctx);
+
+ let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
+ let actual = execute(&mut ctx, sql).await;
+ let actual = normalize_vec_for_explain(actual);
+ let expected = vec![
+ vec![
+ "logical_plan",
+ "Projection: #COUNT(UInt8(1))\
+ \n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
+ \n TableScan: alltypes_plain projection=Some([0])",
+ ],
+ vec![
+ "physical_plan",
+ "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
+ \n HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
+ \n CoalescePartitionsExec\
+ \n HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
+ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+ \n AvroExec: source=Path(ARROW_TEST_DATA/avro/alltypes_plain.avro: [ARROW_TEST_DATA/avro/alltypes_plain.avro]), batch_size=8192, limit=None\
+ \n",
+ ],
+ ];
+ assert_eq!(expected, actual);
+}
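
Outside the test harness, the API these tests exercise can be driven directly. A minimal sketch, assuming the avro feature is enabled, the synchronous ExecutionContext::sql of this release, and a placeholder path:

use datafusion::error::Result;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // Schema is inferred from the file, as in register_alltypes_avro above;
    // "/path/to/table.avro" is a placeholder.
    ctx.register_avro("t", "/path/to/table.avro", AvroReadOptions::default())?;
    let df = ctx.sql("SELECT id FROM t")?;
    let results = df.collect().await?;
    println!("{} batches", results.len());
    Ok(())
}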
diff --git a/testing b/testing
index b658b08..a8f7be3 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit b658b087767b041b2081766814655b4dd5a9a439
+Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20