You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/02 13:48:43 UTC

(arrow-datafusion) branch main updated: Read only enough bytes to infer Arrow IPC file schema via stream (#7962)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 436a4fa348 Read only enough bytes to infer Arrow IPC file schema via stream (#7962)
436a4fa348 is described below

commit 436a4fa348f1fcdf47bb2624deb24f8b5266df8c
Author: Jeffrey <22...@users.noreply.github.com>
AuthorDate: Fri Nov 3 00:48:36 2023 +1100

    Read only enough bytes to infer Arrow IPC file schema via stream (#7962)
    
    * Read only enough bytes to infer Arrow IPC file schema via stream
    
    * Error checking for collect bytes func
    
    * Update datafusion/core/src/datasource/file_format/arrow.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    ---------
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 .../core/src/datasource/file_format/arrow.rs       | 197 +++++++++++++++++++--
 1 file changed, 186 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 16ae4411d1..2777805078 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -20,7 +20,7 @@
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
 
 use std::any::Any;
-use std::io::{Read, Seek};
+use std::borrow::Cow;
 use std::sync::Arc;
 
 use crate::datasource::file_format::FileFormat;
@@ -29,13 +29,18 @@ use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;
 
+use arrow::ipc::convert::fb_to_schema;
 use arrow::ipc::reader::FileReader;
-use arrow_schema::{Schema, SchemaRef};
+use arrow::ipc::root_as_message;
+use arrow_schema::{ArrowError, Schema, SchemaRef};
 
+use bytes::Bytes;
 use datafusion_common::{FileType, Statistics};
 use datafusion_physical_expr::PhysicalExpr;
 
 use async_trait::async_trait;
+use futures::stream::BoxStream;
+use futures::StreamExt;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
 /// Arrow `FileFormat` implementation.
@@ -59,13 +64,11 @@ impl FileFormat for ArrowFormat {
             let r = store.as_ref().get(&object.location).await?;
             let schema = match r.payload {
                 GetResultPayload::File(mut file, _) => {
-                    read_arrow_schema_from_reader(&mut file)?
+                    let reader = FileReader::try_new(&mut file, None)?;
+                    reader.schema()
                 }
-                GetResultPayload::Stream(_) => {
-                    // TODO: Fetching entire file to get schema is potentially wasteful
-                    let data = r.bytes().await?;
-                    let mut cursor = std::io::Cursor::new(&data);
-                    read_arrow_schema_from_reader(&mut cursor)?
+                GetResultPayload::Stream(stream) => {
+                    infer_schema_from_file_stream(stream).await?
                 }
             };
             schemas.push(schema.as_ref().clone());
@@ -99,7 +102,179 @@ impl FileFormat for ArrowFormat {
     }
 }
 
-fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
-    let reader = FileReader::try_new(reader, None)?;
-    Ok(reader.schema())
+const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
+const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+
+/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
+/// See <https://github.com/apache/arrow-rs/issues/5021>
+async fn infer_schema_from_file_stream(
+    mut stream: BoxStream<'static, object_store::Result<Bytes>>,
+) -> Result<SchemaRef> {
+    // Expected format:
+    // <magic number "ARROW1"> - 6 bytes
+    // <empty padding bytes [to 8 byte boundary]> - 2 bytes
+    // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
+    // <metadata_size: int32> - 4 bytes
+    // <metadata_flatbuffer: bytes>
+    // <rest of file bytes>
+
+    // So the first read needs at least all of the fixed-size sections,
+    // which is 6 + 2 + 4 + 4 = 16 bytes.
+    let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
+
+    // Files should start with these magic bytes
+    if bytes[0..6] != ARROW_MAGIC {
+        return Err(ArrowError::ParseError(
+            "Arrow file does not contian correct header".to_string(),
+        ))?;
+    }
+
+    // The continuation marker was only added in later versions, so it may be absent
+    let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
+        (&bytes[12..16], 16)
+    } else {
+        (&bytes[8..12], 12)
+    };
+
+    let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
+    let meta_len = i32::from_le_bytes(meta_len);
+
+    // Read bytes for Schema message
+    let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
+        // Need to read more bytes to decode Message
+        let mut block_data = Vec::with_capacity(meta_len as usize);
+        // In case we had some spare bytes in our initial read chunk
+        block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
+        let size_to_read = meta_len as usize - block_data.len();
+        let block_data =
+            collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?;
+        Cow::Owned(block_data)
+    } else {
+        // Already have the bytes we need
+        let end_index = meta_len as usize + rest_of_bytes_start_index;
+        let block_data = &bytes[rest_of_bytes_start_index..end_index];
+        Cow::Borrowed(block_data)
+    };
+
+    // Decode Schema message
+    let message = root_as_message(&block_data).map_err(|err| {
+        ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}"))
+    })?;
+    let ipc_schema = message.header_as_schema().ok_or_else(|| {
+        ArrowError::IpcError("Unable to read IPC message as schema".to_string())
+    })?;
+    let schema = fb_to_schema(ipc_schema);
+
+    Ok(Arc::new(schema))
+}
+
+async fn collect_at_least_n_bytes(
+    stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
+    n: usize,
+    extend_from: Option<Vec<u8>>,
+) -> Result<Vec<u8>> {
+    let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
+    // If extending existing buffer then ensure we read n additional bytes
+    let n = n + buf.len();
+    while let Some(bytes) = stream.next().await.transpose()? {
+        buf.extend_from_slice(&bytes);
+        if buf.len() >= n {
+            break;
+        }
+    }
+    if buf.len() < n {
+        return Err(ArrowError::ParseError(
+            "Unexpected end of byte stream for Arrow IPC file".to_string(),
+        ))?;
+    }
+    Ok(buf)
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::DateTime;
+    use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path};
+
+    use crate::execution::context::SessionContext;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_infer_schema_stream() -> Result<()> {
+        let mut bytes = std::fs::read("tests/data/example.arrow")?;
+        bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
+        let location = Path::parse("example.arrow")?;
+        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        in_memory_store.put(&location, bytes.into()).await?;
+
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let object_meta = ObjectMeta {
+            location,
+            last_modified: DateTime::default(),
+            size: usize::MAX,
+            e_tag: None,
+        };
+
+        let arrow_format = ArrowFormat {};
+        let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
+
+        // Test a chunk size that is too small, so we keep having to read more bytes,
+        // and one that is large enough that the first read contains all we need
+        for chunk_size in [7, 3000] {
+            let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
+            let inferred_schema = arrow_format
+                .infer_schema(
+                    &state,
+                    &(store.clone() as Arc<dyn ObjectStore>),
+                    &[object_meta.clone()],
+                )
+                .await?;
+            let actual_fields = inferred_schema
+                .fields()
+                .iter()
+                .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+                .collect::<Vec<_>>();
+            assert_eq!(expected, actual_fields);
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_infer_schema_short_stream() -> Result<()> {
+        let mut bytes = std::fs::read("tests/data/example.arrow")?;
+        bytes.truncate(20); // should cause error that file shorter than expected
+        let location = Path::parse("example.arrow")?;
+        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        in_memory_store.put(&location, bytes.into()).await?;
+
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let object_meta = ObjectMeta {
+            location,
+            last_modified: DateTime::default(),
+            size: usize::MAX,
+            e_tag: None,
+        };
+
+        let arrow_format = ArrowFormat {};
+
+        let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
+        let err = arrow_format
+            .infer_schema(
+                &state,
+                &(store.clone() as Arc<dyn ObjectStore>),
+                &[object_meta.clone()],
+            )
+            .await;
+
+        assert!(err.is_err());
+        assert_eq!(
+            "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
+            err.unwrap_err().to_string()
+        );
+
+        Ok(())
+    }
 }