You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/05 11:37:29 UTC
[arrow-rs] branch master updated: Consolidate arrow ipc tests and increase coverage (#3427)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 81abc1a94 Consolidate arrow ipc tests and increase coverage (#3427)
81abc1a94 is described below
commit 81abc1a942cd13a92231f4a828077ad60fdabe36
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jan 5 06:37:25 2023 -0500
Consolidate arrow ipc tests and increase coverage (#3427)
* Consolidate arrow ipc tests and increase coverage
* fix fmt
---
arrow-integration-testing/src/lib.rs | 7 +
arrow-integration-testing/tests/ipc_reader.rs | 278 +++++++-----------
arrow-integration-testing/tests/ipc_writer.rs | 389 ++++++++++++--------------
arrow/Cargo.toml | 4 -
arrow/tests/ipc.rs | 61 ----
5 files changed, 286 insertions(+), 453 deletions(-)
diff --git a/arrow-integration-testing/src/lib.rs b/arrow-integration-testing/src/lib.rs
index 2edd0ed28..b0c8b85af 100644
--- a/arrow-integration-testing/src/lib.rs
+++ b/arrow-integration-testing/src/lib.rs
@@ -77,6 +77,13 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
}
/// Read gzipped JSON test file
+///
+/// For example given the input:
+/// version = `0.17.1`
+/// path = `generated_union`
+///
+/// Returns the contents of
+/// `arrow-ipc-stream/integration/0.17.1/generated_union.json.gz`
pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
use flate2::read::GzDecoder;
use std::io::Read;
diff --git a/arrow-integration-testing/tests/ipc_reader.rs b/arrow-integration-testing/tests/ipc_reader.rs
index 778d1ee77..e185634f0 100644
--- a/arrow-integration-testing/tests/ipc_reader.rs
+++ b/arrow-integration-testing/tests/ipc_reader.rs
@@ -15,16 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+//! Tests for reading the content of [`FileReader`] and [`StreamReader`]
+//! in `testing/arrow-ipc-stream/integration/...`
+
use arrow::ipc::reader::{FileReader, StreamReader};
use arrow::util::test_util::arrow_test_data;
use arrow_integration_testing::read_gzip_json;
use std::fs::File;
#[test]
-fn read_generated_files_014() {
+fn read_0_1_4() {
let testdata = arrow_test_data();
let version = "0.14.1";
- // the test is repetitive, thus we can read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
@@ -37,51 +39,42 @@ fn read_generated_files_014() {
"generated_decimal",
];
paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
+ verify_arrow_file(&testdata, version, path);
+ verify_arrow_stream(&testdata, version, path);
+ });
+}
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
+#[test]
+fn read_0_1_7() {
+ let testdata = arrow_test_data();
+ let version = "0.17.1";
+ let paths = vec!["generated_union"];
+ paths.iter().for_each(|path| {
+ verify_arrow_file(&testdata, version, path);
+ verify_arrow_stream(&testdata, version, path);
});
}
#[test]
#[should_panic(expected = "Big Endian is not supported for Decimal!")]
-fn read_decimal_be_file_should_panic() {
+fn read_1_0_0_bigendian_decimal_should_panic() {
let testdata = arrow_test_data();
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file",
- testdata
- ))
- .unwrap();
- FileReader::try_new(file, None).unwrap();
+ verify_arrow_file(&testdata, "1.0.0-bigendian", "generated_decimal");
}
#[test]
#[should_panic(
expected = "Last offset 687865856 of Utf8 is larger than values length 41"
)]
-fn read_dictionary_be_not_implemented() {
+fn read_1_0_0_bigendian_dictionary_should_panic() {
// The offsets are not translated for big-endian files
// https://github.com/apache/arrow-rs/issues/859
let testdata = arrow_test_data();
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_dictionary.arrow_file",
- testdata
- ))
- .unwrap();
- FileReader::try_new(file, None).unwrap();
+ verify_arrow_file(&testdata, "1.0.0-bigendian", "generated_dictionary");
}
#[test]
-fn read_generated_be_files_should_work() {
- // complementary to the previous test
+fn read_1_0_0_bigendian() {
let testdata = arrow_test_data();
let paths = vec![
"generated_interval",
@@ -102,163 +95,119 @@ fn read_generated_be_files_should_work() {
.unwrap();
FileReader::try_new(file, None).unwrap();
+
+    // While the reader doesn't error, the values are not read correctly,
+    // so verifying the contents fails
+ //verify_arrow_file(&testdata, "1.0.0-bigendian", path);
});
}
#[test]
-fn projection_should_work() {
- // complementary to the previous test
+fn read_1_0_0_littleendian() {
let testdata = arrow_test_data();
+ let version = "1.0.0-littleendian";
let paths = vec![
- "generated_interval",
"generated_datetime",
+ "generated_custom_metadata",
+ "generated_decimal",
+ "generated_decimal256",
+ "generated_dictionary",
+ "generated_dictionary_unsigned",
+ "generated_duplicate_fieldnames",
+ "generated_extension",
+ "generated_interval",
"generated_map",
+ // fails with
+ // thread 'read_1_0_0_littleendian' panicked at 'assertion failed: `(left == right)`
+ //"generated_map_non_canonical",
"generated_nested",
- "generated_null_trivial",
+ "generated_nested_dictionary",
+ "generated_nested_large_offsets",
"generated_null",
+ "generated_null_trivial",
+ "generated_primitive",
+ "generated_primitive_large_offsets",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
- "generated_primitive",
+ "generated_recursive_nested",
+ "generated_union",
];
paths.iter().for_each(|path| {
- // We must use littleendian files here.
- // The offsets are not translated for big-endian files
- // https://github.com/apache/arrow-rs/issues/859
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/1.0.0-littleendian/{}.arrow_file",
- testdata, path
- ))
- .unwrap();
-
- let reader = FileReader::try_new(file, Some(vec![0])).unwrap();
- let datatype_0 = reader.schema().fields()[0].data_type().clone();
- reader.for_each(|batch| {
- let batch = batch.unwrap();
- assert_eq!(batch.columns().len(), 1);
- assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone());
- });
+ verify_arrow_file(&testdata, version, path);
+ verify_arrow_stream(&testdata, version, path);
});
}
#[test]
-fn read_generated_streams_014() {
+fn read_2_0_0_compression() {
let testdata = arrow_test_data();
- let version = "0.14.1";
+ let version = "2.0.0-compression";
+
// the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- "generated_nested",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- "generated_decimal",
- ];
+ let paths = vec!["generated_lz4", "generated_zstd"];
paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- // the next batch must be empty
- assert!(reader.next().is_none());
- // the stream must indicate that it's finished
- assert!(reader.is_finished());
+ verify_arrow_file(&testdata, version, path);
+ verify_arrow_stream(&testdata, version, path);
});
}
-#[test]
-fn read_generated_files_100() {
- let testdata = arrow_test_data();
- let version = "1.0.0-littleendian";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- // "generated_map_non_canonical",
- "generated_nested",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
+/// Verifies the arrow file format integration test
+///
+/// Input file:
+/// `arrow-ipc-stream/integration/<version>/<path>.arrow_file`
+///
+/// Verification json file
+/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
+fn verify_arrow_file(testdata: &str, version: &str, path: &str) {
+ let filename = format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ );
+    println!("Verifying {filename}");
+
+ // Compare contents to the expected output format in JSON
+ {
+ println!(" verifying content");
+ let file = File::open(&filename).unwrap();
let mut reader = FileReader::try_new(file, None).unwrap();
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
-}
+ }
-#[test]
-fn read_generated_streams_100() {
- let testdata = arrow_test_data();
- let version = "1.0.0-littleendian";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- // "generated_map_non_canonical",
- "generated_nested",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- // the next batch must be empty
- assert!(reader.next().is_none());
- // the stream must indicate that it's finished
- assert!(reader.is_finished());
- });
+ // Verify that projection works by selecting the first column
+ {
+ println!(" verifying projection");
+ let file = File::open(&filename).unwrap();
+ let reader = FileReader::try_new(file, Some(vec![0])).unwrap();
+ let datatype_0 = reader.schema().fields()[0].data_type().clone();
+ reader.for_each(|batch| {
+ let batch = batch.unwrap();
+ assert_eq!(batch.columns().len(), 1);
+ assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone());
+ });
+ }
}
-#[test]
-fn read_generated_streams_200() {
- let testdata = arrow_test_data();
- let version = "2.0.0-compression";
-
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec!["generated_lz4", "generated_zstd"];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
+/// Verifies the arrow stream integration test
+///
+/// Input file:
+/// `arrow-ipc-stream/integration/<version>/<path>.stream`
+///
+/// Verification json file
+/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
+fn verify_arrow_stream(testdata: &str, version: &str, path: &str) {
+ let filename = format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ );
+    println!("Verifying {filename}");
+
+ // Compare contents to the expected output format in JSON
+ {
+ println!(" verifying content");
+ let file = File::open(&filename).unwrap();
let mut reader = StreamReader::try_new(file, None).unwrap();
// read expected JSON output
@@ -268,26 +217,5 @@ fn read_generated_streams_200() {
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
- });
-}
-
-#[test]
-fn read_generated_files_200() {
- let testdata = arrow_test_data();
- let version = "2.0.0-compression";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec!["generated_lz4", "generated_zstd"];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
+ }
}
diff --git a/arrow-integration-testing/tests/ipc_writer.rs b/arrow-integration-testing/tests/ipc_writer.rs
index 0aa17cd05..e429b5e5c 100644
--- a/arrow-integration-testing/tests/ipc_writer.rs
+++ b/arrow-integration-testing/tests/ipc_writer.rs
@@ -24,10 +24,9 @@ use std::fs::File;
use std::io::Seek;
#[test]
-fn read_and_rewrite_generated_files_014() {
+fn write_0_1_4() {
let testdata = arrow_test_data();
let version = "0.14.1";
- // the test is repetitive, thus we can read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
@@ -40,275 +39,239 @@ fn read_and_rewrite_generated_files_014() {
"generated_decimal",
];
paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- let mut file = tempfile::tempfile().unwrap();
-
- // read and rewrite the file to a temp location
- {
- let mut writer = FileWriter::try_new(&mut file, &reader.schema()).unwrap();
- while let Some(Ok(batch)) = reader.next() {
- writer.write(&batch).unwrap();
- }
- writer.finish().unwrap();
- }
- file.rewind().unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ roundtrip_arrow_file(&testdata, version, path);
+ roundtrip_arrow_stream(&testdata, version, path);
});
}
#[test]
-fn read_and_rewrite_generated_streams_014() {
+fn write_0_1_7() {
let testdata = arrow_test_data();
- let version = "0.14.1";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- "generated_nested",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- "generated_decimal",
- ];
+ let version = "0.17.1";
+ let paths = vec!["generated_union"];
paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let reader = StreamReader::try_new(file, None).unwrap();
-
- let mut file = tempfile::tempfile().unwrap();
-
- // read and rewrite the stream to a temp location
- {
- let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
- reader.for_each(|batch| {
- writer.write(&batch.unwrap()).unwrap();
- });
- writer.finish().unwrap();
- }
-
- file.rewind().unwrap();
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ roundtrip_arrow_file(&testdata, version, path);
+ roundtrip_arrow_stream(&testdata, version, path);
});
}
#[test]
-fn read_and_rewrite_generated_files_100() {
+fn write_1_0_0_littleendian() {
let testdata = arrow_test_data();
let version = "1.0.0-littleendian";
- // the test is repetitive, thus we can read all supported files at once
let paths = vec![
- "generated_custom_metadata",
"generated_datetime",
- "generated_dictionary_unsigned",
+ "generated_custom_metadata",
+ "generated_decimal",
+ "generated_decimal256",
"generated_dictionary",
- // "generated_duplicate_fieldnames",
+ "generated_dictionary_unsigned",
+ "generated_duplicate_fieldnames",
+ "generated_extension",
"generated_interval",
"generated_map",
+ // thread 'write_1_0_0_littleendian' panicked at 'assertion failed: `(left == right)`
+ // "generated_map_non_canonical",
"generated_nested",
- // "generated_nested_large_offsets",
- "generated_null_trivial",
+ "generated_nested_dictionary",
+ "generated_nested_large_offsets",
"generated_null",
+ "generated_null_trivial",
+ "generated_primitive",
"generated_primitive_large_offsets",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
- "generated_primitive",
- // "generated_recursive_nested",
+ "generated_recursive_nested",
+ "generated_union",
];
paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- let mut file = tempfile::tempfile().unwrap();
-
- // read and rewrite the file to a temp location
- {
- // write IPC version 5
- let options =
- IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap();
- let mut writer =
- FileWriter::try_new_with_options(&mut file, &reader.schema(), options)
- .unwrap();
- while let Some(Ok(batch)) = reader.next() {
- writer.write(&batch).unwrap();
- }
- writer.finish().unwrap();
- }
-
- file.rewind().unwrap();
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ roundtrip_arrow_file(&testdata, version, path);
+ roundtrip_arrow_stream(&testdata, version, path);
});
}
#[test]
-fn read_and_rewrite_generated_streams_100() {
+fn write_2_0_0_compression() {
let testdata = arrow_test_data();
- let version = "1.0.0-littleendian";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_custom_metadata",
- "generated_datetime",
- "generated_dictionary_unsigned",
- "generated_dictionary",
- // "generated_duplicate_fieldnames",
- "generated_interval",
- "generated_map",
- "generated_nested",
- // "generated_nested_large_offsets",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_large_offsets",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- // "generated_recursive_nested",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let reader = StreamReader::try_new(file, None).unwrap();
+ let version = "2.0.0-compression";
+ let paths = vec!["generated_lz4", "generated_zstd"];
- let mut file = tempfile::tempfile().unwrap();
+ // writer options for each compression type
+ let all_options = vec![
+ IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+ .unwrap(),
+ // write IPC version 5 with zstd
+ IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::ZSTD))
+ .unwrap(),
+ ];
- // read and rewrite the stream to a temp location
- {
- let options =
- IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap();
- let mut writer =
- StreamWriter::try_new_with_options(&mut file, &reader.schema(), options)
- .unwrap();
- reader.for_each(|batch| {
- writer.write(&batch.unwrap()).unwrap();
- });
- writer.finish().unwrap();
+ paths.iter().for_each(|path| {
+ for options in &all_options {
+ println!("Using options {options:?}");
+ roundtrip_arrow_file_with_options(&testdata, version, path, options.clone());
+ roundtrip_arrow_stream_with_options(
+ &testdata,
+ version,
+ path,
+ options.clone(),
+ );
}
-
- file.rewind().unwrap();
-
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
-#[test]
-fn read_and_rewrite_compression_files_200() {
- let testdata = arrow_test_data();
- let version = "2.0.0-compression";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec!["generated_lz4", "generated_zstd"];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
+/// Verifies the arrow file writer by reading the contents of an
+/// arrow_file, writing it to a file, and then ensuring the contents
+/// match the expected json contents. It also verifies that
+/// RecordBatches read from the new file matches the original.
+///
+/// Input file:
+/// `arrow-ipc-stream/integration/<version>/<path>.arrow_file`
+///
+/// Verification json file
+/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
+fn roundtrip_arrow_file(testdata: &str, version: &str, path: &str) {
+ roundtrip_arrow_file_with_options(testdata, version, path, IpcWriteOptions::default())
+}
+fn roundtrip_arrow_file_with_options(
+ testdata: &str,
+ version: &str,
+ path: &str,
+ options: IpcWriteOptions,
+) {
+ let filename = format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ );
+    println!("Verifying {filename}");
+
+ let mut tempfile = tempfile::tempfile().unwrap();
+
+ {
+ println!(" writing to tempfile {tempfile:?}");
+ let file = File::open(&filename).unwrap();
let mut reader = FileReader::try_new(file, None).unwrap();
- let mut file = tempfile::tempfile().unwrap();
-
// read and rewrite the file to a temp location
{
- // write IPC version 5
- let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
- .unwrap()
- .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
- .unwrap();
-
- let mut writer =
- FileWriter::try_new_with_options(&mut file, &reader.schema(), options)
- .unwrap();
+ let mut writer = FileWriter::try_new_with_options(
+ &mut tempfile,
+ &reader.schema(),
+ options,
+ )
+ .unwrap();
while let Some(Ok(batch)) = reader.next() {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
}
+ }
- file.rewind().unwrap();
- let mut reader = FileReader::try_new(file, None).unwrap();
+ {
+ println!(" checking rewrite to with json");
+ tempfile.rewind().unwrap();
+ let mut reader = FileReader::try_new(&tempfile, None).unwrap();
- // read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
+ }
+
+ {
+ println!(" checking rewrite with original");
+ let file = File::open(&filename).unwrap();
+ let reader = FileReader::try_new(file, None).unwrap();
+
+ tempfile.rewind().unwrap();
+ let rewrite_reader = FileReader::try_new(&tempfile, None).unwrap();
+
+ // Compare to original reader
+ reader.into_iter().zip(rewrite_reader.into_iter()).for_each(
+ |(batch1, batch2)| {
+ assert_eq!(batch1.unwrap(), batch2.unwrap());
+ },
+ );
+ }
}
-#[test]
-fn read_and_rewrite_compression_stream_200() {
- let testdata = arrow_test_data();
- let version = "2.0.0-compression";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec!["generated_lz4", "generated_zstd"];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let reader = StreamReader::try_new(file, None).unwrap();
+/// Verifies the arrow file writer by reading the contents of an
+/// arrow_file, writing it to a file, and then ensuring the contents
+/// match the expected json contents. It also verifies that
+/// RecordBatches read from the new file matches the original.
+///
+/// Input file:
+/// `arrow-ipc-stream/integration/<version>/<path>.stream`
+///
+/// Verification json file
+/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
+fn roundtrip_arrow_stream(testdata: &str, version: &str, path: &str) {
+ roundtrip_arrow_stream_with_options(
+ testdata,
+ version,
+ path,
+ IpcWriteOptions::default(),
+ )
+}
- let mut file = tempfile::tempfile().unwrap();
+fn roundtrip_arrow_stream_with_options(
+ testdata: &str,
+ version: &str,
+ path: &str,
+ options: IpcWriteOptions,
+) {
+ let filename = format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ );
+    println!("Verifying {filename}");
+
+ let mut tempfile = tempfile::tempfile().unwrap();
+
+ {
+ println!(" writing to tempfile {tempfile:?}");
+ let file = File::open(&filename).unwrap();
+ let mut reader = StreamReader::try_new(file, None).unwrap();
- // read and rewrite the stream to a temp location
+ // read and rewrite the file to a temp location
{
- let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
- .unwrap()
- .try_with_compression(Some(ipc::CompressionType::ZSTD))
- .unwrap();
-
- let mut writer =
- StreamWriter::try_new_with_options(&mut file, &reader.schema(), options)
- .unwrap();
- reader.for_each(|batch| {
- writer.write(&batch.unwrap()).unwrap();
- });
+ let mut writer = StreamWriter::try_new_with_options(
+ &mut tempfile,
+ &reader.schema(),
+ options,
+ )
+ .unwrap();
+ while let Some(Ok(batch)) = reader.next() {
+ writer.write(&batch).unwrap();
+ }
writer.finish().unwrap();
}
+ }
- file.rewind().unwrap();
+ {
+ println!(" checking rewrite to with json");
+ tempfile.rewind().unwrap();
+ let mut reader = StreamReader::try_new(&tempfile, None).unwrap();
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
+ }
+
+ {
+ println!(" checking rewrite with original");
+ let file = File::open(&filename).unwrap();
+ let reader = StreamReader::try_new(file, None).unwrap();
+
+ tempfile.rewind().unwrap();
+ let rewrite_reader = StreamReader::try_new(&tempfile, None).unwrap();
+
+ // Compare to original reader
+ reader.into_iter().zip(rewrite_reader.into_iter()).for_each(
+ |(batch1, batch2)| {
+ assert_eq!(batch1.unwrap(), batch2.unwrap());
+ },
+ );
+ }
}
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 202b4c4f4..d83637cbc 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -265,10 +265,6 @@ name = "lexsort"
harness = false
required-features = ["test_utils"]
-[[test]]
-name = "ipc"
-required-features = ["test_utils", "ipc"]
-
[[test]]
name = "csv"
required-features = ["csv", "chrono-tz"]
diff --git a/arrow/tests/ipc.rs b/arrow/tests/ipc.rs
deleted file mode 100644
index abaa238ba..000000000
--- a/arrow/tests/ipc.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_ipc::reader::StreamReader;
-use arrow_ipc::writer::StreamWriter;
-use std::fs::File;
-use std::io::Seek;
-
-#[test]
-fn read_union_017() {
- let testdata = arrow::util::test_util::arrow_test_data();
- let data_file = File::open(format!(
- "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
- testdata,
- ))
- .unwrap();
-
- let reader = StreamReader::try_new(data_file, None).unwrap();
-
- let mut file = tempfile::tempfile().unwrap();
- // read and rewrite the stream to a temp location
- {
- let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
- reader.for_each(|batch| {
- writer.write(&batch.unwrap()).unwrap();
- });
- writer.finish().unwrap();
- }
- file.rewind().unwrap();
-
- // Compare original file and rewrote file
- let rewrite_reader = StreamReader::try_new(file, None).unwrap();
-
- let data_file = File::open(format!(
- "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
- testdata,
- ))
- .unwrap();
- let reader = StreamReader::try_new(data_file, None).unwrap();
-
- reader
- .into_iter()
- .zip(rewrite_reader.into_iter())
- .for_each(|(batch1, batch2)| {
- assert_eq!(batch1.unwrap(), batch2.unwrap());
- });
-}