You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/02 13:48:43 UTC
(arrow-datafusion) branch main updated: Read only enough bytes to infer Arrow IPC file schema via stream (#7962)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 436a4fa348 Read only enough bytes to infer Arrow IPC file schema via stream (#7962)
436a4fa348 is described below
commit 436a4fa348f1fcdf47bb2624deb24f8b5266df8c
Author: Jeffrey <22...@users.noreply.github.com>
AuthorDate: Fri Nov 3 00:48:36 2023 +1100
Read only enough bytes to infer Arrow IPC file schema via stream (#7962)
* Read only enough bytes to infer Arrow IPC file schema via stream
* Error checking for collect bytes func
* Update datafusion/core/src/datasource/file_format/arrow.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../core/src/datasource/file_format/arrow.rs | 197 +++++++++++++++++++--
1 file changed, 186 insertions(+), 11 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 16ae4411d1..2777805078 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -20,7 +20,7 @@
//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
use std::any::Any;
-use std::io::{Read, Seek};
+use std::borrow::Cow;
use std::sync::Arc;
use crate::datasource::file_format::FileFormat;
@@ -29,13 +29,18 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;
+use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::FileReader;
-use arrow_schema::{Schema, SchemaRef};
+use arrow::ipc::root_as_message;
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use bytes::Bytes;
use datafusion_common::{FileType, Statistics};
use datafusion_physical_expr::PhysicalExpr;
use async_trait::async_trait;
+use futures::stream::BoxStream;
+use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
/// Arrow `FileFormat` implementation.
@@ -59,13 +64,11 @@ impl FileFormat for ArrowFormat {
let r = store.as_ref().get(&object.location).await?;
let schema = match r.payload {
GetResultPayload::File(mut file, _) => {
- read_arrow_schema_from_reader(&mut file)?
+ let reader = FileReader::try_new(&mut file, None)?;
+ reader.schema()
}
- GetResultPayload::Stream(_) => {
- // TODO: Fetching entire file to get schema is potentially wasteful
- let data = r.bytes().await?;
- let mut cursor = std::io::Cursor::new(&data);
- read_arrow_schema_from_reader(&mut cursor)?
+ GetResultPayload::Stream(stream) => {
+ infer_schema_from_file_stream(stream).await?
}
};
schemas.push(schema.as_ref().clone());
@@ -99,7 +102,179 @@ impl FileFormat for ArrowFormat {
}
}
-fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
- let reader = FileReader::try_new(reader, None)?;
- Ok(reader.schema())
+const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
+const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+
+/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
+/// See https://github.com/apache/arrow-rs/issues/5021
+async fn infer_schema_from_file_stream(
+ mut stream: BoxStream<'static, object_store::Result<Bytes>>,
+) -> Result<SchemaRef> {
+ // Expected format:
+ // <magic number "ARROW1"> - 6 bytes
+ // <empty padding bytes [to 8 byte boundary]> - 2 bytes
+ // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
+ // <metadata_size: int32> - 4 bytes
+ // <metadata_flatbuffer: bytes>
+ // <rest of file bytes>
+
+ // So in first read we need at least all known sized sections,
+ // which is 6 + 2 + 4 + 4 = 16 bytes.
+ let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
+
+ // Files should start with these magic bytes
+ if bytes[0..6] != ARROW_MAGIC {
+ return Err(ArrowError::ParseError(
+ "Arrow file does not contain correct header".to_string(),
+ ))?;
+ }
+
+ // Since continuation marker bytes added in later versions
+ let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
+ (&bytes[12..16], 16)
+ } else {
+ (&bytes[8..12], 12)
+ };
+
+ let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
+ let meta_len = i32::from_le_bytes(meta_len);
+
+ // Read bytes for Schema message
+ let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
+ // Need to read more bytes to decode Message
+ let mut block_data = Vec::with_capacity(meta_len as usize);
+ // In case we had some spare bytes in our initial read chunk
+ block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
+ let size_to_read = meta_len as usize - block_data.len();
+ let block_data =
+ collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?;
+ Cow::Owned(block_data)
+ } else {
+ // Already have the bytes we need
+ let end_index = meta_len as usize + rest_of_bytes_start_index;
+ let block_data = &bytes[rest_of_bytes_start_index..end_index];
+ Cow::Borrowed(block_data)
+ };
+
+ // Decode Schema message
+ let message = root_as_message(&block_data).map_err(|err| {
+ ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}"))
+ })?;
+ let ipc_schema = message.header_as_schema().ok_or_else(|| {
+ ArrowError::IpcError("Unable to read IPC message as schema".to_string())
+ })?;
+ let schema = fb_to_schema(ipc_schema);
+
+ Ok(Arc::new(schema))
+}
+
+async fn collect_at_least_n_bytes(
+ stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
+ n: usize,
+ extend_from: Option<Vec<u8>>,
+) -> Result<Vec<u8>> {
+ let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
+ // If extending existing buffer then ensure we read n additional bytes
+ let n = n + buf.len();
+ while let Some(bytes) = stream.next().await.transpose()? {
+ buf.extend_from_slice(&bytes);
+ if buf.len() >= n {
+ break;
+ }
+ }
+ if buf.len() < n {
+ return Err(ArrowError::ParseError(
+ "Unexpected end of byte stream for Arrow IPC file".to_string(),
+ ))?;
+ }
+ Ok(buf)
+}
+
+#[cfg(test)]
+mod tests {
+ use chrono::DateTime;
+ use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path};
+
+ use crate::execution::context::SessionContext;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_infer_schema_stream() -> Result<()> {
+ let mut bytes = std::fs::read("tests/data/example.arrow")?;
+ bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
+ let location = Path::parse("example.arrow")?;
+ let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ in_memory_store.put(&location, bytes.into()).await?;
+
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+ let object_meta = ObjectMeta {
+ location,
+ last_modified: DateTime::default(),
+ size: usize::MAX,
+ e_tag: None,
+ };
+
+ let arrow_format = ArrowFormat {};
+ let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
+
+ // Test chunk sizes where too small so we keep having to read more bytes
+ // And when large enough that first read contains all we need
+ for chunk_size in [7, 3000] {
+ let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
+ let inferred_schema = arrow_format
+ .infer_schema(
+ &state,
+ &(store.clone() as Arc<dyn ObjectStore>),
+ &[object_meta.clone()],
+ )
+ .await?;
+ let actual_fields = inferred_schema
+ .fields()
+ .iter()
+ .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+ .collect::<Vec<_>>();
+ assert_eq!(expected, actual_fields);
+ }
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_infer_schema_short_stream() -> Result<()> {
+ let mut bytes = std::fs::read("tests/data/example.arrow")?;
+ bytes.truncate(20); // should cause error that file shorter than expected
+ let location = Path::parse("example.arrow")?;
+ let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ in_memory_store.put(&location, bytes.into()).await?;
+
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+ let object_meta = ObjectMeta {
+ location,
+ last_modified: DateTime::default(),
+ size: usize::MAX,
+ e_tag: None,
+ };
+
+ let arrow_format = ArrowFormat {};
+
+ let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
+ let err = arrow_format
+ .infer_schema(
+ &state,
+ &(store.clone() as Arc<dyn ObjectStore>),
+ &[object_meta.clone()],
+ )
+ .await;
+
+ assert!(err.is_err());
+ assert_eq!(
+ "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
+ err.unwrap_err().to_string()
+ );
+
+ Ok(())
+ }
}