You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/16 21:36:54 UTC

[arrow-datafusion] branch main updated: Add support for reading Arrow files (#6337)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a47c42096 Add support for reading Arrow files (#6337)
8a47c42096 is described below

commit 8a47c42096311cf9b6191cfb9d96e2d9ba3a630d
Author: Jon Mease <jo...@gmail.com>
AuthorDate: Tue May 16 17:36:48 2023 -0400

    Add support for reading Arrow files (#6337)
    
    * Add support for reading Arrow files
    
    * Remove commented test
    
    * Fix warnings
    
    * license
    
    * fmt
    
    * Module docstring
    
    * clippy
    
    * Support registering arrow files in SQL
    
    * Add note that arrow files follow the arrow IPC format
    
    * sqllogic test to register and query arrow table
---
 .../core/src/datasource/file_format/arrow.rs       |  96 ++++++++++++
 .../core/src/datasource/file_format/file_type.rs   |   7 +-
 datafusion/core/src/datasource/file_format/mod.rs  |   1 +
 .../core/src/datasource/file_format/options.rs     |  69 +++++++++
 datafusion/core/src/datasource/listing/table.rs    |   5 +-
 .../core/src/datasource/listing_table_factory.rs   |   2 +
 datafusion/core/src/execution/context.rs           |  37 +++++
 .../src/physical_plan/file_format/arrow_file.rs    | 166 +++++++++++++++++++++
 .../core/src/physical_plan/file_format/mod.rs      |   2 +
 datafusion/core/tests/data/example.arrow           | Bin 0 -> 1842 bytes
 datafusion/core/tests/sql/arrow_files.rs           |  70 +++++++++
 datafusion/core/tests/sql/mod.rs                   |   1 +
 .../core/tests/sqllogictests/test_files/ddl.slt    |  12 ++
 13 files changed, 465 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
new file mode 100644
index 0000000000..2a27468f45
--- /dev/null
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Apache Arrow format abstractions
+//!
+//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
+
+use crate::datasource::file_format::FileFormat;
+use crate::error::Result;
+use crate::execution::context::SessionState;
+use crate::physical_plan::file_format::{ArrowExec, FileScanConfig};
+use crate::physical_plan::ExecutionPlan;
+use arrow::ipc::reader::FileReader;
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion_common::Statistics;
+use datafusion_physical_expr::PhysicalExpr;
+use object_store::{GetResult, ObjectMeta, ObjectStore};
+use std::any::Any;
+use std::io::{Read, Seek};
+use std::sync::Arc;
+
+/// The default file extension of arrow files
+pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
+/// Arrow `FileFormat` implementation.
+#[derive(Default, Debug)]
+pub struct ArrowFormat;
+
+#[async_trait]
+impl FileFormat for ArrowFormat {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    async fn infer_schema(
+        &self,
+        _state: &SessionState,
+        store: &Arc<dyn ObjectStore>,
+        objects: &[ObjectMeta],
+    ) -> Result<SchemaRef> {
+        let mut schemas = vec![];
+        for object in objects {
+            let schema = match store.get(&object.location).await? {
+                GetResult::File(mut file, _) => read_arrow_schema_from_reader(&mut file)?,
+                r @ GetResult::Stream(_) => {
+                    // TODO: Fetching entire file to get schema is potentially wasteful
+                    let data = r.bytes().await?;
+                    let mut cursor = std::io::Cursor::new(&data);
+                    read_arrow_schema_from_reader(&mut cursor)?
+                }
+            };
+            schemas.push(schema.as_ref().clone());
+        }
+        let merged_schema = Schema::try_merge(schemas)?;
+        Ok(Arc::new(merged_schema))
+    }
+
+    async fn infer_stats(
+        &self,
+        _state: &SessionState,
+        _store: &Arc<dyn ObjectStore>,
+        _table_schema: SchemaRef,
+        _object: &ObjectMeta,
+    ) -> Result<Statistics> {
+        Ok(Statistics::default())
+    }
+
+    async fn create_physical_plan(
+        &self,
+        _state: &SessionState,
+        conf: FileScanConfig,
+        _filters: Option<&Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let exec = ArrowExec::new(conf);
+        Ok(Arc::new(exec))
+    }
+}
+
+fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
+    let reader = FileReader::try_new(reader, None)?;
+    Ok(reader.schema())
+}
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index e72b1e579e..58877c7a82 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -38,6 +38,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
 #[cfg(feature = "compression")]
 use flate2::read::MultiGzDecoder;
 
+use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 #[cfg(feature = "compression")]
@@ -211,6 +212,8 @@ impl FileCompressionType {
 /// Readable file type
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum FileType {
+    /// Apache Arrow file
+    ARROW,
     /// Apache Avro file
     AVRO,
     /// Apache Parquet file
@@ -224,6 +227,7 @@ pub enum FileType {
 impl GetExt for FileType {
     fn get_ext(&self) -> String {
         match self {
+            FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
             FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
             FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
             FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
@@ -238,6 +242,7 @@ impl FromStr for FileType {
     fn from_str(s: &str) -> Result<Self> {
         let s = s.to_uppercase();
         match s.as_str() {
+            "ARROW" => Ok(FileType::ARROW),
             "AVRO" => Ok(FileType::AVRO),
             "PARQUET" => Ok(FileType::PARQUET),
             "CSV" => Ok(FileType::CSV),
@@ -256,7 +261,7 @@ impl FileType {
 
         match self {
             FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
-            FileType::PARQUET | FileType::AVRO => match c.variant {
+            FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
                 UNCOMPRESSED => Ok(ext),
                 _ => Err(DataFusionError::Internal(
                     "FileCompressionType can be specified for CSV/JSON FileType.".into(),
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 52da7285e3..28f798ade4 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -20,6 +20,7 @@
 /// Default max records to scan to infer the schema
 pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
 
+pub mod arrow;
 pub mod avro;
 pub mod csv;
 pub mod file_type;
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index e51edf829e..4024da3313 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -23,6 +23,7 @@ use arrow::datatypes::{DataType, Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion_common::DataFusionError;
 
+use crate::datasource::file_format::arrow::{ArrowFormat, DEFAULT_ARROW_EXTENSION};
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use crate::datasource::file_format::file_type::FileCompressionType;
@@ -214,6 +215,52 @@ impl<'a> ParquetReadOptions<'a> {
     }
 }
 
+/// Options that control the reading of ARROW files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// can not not vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
+#[derive(Clone)]
+pub struct ArrowReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<&'a Schema>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to `FileType::ARROW.get_ext().as_str()`.
+    pub file_extension: &'a str,
+
+    /// Partition Columns
+    pub table_partition_cols: Vec<(String, DataType)>,
+}
+
+impl<'a> Default for ArrowReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: DEFAULT_ARROW_EXTENSION,
+            table_partition_cols: vec![],
+        }
+    }
+}
+
+impl<'a> ArrowReadOptions<'a> {
+    /// Specify table_partition_cols for partition pruning
+    pub fn table_partition_cols(
+        mut self,
+        table_partition_cols: Vec<(String, DataType)>,
+    ) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Specify schema to use for AVRO read
+    pub fn schema(mut self, schema: &'a Schema) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+}
+
 /// Options that control the reading of AVRO files.
 ///
 /// Note this structure is supplied when a datasource is created and
@@ -484,3 +531,25 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
             .await
     }
 }
+
+#[async_trait]
+impl ReadOptions<'_> for ArrowReadOptions<'_> {
+    fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {
+        let file_format = ArrowFormat::default();
+
+        ListingOptions::new(Arc::new(file_format))
+            .with_file_extension(self.file_extension)
+            .with_target_partitions(config.target_partitions())
+            .with_table_partition_cols(self.table_partition_cols.clone())
+    }
+
+    async fn get_resolved_schema(
+        &self,
+        config: &SessionConfig,
+        state: SessionState,
+        table_path: ListingTableUrl,
+    ) -> Result<SchemaRef> {
+        self._get_resolved_schema(config, state, table_path, self.schema, false)
+            .await
+    }
+}
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 94da84ce3b..8bb3818c91 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -35,8 +35,8 @@ use object_store::ObjectMeta;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::{
     file_format::{
-        avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
-        FileFormat,
+        arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
+        parquet::ParquetFormat, FileFormat,
     },
     get_statistics_with_limit,
     listing::ListingTableUrl,
@@ -135,6 +135,7 @@ impl ListingTableConfig {
             .map_err(|_| DataFusionError::Internal(err_msg))?;
 
         let file_format: Arc<dyn FileFormat> = match file_type {
+            FileType::ARROW => Arc::new(ArrowFormat::default()),
             FileType::AVRO => Arc::new(AvroFormat::default()),
             FileType::CSV => Arc::new(
                 CsvFormat::default().with_file_compression_type(file_compression_type),
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 44595e5122..1e2d4051d5 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -27,6 +27,7 @@ use datafusion_common::DataFusionError;
 use datafusion_expr::CreateExternalTable;
 
 use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::file_format::arrow::ArrowFormat;
 use crate::datasource::file_format::avro::AvroFormat;
 use crate::datasource::file_format::csv::CsvFormat;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
@@ -81,6 +82,7 @@ impl TableProviderFactory for ListingTableFactory {
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
             ),
+            FileType::ARROW => Arc::new(ArrowFormat::default()),
         };
 
         let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 5d5195e2c4..ee31d9ead5 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -114,6 +114,7 @@ use datafusion_sql::planner::object_name_to_table_reference;
 use uuid::Uuid;
 
 // backwards compatibility
+use crate::execution::options::ArrowReadOptions;
 use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
 pub use datafusion_execution::config::SessionConfig;
 pub use datafusion_execution::TaskContext;
@@ -844,6 +845,20 @@ impl SessionContext {
         self._read_type(table_paths, options).await
     }
 
+    /// Creates a [`DataFrame`] for reading an Arrow data source.
+    ///
+    /// For more control such as reading multiple files, you can use
+    /// [`read_table`](Self::read_table) with a [`ListingTable`].
+    ///
+    /// For an example, see [`read_csv`](Self::read_csv)
+    pub async fn read_arrow<P: DataFilePaths>(
+        &self,
+        table_paths: P,
+        options: ArrowReadOptions<'_>,
+    ) -> Result<DataFrame> {
+        self._read_type(table_paths, options).await
+    }
+
     /// Creates an empty DataFrame.
     pub fn read_empty(&self) -> Result<DataFrame> {
         Ok(DataFrame::new(
@@ -1034,6 +1049,27 @@ impl SessionContext {
         Ok(())
     }
 
+    /// Registers an Arrow file as a table that can be referenced from
+    /// SQL statements executed against this context.
+    pub async fn register_arrow(
+        &self,
+        name: &str,
+        table_path: &str,
+        options: ArrowReadOptions<'_>,
+    ) -> Result<()> {
+        let listing_options = options.to_listing_options(&self.copied_config());
+
+        self.register_listing_table(
+            name,
+            table_path,
+            listing_options,
+            options.schema.map(|s| Arc::new(s.to_owned())),
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
     /// Registers a named catalog using a custom `CatalogProvider` so that
     /// it can be referenced from SQL statements executed against this
     /// context.
@@ -1360,6 +1396,7 @@ impl SessionState {
         table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new()));
         table_factories.insert("NDJSON".into(), Arc::new(ListingTableFactory::new()));
         table_factories.insert("AVRO".into(), Arc::new(ListingTableFactory::new()));
+        table_factories.insert("ARROW".into(), Arc::new(ListingTableFactory::new()));
 
         if config.create_default_catalog_and_schema() {
             let default_catalog = MemoryCatalogProvider::new();
diff --git a/datafusion/core/src/physical_plan/file_format/arrow_file.rs b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
new file mode 100644
index 0000000000..d229031d37
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading Arrow files
+use crate::error::Result;
+use crate::physical_plan::file_format::{
+    FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
+};
+use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use crate::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
+use arrow_schema::SchemaRef;
+use datafusion_common::Statistics;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::StreamExt;
+use object_store::{GetResult, ObjectStore};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Execution plan for scanning Arrow data source
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+pub struct ArrowExec {
+    base_config: FileScanConfig,
+    projected_statistics: Statistics,
+    projected_schema: SchemaRef,
+    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl ArrowExec {
+    /// Create a new Arrow reader execution plan provided base configurations
+    pub fn new(base_config: FileScanConfig) -> Self {
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
+
+        Self {
+            base_config,
+            projected_schema,
+            projected_statistics,
+            projected_output_ordering,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
+    }
+    /// Ref to the base configs
+    pub fn base_config(&self) -> &FileScanConfig {
+        &self.base_config
+    }
+}
+
+impl ExecutionPlan for ArrowExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    }
+
+    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+        Ok(self.base_config().infinite_source)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.projected_output_ordering.as_deref()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        use super::file_stream::FileStream;
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
+        let opener = ArrowOpener {
+            object_store,
+            projection: self.base_config.projection.clone(),
+        };
+        let stream =
+            FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "ArrowExec: {}", self.base_config)
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.projected_statistics.clone()
+    }
+}
+
+pub struct ArrowOpener {
+    pub object_store: Arc<dyn ObjectStore>,
+    pub projection: Option<Vec<usize>>,
+}
+
+impl FileOpener for ArrowOpener {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+        let object_store = self.object_store.clone();
+        let projection = self.projection.clone();
+        Ok(Box::pin(async move {
+            match object_store.get(file_meta.location()).await? {
+                GetResult::File(file, _) => {
+                    let arrow_reader =
+                        arrow::ipc::reader::FileReader::try_new(file, projection)?;
+                    Ok(futures::stream::iter(arrow_reader).boxed())
+                }
+                r @ GetResult::Stream(_) => {
+                    let bytes = r.bytes().await?;
+                    let cursor = std::io::Cursor::new(bytes);
+                    let arrow_reader =
+                        arrow::ipc::reader::FileReader::try_new(cursor, projection)?;
+                    Ok(futures::stream::iter(arrow_reader).boxed())
+                }
+            }
+        }))
+    }
+}
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index aa0f79ced7..1b235d62a3 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -17,6 +17,7 @@
 
 //! Execution plans that read file formats
 
+mod arrow_file;
 mod avro;
 #[cfg(test)]
 mod chunked_store;
@@ -35,6 +36,7 @@ use arrow::{
     datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
     record_batch::RecordBatch,
 };
+pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 use datafusion_physical_expr::PhysicalSortExpr;
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
diff --git a/datafusion/core/tests/data/example.arrow b/datafusion/core/tests/data/example.arrow
new file mode 100644
index 0000000000..5314d9eea1
Binary files /dev/null and b/datafusion/core/tests/data/example.arrow differ
diff --git a/datafusion/core/tests/sql/arrow_files.rs b/datafusion/core/tests/sql/arrow_files.rs
new file mode 100644
index 0000000000..e74294b312
--- /dev/null
+++ b/datafusion/core/tests/sql/arrow_files.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use datafusion::execution::options::ArrowReadOptions;
+
+use super::*;
+
+async fn register_arrow(ctx: &mut SessionContext) {
+    ctx.register_arrow(
+        "arrow_simple",
+        "tests/data/example.arrow",
+        ArrowReadOptions::default(),
+    )
+    .await
+    .unwrap();
+}
+
+#[tokio::test]
+async fn arrow_query() {
+    let mut ctx = SessionContext::new();
+    register_arrow(&mut ctx).await;
+    let sql = "SELECT * FROM arrow_simple";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----+-----+-------+",
+        "| f0 | f1  | f2    |",
+        "+----+-----+-------+",
+        "| 1  | foo | true  |",
+        "| 2  | bar |       |",
+        "| 3  | baz | false |",
+        "| 4  |     | true  |",
+        "+----+-----+-------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[tokio::test]
+async fn arrow_explain() {
+    let mut ctx = SessionContext::new();
+    register_arrow(&mut ctx).await;
+    let sql = "EXPLAIN SELECT * FROM arrow_simple";
+    let actual = execute(&ctx, sql).await;
+    let actual = normalize_vec_for_explain(actual);
+    let expected = vec![
+        vec![
+            "logical_plan",
+            "TableScan: arrow_simple projection=[f0, f1, f2]",
+        ],
+        vec![
+            "physical_plan",
+            "ArrowExec: file_groups={1 group: [[WORKING_DIR/tests/data/example.arrow]]}, projection=[f0, f1, f2]\n",
+        ],
+    ];
+
+    assert_eq!(expected, actual);
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index be7b66adb5..3c5845fe22 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -79,6 +79,7 @@ macro_rules! test_expression {
 }
 
 pub mod aggregates;
+pub mod arrow_files;
 #[cfg(feature = "avro")]
 pub mod avro;
 pub mod create_drop;
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index afb009f89f..006641cac5 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -310,6 +310,18 @@ DROP TABLE my_table;
 statement ok
 DROP TABLE aggregate_simple
 
+# Arrow format
+statement ok
+CREATE external table arrow_simple STORED as ARROW LOCATION 'tests/data/example.arrow';
+
+query ITB rowsort
+SELECT * FROM arrow_simple order by f1 LIMIT 1
+----
+2 bar NULL
+
+statement ok
+DROP TABLE arrow_simple
+
 # create_table_with_schema_as_select_mismatch
 statement error table 'datafusion.public.aggregate_simple' not found
 CREATE TABLE my_table(c1 float, c2 double, c3 boolean, c4 varchar) AS SELECT * FROM aggregate_simple;