Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/16 21:36:54 UTC
[arrow-datafusion] branch main updated: Add support for reading Arrow files (#6337)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8a47c42096 Add support for reading Arrow files (#6337)
8a47c42096 is described below
commit 8a47c42096311cf9b6191cfb9d96e2d9ba3a630d
Author: Jon Mease <jo...@gmail.com>
AuthorDate: Tue May 16 17:36:48 2023 -0400
Add support for reading Arrow files (#6337)
* Add support for reading Arrow files
* Remove commented test
* Fix warnings
* license
* fmt
* Module docstring
* clippy
* Support registering arrow files in SQL
* Add note that arrow files follow the arrow IPC format
* sqllogic test to register and query arrow table
---
.../core/src/datasource/file_format/arrow.rs | 96 ++++++++++++
.../core/src/datasource/file_format/file_type.rs | 7 +-
datafusion/core/src/datasource/file_format/mod.rs | 1 +
.../core/src/datasource/file_format/options.rs | 69 +++++++++
datafusion/core/src/datasource/listing/table.rs | 5 +-
.../core/src/datasource/listing_table_factory.rs | 2 +
datafusion/core/src/execution/context.rs | 37 +++++
.../src/physical_plan/file_format/arrow_file.rs | 166 +++++++++++++++++++++
.../core/src/physical_plan/file_format/mod.rs | 2 +
datafusion/core/tests/data/example.arrow | Bin 0 -> 1842 bytes
datafusion/core/tests/sql/arrow_files.rs | 70 +++++++++
datafusion/core/tests/sql/mod.rs | 1 +
.../core/tests/sqllogictests/test_files/ddl.slt | 12 ++
13 files changed, 465 insertions(+), 3 deletions(-)
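For orientation, the user-facing API added by this commit can be used
roughly as follows (a minimal sketch; `tests/data/example.arrow` and the
column names f0/f1 come from the test data added below):

    use datafusion::error::Result;
    use datafusion::execution::options::ArrowReadOptions;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();

        // Read a single Arrow IPC file into a DataFrame.
        let df = ctx
            .read_arrow("tests/data/example.arrow", ArrowReadOptions::default())
            .await?;
        df.show().await?;

        // Or register it as a named table and query it with SQL.
        ctx.register_arrow(
            "arrow_simple",
            "tests/data/example.arrow",
            ArrowReadOptions::default(),
        )
        .await?;
        ctx.sql("SELECT f0, f1 FROM arrow_simple").await?.show().await?;
        Ok(())
    }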
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
new file mode 100644
index 0000000000..2a27468f45
--- /dev/null
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Apache Arrow format abstractions
+//!
+//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
+
+use crate::datasource::file_format::FileFormat;
+use crate::error::Result;
+use crate::execution::context::SessionState;
+use crate::physical_plan::file_format::{ArrowExec, FileScanConfig};
+use crate::physical_plan::ExecutionPlan;
+use arrow::ipc::reader::FileReader;
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion_common::Statistics;
+use datafusion_physical_expr::PhysicalExpr;
+use object_store::{GetResult, ObjectMeta, ObjectStore};
+use std::any::Any;
+use std::io::{Read, Seek};
+use std::sync::Arc;
+
+/// The default file extension of arrow files
+pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
+/// Arrow `FileFormat` implementation.
+#[derive(Default, Debug)]
+pub struct ArrowFormat;
+
+#[async_trait]
+impl FileFormat for ArrowFormat {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ async fn infer_schema(
+ &self,
+ _state: &SessionState,
+ store: &Arc<dyn ObjectStore>,
+ objects: &[ObjectMeta],
+ ) -> Result<SchemaRef> {
+ let mut schemas = vec![];
+ for object in objects {
+ let schema = match store.get(&object.location).await? {
+ GetResult::File(mut file, _) => read_arrow_schema_from_reader(&mut file)?,
+ r @ GetResult::Stream(_) => {
+ // TODO: Fetching entire file to get schema is potentially wasteful
+ let data = r.bytes().await?;
+ let mut cursor = std::io::Cursor::new(&data);
+ read_arrow_schema_from_reader(&mut cursor)?
+ }
+ };
+ schemas.push(schema.as_ref().clone());
+ }
+ let merged_schema = Schema::try_merge(schemas)?;
+ Ok(Arc::new(merged_schema))
+ }
+
+ async fn infer_stats(
+ &self,
+ _state: &SessionState,
+ _store: &Arc<dyn ObjectStore>,
+ _table_schema: SchemaRef,
+ _object: &ObjectMeta,
+ ) -> Result<Statistics> {
+ Ok(Statistics::default())
+ }
+
+ async fn create_physical_plan(
+ &self,
+ _state: &SessionState,
+ conf: FileScanConfig,
+ _filters: Option<&Arc<dyn PhysicalExpr>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let exec = ArrowExec::new(conf);
+ Ok(Arc::new(exec))
+ }
+}
+
+fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
+ let reader = FileReader::try_new(reader, None)?;
+ Ok(reader.schema())
+}
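The `infer_schema` implementation above reads the IPC footer of each
matching file and unions the per-file schemas with `Schema::try_merge`.
A small self-contained illustration of that merge behavior (field names
here are hypothetical):

    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let s1 = Schema::new(vec![Field::new("f0", DataType::Int64, true)]);
        let s2 = Schema::new(vec![
            Field::new("f0", DataType::Int64, true),
            Field::new("f1", DataType::Utf8, true),
        ]);
        // Fields are unioned by name; two incompatible definitions of
        // the same field would make try_merge return an error instead.
        let merged = Schema::try_merge(vec![s1, s2]).unwrap();
        assert_eq!(merged.fields().len(), 2);
    }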
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index e72b1e579e..58877c7a82 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -38,6 +38,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;
+use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION;
use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
@@ -211,6 +212,8 @@ impl FileCompressionType {
/// Readable file type
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileType {
+ /// Apache Arrow file
+ ARROW,
/// Apache Avro file
AVRO,
/// Apache Parquet file
@@ -224,6 +227,7 @@ pub enum FileType {
impl GetExt for FileType {
fn get_ext(&self) -> String {
match self {
+ FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
@@ -238,6 +242,7 @@ impl FromStr for FileType {
fn from_str(s: &str) -> Result<Self> {
let s = s.to_uppercase();
match s.as_str() {
+ "ARROW" => Ok(FileType::ARROW),
"AVRO" => Ok(FileType::AVRO),
"PARQUET" => Ok(FileType::PARQUET),
"CSV" => Ok(FileType::CSV),
@@ -256,7 +261,7 @@ impl FileType {
match self {
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
- FileType::PARQUET | FileType::AVRO => match c.variant {
+ FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
UNCOMPRESSED => Ok(ext),
_ => Err(DataFusionError::Internal(
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 52da7285e3..28f798ade4 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -20,6 +20,7 @@
/// Default max records to scan to infer the schema
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
+pub mod arrow;
pub mod avro;
pub mod csv;
pub mod file_type;
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index e51edf829e..4024da3313 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -23,6 +23,7 @@ use arrow::datatypes::{DataType, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
+use crate::datasource::file_format::arrow::{ArrowFormat, DEFAULT_ARROW_EXTENSION};
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use crate::datasource::file_format::file_type::FileCompressionType;
@@ -214,6 +215,52 @@ impl<'a> ParquetReadOptions<'a> {
}
}
+/// Options that control the reading of ARROW files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that can
+/// vary from statement to statement, see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
+#[derive(Clone)]
+pub struct ArrowReadOptions<'a> {
+ /// The data source schema.
+ pub schema: Option<&'a Schema>,
+
+ /// File extension; only files with this extension are selected for data input.
+ /// Defaults to `FileType::ARROW.get_ext().as_str()`.
+ pub file_extension: &'a str,
+
+ /// Partition Columns
+ pub table_partition_cols: Vec<(String, DataType)>,
+}
+
+impl<'a> Default for ArrowReadOptions<'a> {
+ fn default() -> Self {
+ Self {
+ schema: None,
+ file_extension: DEFAULT_ARROW_EXTENSION,
+ table_partition_cols: vec![],
+ }
+ }
+}
+
+impl<'a> ArrowReadOptions<'a> {
+ /// Specify table_partition_cols for partition pruning
+ pub fn table_partition_cols(
+ mut self,
+ table_partition_cols: Vec<(String, DataType)>,
+ ) -> Self {
+ self.table_partition_cols = table_partition_cols;
+ self
+ }
+
+ /// Specify schema to use for Arrow file read
+ pub fn schema(mut self, schema: &'a Schema) -> Self {
+ self.schema = Some(schema);
+ self
+ }
+}
+
/// Options that control the reading of AVRO files.
///
/// Note this structure is supplied when a datasource is created and
@@ -484,3 +531,25 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
.await
}
}
+
+#[async_trait]
+impl ReadOptions<'_> for ArrowReadOptions<'_> {
+ fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {
+ let file_format = ArrowFormat::default();
+
+ ListingOptions::new(Arc::new(file_format))
+ .with_file_extension(self.file_extension)
+ .with_target_partitions(config.target_partitions())
+ .with_table_partition_cols(self.table_partition_cols.clone())
+ }
+
+ async fn get_resolved_schema(
+ &self,
+ config: &SessionConfig,
+ state: SessionState,
+ table_path: ListingTableUrl,
+ ) -> Result<SchemaRef> {
+ self._get_resolved_schema(config, state, table_path, self.schema, false)
+ .await
+ }
+}
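Like the existing `CsvReadOptions` and `AvroReadOptions`, the new
struct is builder-style. A sketch of configuring it (the partition
column is hypothetical):

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::execution::options::ArrowReadOptions;

    fn main() {
        // Supplying a schema skips inference; "date" stands in for a
        // hypothetical hive-style partition column in the file paths.
        let schema = Schema::new(vec![Field::new("f0", DataType::Int64, true)]);
        let _options = ArrowReadOptions::default()
            .schema(&schema)
            .table_partition_cols(vec![("date".to_string(), DataType::Utf8)]);
    }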
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 94da84ce3b..8bb3818c91 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -35,8 +35,8 @@ use object_store::ObjectMeta;
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::{
file_format::{
- avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
- FileFormat,
+ arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
+ parquet::ParquetFormat, FileFormat,
},
get_statistics_with_limit,
listing::ListingTableUrl,
@@ -135,6 +135,7 @@ impl ListingTableConfig {
.map_err(|_| DataFusionError::Internal(err_msg))?;
let file_format: Arc<dyn FileFormat> = match file_type {
+ FileType::ARROW => Arc::new(ArrowFormat::default()),
FileType::AVRO => Arc::new(AvroFormat::default()),
FileType::CSV => Arc::new(
CsvFormat::default().with_file_compression_type(file_compression_type),
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 44595e5122..1e2d4051d5 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -27,6 +27,7 @@ use datafusion_common::DataFusionError;
use datafusion_expr::CreateExternalTable;
use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::avro::AvroFormat;
use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
@@ -81,6 +82,7 @@ impl TableProviderFactory for ListingTableFactory {
FileType::JSON => Arc::new(
JsonFormat::default().with_file_compression_type(file_compression_type),
),
+ FileType::ARROW => Arc::new(ArrowFormat::default()),
};
let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 5d5195e2c4..ee31d9ead5 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -114,6 +114,7 @@ use datafusion_sql::planner::object_name_to_table_reference;
use uuid::Uuid;
// backwards compatibility
+use crate::execution::options::ArrowReadOptions;
use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
@@ -844,6 +845,20 @@ impl SessionContext {
self._read_type(table_paths, options).await
}
+ /// Creates a [`DataFrame`] for reading an Arrow data source.
+ ///
+ /// For more control such as reading multiple files, you can use
+ /// [`read_table`](Self::read_table) with a [`ListingTable`].
+ ///
+ /// For an example, see [`read_csv`](Self::read_csv)
+ pub async fn read_arrow<P: DataFilePaths>(
+ &self,
+ table_paths: P,
+ options: ArrowReadOptions<'_>,
+ ) -> Result<DataFrame> {
+ self._read_type(table_paths, options).await
+ }
+
/// Creates an empty DataFrame.
pub fn read_empty(&self) -> Result<DataFrame> {
Ok(DataFrame::new(
@@ -1034,6 +1049,27 @@ impl SessionContext {
Ok(())
}
+ /// Registers an Arrow file as a table that can be referenced from
+ /// SQL statements executed against this context.
+ pub async fn register_arrow(
+ &self,
+ name: &str,
+ table_path: &str,
+ options: ArrowReadOptions<'_>,
+ ) -> Result<()> {
+ let listing_options = options.to_listing_options(&self.copied_config());
+
+ self.register_listing_table(
+ name,
+ table_path,
+ listing_options,
+ options.schema.map(|s| Arc::new(s.to_owned())),
+ None,
+ )
+ .await?;
+ Ok(())
+ }
+
/// Registers a named catalog using a custom `CatalogProvider` so that
/// it can be referenced from SQL statements executed against this
/// context.
@@ -1360,6 +1396,7 @@ impl SessionState {
table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new()));
table_factories.insert("NDJSON".into(), Arc::new(ListingTableFactory::new()));
table_factories.insert("AVRO".into(), Arc::new(ListingTableFactory::new()));
+ table_factories.insert("ARROW".into(), Arc::new(ListingTableFactory::new()));
if config.create_default_catalog_and_schema() {
let default_catalog = MemoryCatalogProvider::new();
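With the new "ARROW" entry in `table_factories`, the same table can also
be created from SQL DDL (a sketch mirroring the sqllogictest added
below):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.sql(
            "CREATE EXTERNAL TABLE arrow_simple \
             STORED AS ARROW \
             LOCATION 'tests/data/example.arrow'",
        )
        .await?;
        ctx.sql("SELECT * FROM arrow_simple").await?.show().await?;
        Ok(())
    }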
diff --git a/datafusion/core/src/physical_plan/file_format/arrow_file.rs b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
new file mode 100644
index 0000000000..d229031d37
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading Arrow files
+use crate::error::Result;
+use crate::physical_plan::file_format::{
+ FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
+};
+use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use crate::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
+use arrow_schema::SchemaRef;
+use datafusion_common::Statistics;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::StreamExt;
+use object_store::{GetResult, ObjectStore};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Execution plan for scanning Arrow data source
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+pub struct ArrowExec {
+ base_config: FileScanConfig,
+ projected_statistics: Statistics,
+ projected_schema: SchemaRef,
+ projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
+}
+
+impl ArrowExec {
+ /// Create a new Arrow reader execution plan provided base configurations
+ pub fn new(base_config: FileScanConfig) -> Self {
+ let (projected_schema, projected_statistics, projected_output_ordering) =
+ base_config.project();
+
+ Self {
+ base_config,
+ projected_schema,
+ projected_statistics,
+ projected_output_ordering,
+ metrics: ExecutionPlanMetricsSet::new(),
+ }
+ }
+ /// Reference to the base configuration
+ pub fn base_config(&self) -> &FileScanConfig {
+ &self.base_config
+ }
+}
+
+impl ExecutionPlan for ArrowExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.projected_schema.clone()
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+ }
+
+ fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+ Ok(self.base_config().infinite_source)
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ self.projected_output_ordering.as_deref()
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ Vec::new()
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(self)
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ use super::file_stream::FileStream;
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.base_config.object_store_url)?;
+
+ let opener = ArrowOpener {
+ object_store,
+ projection: self.base_config.projection.clone(),
+ };
+ let stream =
+ FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
+ Ok(Box::pin(stream))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "ArrowExec: {}", self.base_config)
+ }
+ }
+ }
+
+ fn statistics(&self) -> Statistics {
+ self.projected_statistics.clone()
+ }
+}
+
+pub struct ArrowOpener {
+ pub object_store: Arc<dyn ObjectStore>,
+ pub projection: Option<Vec<usize>>,
+}
+
+impl FileOpener for ArrowOpener {
+ fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+ let object_store = self.object_store.clone();
+ let projection = self.projection.clone();
+ Ok(Box::pin(async move {
+ match object_store.get(file_meta.location()).await? {
+ GetResult::File(file, _) => {
+ let arrow_reader =
+ arrow::ipc::reader::FileReader::try_new(file, projection)?;
+ Ok(futures::stream::iter(arrow_reader).boxed())
+ }
+ r @ GetResult::Stream(_) => {
+ let bytes = r.bytes().await?;
+ let cursor = std::io::Cursor::new(bytes);
+ let arrow_reader =
+ arrow::ipc::reader::FileReader::try_new(cursor, projection)?;
+ Ok(futures::stream::iter(arrow_reader).boxed())
+ }
+ }
+ }))
+ }
+}
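The `projection` handed to `FileReader::try_new` in `ArrowOpener` is how
column pruning reaches the IPC reader: only the requested columns are
decoded. A standalone sketch against the example file (column indices
assume the f0/f1/f2 layout of the test data):

    use arrow::ipc::reader::FileReader;
    use std::fs::File;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open("tests/data/example.arrow")?;
        // Decode only columns 0 and 2 (f0 and f2).
        let reader = FileReader::try_new(file, Some(vec![0, 2]))?;
        for batch in reader {
            assert_eq!(batch?.num_columns(), 2);
        }
        Ok(())
    }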
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index aa0f79ced7..1b235d62a3 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -17,6 +17,7 @@
//! Execution plans that read file formats
+mod arrow_file;
mod avro;
#[cfg(test)]
mod chunked_store;
@@ -35,6 +36,7 @@ use arrow::{
datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
record_batch::RecordBatch,
};
+pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
use datafusion_physical_expr::PhysicalSortExpr;
pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
diff --git a/datafusion/core/tests/data/example.arrow b/datafusion/core/tests/data/example.arrow
new file mode 100644
index 0000000000..5314d9eea1
Binary files /dev/null and b/datafusion/core/tests/data/example.arrow differ
diff --git a/datafusion/core/tests/sql/arrow_files.rs b/datafusion/core/tests/sql/arrow_files.rs
new file mode 100644
index 0000000000..e74294b312
--- /dev/null
+++ b/datafusion/core/tests/sql/arrow_files.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use datafusion::execution::options::ArrowReadOptions;
+
+use super::*;
+
+async fn register_arrow(ctx: &mut SessionContext) {
+ ctx.register_arrow(
+ "arrow_simple",
+ "tests/data/example.arrow",
+ ArrowReadOptions::default(),
+ )
+ .await
+ .unwrap();
+}
+
+#[tokio::test]
+async fn arrow_query() {
+ let mut ctx = SessionContext::new();
+ register_arrow(&mut ctx).await;
+ let sql = "SELECT * FROM arrow_simple";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+----+-----+-------+",
+ "| f0 | f1 | f2 |",
+ "+----+-----+-------+",
+ "| 1 | foo | true |",
+ "| 2 | bar | |",
+ "| 3 | baz | false |",
+ "| 4 | | true |",
+ "+----+-----+-------+",
+ ];
+
+ assert_batches_eq!(expected, &actual);
+}
+
+#[tokio::test]
+async fn arrow_explain() {
+ let mut ctx = SessionContext::new();
+ register_arrow(&mut ctx).await;
+ let sql = "EXPLAIN SELECT * FROM arrow_simple";
+ let actual = execute(&ctx, sql).await;
+ let actual = normalize_vec_for_explain(actual);
+ let expected = vec![
+ vec![
+ "logical_plan",
+ "TableScan: arrow_simple projection=[f0, f1, f2]",
+ ],
+ vec![
+ "physical_plan",
+ "ArrowExec: file_groups={1 group: [[WORKING_DIR/tests/data/example.arrow]]}, projection=[f0, f1, f2]\n",
+ ],
+ ];
+
+ assert_eq!(expected, actual);
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index be7b66adb5..3c5845fe22 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -79,6 +79,7 @@ macro_rules! test_expression {
}
pub mod aggregates;
+pub mod arrow_files;
#[cfg(feature = "avro")]
pub mod avro;
pub mod create_drop;
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index afb009f89f..006641cac5 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -310,6 +310,18 @@ DROP TABLE my_table;
statement ok
DROP TABLE aggregate_simple
+# Arrow format
+statement ok
+CREATE external table arrow_simple STORED as ARROW LOCATION 'tests/data/example.arrow';
+
+query ITB rowsort
+SELECT * FROM arrow_simple order by f1 LIMIT 1
+----
+2 bar NULL
+
+statement ok
+DROP TABLE arrow_simple
+
# create_table_with_schema_as_select_mismatch
statement error table 'datafusion.public.aggregate_simple' not found
CREATE TABLE my_table(c1 float, c2 double, c3 boolean, c4 varchar) AS SELECT * FROM aggregate_simple;