Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/03 07:21:51 UTC

[GitHub] [arrow] nevi-me commented on a change in pull request #8330: [Rust] [Parquet] Add roundtrip Arrow -> Parquet tests for all supported Arrow DataTypes

nevi-me commented on a change in pull request #8330:
URL: https://github.com/apache/arrow/pull/8330#discussion_r499099739



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -688,4 +688,413 @@ mod tests {
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
+
+    const SMALL_SIZE: usize = 100;
+
+    fn roundtrip(filename: &str, expected_batch: RecordBatch) {
+        let file = get_temp_file(filename, &[]);
+
+        let mut writer = ArrowWriter::try_new(
+            file.try_clone().unwrap(),
+            expected_batch.schema(),
+            None,
+        )
+        .unwrap();
+        writer.write(&expected_batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let actual_batch = record_batch_reader.next_batch().unwrap().unwrap();
+
+        assert_eq!(expected_batch.schema(), actual_batch.schema());
+        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
+        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
+        for i in 0..expected_batch.num_columns() {
+            let expected_data = expected_batch.column(i).data();
+            let actual_data = actual_batch.column(i).data();
+
+            assert_eq!(expected_data.data_type(), actual_data.data_type());
+            assert_eq!(expected_data.len(), actual_data.len());
+            assert_eq!(expected_data.null_count(), actual_data.null_count());
+            assert_eq!(expected_data.offset(), actual_data.offset());
+            assert_eq!(expected_data.buffers(), actual_data.buffers());
+            assert_eq!(expected_data.child_data(), actual_data.child_data());
+            assert_eq!(expected_data.null_bitmap(), actual_data.null_bitmap());
+        }
+    }
+
+    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
+        let schema = Schema::new(vec![Field::new(
+            "col",
+            values.data_type().clone(),
+            nullable,
+        )]);
+        let expected_batch =
+            RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
+
+        roundtrip(filename, expected_batch);
+    }
+
+    fn values_required<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<I::Item>> + Array + 'static,
+        I: IntoIterator,
+    {
+        let raw_values: Vec<_> = iter.into_iter().collect();
+        let values = Arc::new(A::from(raw_values));
+        one_column_roundtrip(filename, values, false);
+    }
+
+    fn values_optional<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<Option<I::Item>>> + Array + 'static,
+        I: IntoIterator,
+    {
+        let optional_raw_values: Vec<_> = iter
+            .into_iter()
+            .enumerate()
+            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
+            .collect();
+        let optional_values = Arc::new(A::from(optional_raw_values));
+        one_column_roundtrip(filename, optional_values, true);
+    }
+
+    fn required_and_optional<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
+        I: IntoIterator + Clone,
+    {
+        values_required::<A, I>(iter.clone(), filename);
+        values_optional::<A, I>(iter, filename);
+    }
+
+    #[test]
+    #[should_panic(expected = "Null arrays not supported")]
+    fn null_single_column() {
+        let values = Arc::new(NullArray::new(SMALL_SIZE));
+        one_column_roundtrip("null_single_column", values.clone(), true);
+        one_column_roundtrip("null_single_column", values, false);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "Attempting to write an Arrow type that is not yet implemented"
+    )]
+    fn bool_single_column() {
+        required_and_optional::<BooleanArray, _>(
+            [true, false].iter().cycle().copied().take(SMALL_SIZE),
+            "bool_single_column",
+        );
+    }
+
+    #[test]
+    fn i8_single_column() {
+        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8, "i8_single_column");
+    }
+
+    #[test]
+    fn i16_single_column() {
+        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16, "i16_single_column");
+    }
+
+    #[test]
+    fn i32_single_column() {
+        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32, "i32_single_column");
+    }
+
+    #[test]
+    fn i64_single_column() {
+        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64, "i64_single_column");
+    }
+
+    #[test]
+    fn u8_single_column() {
+        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8, "u8_single_column");
+    }
+
+    #[test]
+    fn u16_single_column() {
+        required_and_optional::<UInt16Array, _>(
+            0..SMALL_SIZE as u16,
+            "u16_single_column",
+        );
+    }
+
+    #[test]
+    fn u32_single_column() {
+        required_and_optional::<UInt32Array, _>(
+            0..SMALL_SIZE as u32,
+            "u32_single_column",
+        );
+    }
+
+    #[test]
+    fn u64_single_column() {
+        required_and_optional::<UInt64Array, _>(
+            0..SMALL_SIZE as u64,
+            "u64_single_column",
+        );
+    }
+
+    // How do we create Float16 values, given that f16 isn't natively supported in Rust?
+
+    #[test]
+    fn f32_single_column() {
+        required_and_optional::<Float32Array, _>(
+            (0..SMALL_SIZE).map(|i| i as f32),
+            "f32_single_column",
+        );
+    }
+
+    #[test]
+    fn f64_single_column() {
+        required_and_optional::<Float64Array, _>(
+            (0..SMALL_SIZE).map(|i| i as f64),
+            "f64_single_column",
+        );
+    }
+
+    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
+    // argument, and they also don't support building from a Vec<Option<T>>, so call
+    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
+
+    #[test]
+    #[ignore] // Timestamp support isn't correct yet
+    fn timestamp_second_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_second_single_column", values, false);
+    }
+
+    #[test]
+    fn timestamp_millisecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_millisecond_single_column", values, false);
+    }
+
+    #[test]
+    fn timestamp_microsecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_microsecond_single_column", values, false);
+    }
+
+    #[test]
+    #[ignore] // Timestamp support isn't correct yet
+    fn timestamp_nanosecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_nanosecond_single_column", values, false);
+    }
+
+    #[test]
+    fn date32_single_column() {
+        required_and_optional::<Date32Array, _>(
+            0..SMALL_SIZE as i32,
+            "date32_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Date support isn't correct yet

Review comment:
    The failure caused by `ArrowError("underlying Arrow error: Compute error: Casting from Date64(Millisecond) to Int32 not supported")` can be addressed by adding a cast in `arrow::compute::kernels::cast`.

    We fail to cast the data at `arrow_writer::197`. The main reason some casts aren't supported is that I was worried about type loss when converting something like date32 (number of days since the epoch) to int64 and then back to date64 (number of milliseconds since the epoch). Such a chained cast should be equivalent to casting date32 to date64 directly, which I don't think it currently is.
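
    Something like this is what I have in mind — a rough sketch only, not the actual kernel code (the helper name is made up; the real change would be another arm in the big `(from_type, to_type)` match inside `cast`, and I'm assuming the Result-returning builder API this crate currently uses):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Date32Builder, Date64Array};
use arrow::error::Result;

/// Hypothetical helper: cast a Date64 array (milliseconds since the epoch)
/// down to Date32 (days since the epoch).
fn cast_date64_to_date32(array: &ArrayRef) -> Result<ArrayRef> {
    const MILLIS_PER_DAY: i64 = 86_400_000;
    let date_array = array.as_any().downcast_ref::<Date64Array>().unwrap();
    let mut builder = Date32Builder::new(array.len());
    for i in 0..array.len() {
        if date_array.is_null(i) {
            builder.append_null()?;
        } else {
            // Integer division drops any sub-day remainder.
            builder.append_value((date_array.value(i) / MILLIS_PER_DAY) as i32)?;
        }
    }
    Ok(Arc::new(builder.finish()))
}
```

    Going the other way (date32 to date64) is a lossless multiply by the same constant, which is why the chained int cast should agree with the direct one.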

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -688,4 +688,413 @@ mod tests {
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
+
+    const SMALL_SIZE: usize = 100;
+
+    fn roundtrip(filename: &str, expected_batch: RecordBatch) {
+        let file = get_temp_file(filename, &[]);
+
+        let mut writer = ArrowWriter::try_new(
+            file.try_clone().unwrap(),
+            expected_batch.schema(),
+            None,
+        )
+        .unwrap();
+        writer.write(&expected_batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let actual_batch = record_batch_reader.next_batch().unwrap().unwrap();
+
+        assert_eq!(expected_batch.schema(), actual_batch.schema());
+        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
+        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
+        for i in 0..expected_batch.num_columns() {
+            let expected_data = expected_batch.column(i).data();
+            let actual_data = actual_batch.column(i).data();
+
+            assert_eq!(expected_data.data_type(), actual_data.data_type());
+            assert_eq!(expected_data.len(), actual_data.len());
+            assert_eq!(expected_data.null_count(), actual_data.null_count());
+            assert_eq!(expected_data.offset(), actual_data.offset());
+            assert_eq!(expected_data.buffers(), actual_data.buffers());
+            assert_eq!(expected_data.child_data(), actual_data.child_data());
+            assert_eq!(expected_data.null_bitmap(), actual_data.null_bitmap());
+        }
+    }
+
+    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
+        let schema = Schema::new(vec![Field::new(
+            "col",
+            values.data_type().clone(),
+            nullable,
+        )]);
+        let expected_batch =
+            RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
+
+        roundtrip(filename, expected_batch);
+    }
+
+    fn values_required<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<I::Item>> + Array + 'static,
+        I: IntoIterator,
+    {
+        let raw_values: Vec<_> = iter.into_iter().collect();
+        let values = Arc::new(A::from(raw_values));
+        one_column_roundtrip(filename, values, false);
+    }
+
+    fn values_optional<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<Option<I::Item>>> + Array + 'static,
+        I: IntoIterator,
+    {
+        let optional_raw_values: Vec<_> = iter
+            .into_iter()
+            .enumerate()
+            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
+            .collect();
+        let optional_values = Arc::new(A::from(optional_raw_values));
+        one_column_roundtrip(filename, optional_values, true);
+    }
+
+    fn required_and_optional<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
+        I: IntoIterator + Clone,
+    {
+        values_required::<A, I>(iter.clone(), filename);
+        values_optional::<A, I>(iter, filename);
+    }
+
+    #[test]
+    #[should_panic(expected = "Null arrays not supported")]
+    fn null_single_column() {
+        let values = Arc::new(NullArray::new(SMALL_SIZE));
+        one_column_roundtrip("null_single_column", values.clone(), true);
+        one_column_roundtrip("null_single_column", values, false);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "Attempting to write an Arrow type that is not yet implemented"
+    )]
+    fn bool_single_column() {
+        required_and_optional::<BooleanArray, _>(
+            [true, false].iter().cycle().copied().take(SMALL_SIZE),
+            "bool_single_column",
+        );
+    }
+
+    #[test]
+    fn i8_single_column() {
+        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8, "i8_single_column");
+    }
+
+    #[test]
+    fn i16_single_column() {
+        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16, "i16_single_column");
+    }
+
+    #[test]
+    fn i32_single_column() {
+        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32, "i32_single_column");
+    }
+
+    #[test]
+    fn i64_single_column() {
+        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64, "i64_single_column");
+    }
+
+    #[test]
+    fn u8_single_column() {
+        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8, "u8_single_column");
+    }
+
+    #[test]
+    fn u16_single_column() {
+        required_and_optional::<UInt16Array, _>(
+            0..SMALL_SIZE as u16,
+            "u16_single_column",
+        );
+    }
+
+    #[test]
+    fn u32_single_column() {
+        required_and_optional::<UInt32Array, _>(
+            0..SMALL_SIZE as u32,
+            "u32_single_column",
+        );
+    }
+
+    #[test]
+    fn u64_single_column() {
+        required_and_optional::<UInt64Array, _>(
+            0..SMALL_SIZE as u64,
+            "u64_single_column",
+        );
+    }
+
+    // How do we create Float16 values, given that f16 isn't natively supported in Rust?
+
+    #[test]
+    fn f32_single_column() {
+        required_and_optional::<Float32Array, _>(
+            (0..SMALL_SIZE).map(|i| i as f32),
+            "f32_single_column",
+        );
+    }
+
+    #[test]
+    fn f64_single_column() {
+        required_and_optional::<Float64Array, _>(
+            (0..SMALL_SIZE).map(|i| i as f64),
+            "f64_single_column",
+        );
+    }
+
+    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
+    // argument, and they also don't support building from a Vec<Option<T>>, so call
+    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
+
+    #[test]
+    #[ignore] // Timestamp support isn't correct yet
+    fn timestamp_second_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_second_single_column", values, false);
+    }
+
+    #[test]
+    fn timestamp_millisecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_millisecond_single_column", values, false);
+    }
+
+    #[test]
+    fn timestamp_microsecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_microsecond_single_column", values, false);
+    }
+
+    #[test]
+    #[ignore] // Timestamp support isn't correct yet
+    fn timestamp_nanosecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_nanosecond_single_column", values, false);
+    }
+
+    #[test]
+    fn date32_single_column() {
+        required_and_optional::<Date32Array, _>(
+            0..SMALL_SIZE as i32,
+            "date32_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Date support isn't correct yet
+    fn date64_single_column() {
+        required_and_optional::<Date64Array, _>(
+            0..SMALL_SIZE as i64,
+            "date64_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Time support isn't correct yet
+    fn time32_second_single_column() {
+        required_and_optional::<Time32SecondArray, _>(
+            0..SMALL_SIZE as i32,
+            "time32_second_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Time support isn't correct yet
+    fn time32_millisecond_single_column() {
+        required_and_optional::<Time32MillisecondArray, _>(
+            0..SMALL_SIZE as i32,
+            "time32_millisecond_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Time support isn't correct yet
+    fn time64_microsecond_single_column() {
+        required_and_optional::<Time64MicrosecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "time64_microsecond_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Time support isn't correct yet
+    fn time64_nanosecond_single_column() {
+        required_and_optional::<Time64NanosecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "time64_nanosecond_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Converting Duration to parquet not supported")]
+    fn duration_second_single_column() {
+        required_and_optional::<DurationSecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "duration_second_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Converting Duration to parquet not supported")]
+    fn duration_millisecond_single_column() {
+        required_and_optional::<DurationMillisecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "duration_millisecond_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Converting Duration to parquet not supported")]
+    fn duration_microsecond_single_column() {
+        required_and_optional::<DurationMicrosecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "duration_microsecond_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Converting Duration to parquet not supported")]
+    fn duration_nanosecond_single_column() {
+        required_and_optional::<DurationNanosecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "duration_nanosecond_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Currently unreachable because data type not supported")]
+    fn interval_year_month_single_column() {
+        required_and_optional::<IntervalYearMonthArray, _>(
+            0..SMALL_SIZE as i32,
+            "interval_year_month_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Currently unreachable because data type not supported")]
+    fn interval_day_time_single_column() {
+        required_and_optional::<IntervalDayTimeArray, _>(
+            0..SMALL_SIZE as i64,
+            "interval_day_time_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Binary support isn't correct yet - null_bitmap doesn't match
+    fn binary_single_column() {
+        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
+        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
+        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
+
+        // BinaryArrays can't be built from Vec<Option<&[u8]>>, so only call `values_required`
+        values_required::<BinaryArray, _>(many_vecs_iter, "binary_single_column");
+    }
+
+    #[test]
+    #[ignore] // Large Binary support isn't correct yet

Review comment:
    In this case, the reader is supposed to get the correct type from the serialized Arrow schema, and then convert the data to a large array.
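
    A rough sketch of that post-read conversion, purely as an illustration (the function and where it would hook in are assumptions, and null handling is omitted; the `From<Vec<&[u8]>>` impl is the same one these tests rely on):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, BinaryArray, LargeBinaryArray};
use arrow::datatypes::DataType;

/// Hypothetical reader-side fixup: if the serialized Arrow schema says
/// LargeBinary but the reader produced a 32-bit-offset Binary array,
/// rebuild the array with 64-bit offsets. (Nulls ignored for brevity.)
fn widen_binary(array: ArrayRef, target: &DataType) -> ArrayRef {
    if array.data_type() == &DataType::Binary && target == &DataType::LargeBinary {
        let small = array.as_any().downcast_ref::<BinaryArray>().unwrap();
        let values: Vec<&[u8]> = (0..small.len()).map(|i| small.value(i)).collect();
        Arc::new(LargeBinaryArray::from(values))
    } else {
        array
    }
}
```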

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -688,4 +688,413 @@ mod tests {
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
+
+    const SMALL_SIZE: usize = 100;
+
+    fn roundtrip(filename: &str, expected_batch: RecordBatch) {
+        let file = get_temp_file(filename, &[]);
+
+        let mut writer = ArrowWriter::try_new(
+            file.try_clone().unwrap(),
+            expected_batch.schema(),
+            None,
+        )
+        .unwrap();
+        writer.write(&expected_batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let actual_batch = record_batch_reader.next_batch().unwrap().unwrap();
+
+        assert_eq!(expected_batch.schema(), actual_batch.schema());
+        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
+        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
+        for i in 0..expected_batch.num_columns() {
+            let expected_data = expected_batch.column(i).data();
+            let actual_data = actual_batch.column(i).data();
+
+            assert_eq!(expected_data.data_type(), actual_data.data_type());
+            assert_eq!(expected_data.len(), actual_data.len());
+            assert_eq!(expected_data.null_count(), actual_data.null_count());
+            assert_eq!(expected_data.offset(), actual_data.offset());
+            assert_eq!(expected_data.buffers(), actual_data.buffers());
+            assert_eq!(expected_data.child_data(), actual_data.child_data());
+            assert_eq!(expected_data.null_bitmap(), actual_data.null_bitmap());
+        }
+    }
+
+    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
+        let schema = Schema::new(vec![Field::new(
+            "col",
+            values.data_type().clone(),
+            nullable,
+        )]);
+        let expected_batch =
+            RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
+
+        roundtrip(filename, expected_batch);
+    }
+
+    fn values_required<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<I::Item>> + Array + 'static,
+        I: IntoIterator,
+    {
+        let raw_values: Vec<_> = iter.into_iter().collect();
+        let values = Arc::new(A::from(raw_values));
+        one_column_roundtrip(filename, values, false);
+    }
+
+    fn values_optional<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<Option<I::Item>>> + Array + 'static,
+        I: IntoIterator,
+    {
+        let optional_raw_values: Vec<_> = iter
+            .into_iter()
+            .enumerate()
+            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
+            .collect();
+        let optional_values = Arc::new(A::from(optional_raw_values));
+        one_column_roundtrip(filename, optional_values, true);
+    }
+
+    fn required_and_optional<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
+        I: IntoIterator + Clone,
+    {
+        values_required::<A, I>(iter.clone(), filename);
+        values_optional::<A, I>(iter, filename);
+    }
+
+    #[test]
+    #[should_panic(expected = "Null arrays not supported")]
+    fn null_single_column() {
+        let values = Arc::new(NullArray::new(SMALL_SIZE));
+        one_column_roundtrip("null_single_column", values.clone(), true);
+        one_column_roundtrip("null_single_column", values, false);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "Attempting to write an Arrow type that is not yet implemented"
+    )]
+    fn bool_single_column() {
+        required_and_optional::<BooleanArray, _>(
+            [true, false].iter().cycle().copied().take(SMALL_SIZE),
+            "bool_single_column",
+        );
+    }
+
+    #[test]
+    fn i8_single_column() {
+        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8, "i8_single_column");
+    }
+
+    #[test]
+    fn i16_single_column() {
+        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16, "i16_single_column");
+    }
+
+    #[test]
+    fn i32_single_column() {
+        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32, "i32_single_column");
+    }
+
+    #[test]
+    fn i64_single_column() {
+        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64, "i64_single_column");
+    }
+
+    #[test]
+    fn u8_single_column() {
+        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8, "u8_single_column");
+    }
+
+    #[test]
+    fn u16_single_column() {
+        required_and_optional::<UInt16Array, _>(
+            0..SMALL_SIZE as u16,
+            "u16_single_column",
+        );
+    }
+
+    #[test]
+    fn u32_single_column() {
+        required_and_optional::<UInt32Array, _>(
+            0..SMALL_SIZE as u32,
+            "u32_single_column",
+        );
+    }
+
+    #[test]
+    fn u64_single_column() {
+        required_and_optional::<UInt64Array, _>(
+            0..SMALL_SIZE as u64,
+            "u64_single_column",
+        );
+    }
+
+    // How do we create Float16 values, given that f16 isn't natively supported in Rust?

Review comment:
    We can't create them, so we always leave `f16` as unsupported.
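
    For reference, a sketch of what creating them would take if we pulled in the external `half` crate — an assumption, since it isn't a dependency of this crate:

```rust
// Hypothetical: Rust has no native f16 primitive, so building Float16 test
// values would require an external crate such as `half`.
// Assumes half = "1" in Cargo.toml, which is not a dependency here.
use half::f16;

fn main() {
    let values: Vec<f16> = (0..10).map(|i| f16::from_f32(i as f32)).collect();
    println!("{:?}", values);
}
```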

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -688,4 +688,413 @@ mod tests {
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
+
+    const SMALL_SIZE: usize = 100;
+
+    fn roundtrip(filename: &str, expected_batch: RecordBatch) {
+        let file = get_temp_file(filename, &[]);
+
+        let mut writer = ArrowWriter::try_new(
+            file.try_clone().unwrap(),
+            expected_batch.schema(),
+            None,
+        )
+        .unwrap();
+        writer.write(&expected_batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let actual_batch = record_batch_reader.next_batch().unwrap().unwrap();
+
+        assert_eq!(expected_batch.schema(), actual_batch.schema());
+        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
+        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
+        for i in 0..expected_batch.num_columns() {
+            let expected_data = expected_batch.column(i).data();
+            let actual_data = actual_batch.column(i).data();
+
+            assert_eq!(expected_data.data_type(), actual_data.data_type());
+            assert_eq!(expected_data.len(), actual_data.len());
+            assert_eq!(expected_data.null_count(), actual_data.null_count());
+            assert_eq!(expected_data.offset(), actual_data.offset());
+            assert_eq!(expected_data.buffers(), actual_data.buffers());
+            assert_eq!(expected_data.child_data(), actual_data.child_data());
+            assert_eq!(expected_data.null_bitmap(), actual_data.null_bitmap());
+        }
+    }
+
+    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
+        let schema = Schema::new(vec![Field::new(
+            "col",
+            values.data_type().clone(),
+            nullable,
+        )]);
+        let expected_batch =
+            RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
+
+        roundtrip(filename, expected_batch);
+    }
+
+    fn values_required<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<I::Item>> + Array + 'static,
+        I: IntoIterator,
+    {
+        let raw_values: Vec<_> = iter.into_iter().collect();
+        let values = Arc::new(A::from(raw_values));
+        one_column_roundtrip(filename, values, false);
+    }
+
+    fn values_optional<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<Option<I::Item>>> + Array + 'static,
+        I: IntoIterator,
+    {
+        let optional_raw_values: Vec<_> = iter
+            .into_iter()
+            .enumerate()
+            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
+            .collect();
+        let optional_values = Arc::new(A::from(optional_raw_values));
+        one_column_roundtrip(filename, optional_values, true);
+    }
+
+    fn required_and_optional<A, I>(iter: I, filename: &str)
+    where
+        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
+        I: IntoIterator + Clone,
+    {
+        values_required::<A, I>(iter.clone(), filename);
+        values_optional::<A, I>(iter, filename);
+    }
+
+    #[test]
+    #[should_panic(expected = "Null arrays not supported")]
+    fn null_single_column() {
+        let values = Arc::new(NullArray::new(SMALL_SIZE));
+        one_column_roundtrip("null_single_column", values.clone(), true);
+        one_column_roundtrip("null_single_column", values, false);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "Attempting to write an Arrow type that is not yet implemented"
+    )]
+    fn bool_single_column() {
+        required_and_optional::<BooleanArray, _>(
+            [true, false].iter().cycle().copied().take(SMALL_SIZE),
+            "bool_single_column",
+        );
+    }
+
+    #[test]
+    fn i8_single_column() {
+        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8, "i8_single_column");
+    }
+
+    #[test]
+    fn i16_single_column() {
+        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16, "i16_single_column");
+    }
+
+    #[test]
+    fn i32_single_column() {
+        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32, "i32_single_column");
+    }
+
+    #[test]
+    fn i64_single_column() {
+        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64, "i64_single_column");
+    }
+
+    #[test]
+    fn u8_single_column() {
+        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8, "u8_single_column");
+    }
+
+    #[test]
+    fn u16_single_column() {
+        required_and_optional::<UInt16Array, _>(
+            0..SMALL_SIZE as u16,
+            "u16_single_column",
+        );
+    }
+
+    #[test]
+    fn u32_single_column() {
+        required_and_optional::<UInt32Array, _>(
+            0..SMALL_SIZE as u32,
+            "u32_single_column",
+        );
+    }
+
+    #[test]
+    fn u64_single_column() {
+        required_and_optional::<UInt64Array, _>(
+            0..SMALL_SIZE as u64,
+            "u64_single_column",
+        );
+    }
+
+    // How do we create Float16 values, given that f16 isn't natively supported in Rust?
+
+    #[test]
+    fn f32_single_column() {
+        required_and_optional::<Float32Array, _>(
+            (0..SMALL_SIZE).map(|i| i as f32),
+            "f32_single_column",
+        );
+    }
+
+    #[test]
+    fn f64_single_column() {
+        required_and_optional::<Float64Array, _>(
+            (0..SMALL_SIZE).map(|i| i as f64),
+            "f64_single_column",
+        );
+    }
+
+    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
+    // argument, and they also don't support building from a Vec<Option<T>>, so call
+    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
+
+    #[test]
+    #[ignore] // Timestamp support isn't correct yet
+    fn timestamp_second_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_second_single_column", values, false);
+    }
+
+    #[test]
+    fn timestamp_millisecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_millisecond_single_column", values, false);
+    }
+
+    #[test]
+    fn timestamp_microsecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_microsecond_single_column", values, false);
+    }
+
+    #[test]
+    #[ignore] // Timestamp support isn't correct yet
+    fn timestamp_nanosecond_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
+        let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None));
+
+        one_column_roundtrip("timestamp_nanosecond_single_column", values, false);
+    }
+
+    #[test]
+    fn date32_single_column() {
+        required_and_optional::<Date32Array, _>(
+            0..SMALL_SIZE as i32,
+            "date32_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Date support isn't correct yet
+    fn date64_single_column() {
+        required_and_optional::<Date64Array, _>(
+            0..SMALL_SIZE as i64,
+            "date64_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Time support isn't correct yet
+    fn time32_second_single_column() {
+        required_and_optional::<Time32SecondArray, _>(
+            0..SMALL_SIZE as i32,
+            "time32_second_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Time support isn't correct yet
+    fn time32_millisecond_single_column() {
+        required_and_optional::<Time32MillisecondArray, _>(
+            0..SMALL_SIZE as i32,
+            "time32_millisecond_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Time support isn't correct yet
+    fn time64_microsecond_single_column() {
+        required_and_optional::<Time64MicrosecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "time64_microsecond_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Time support isn't correct yet
+    fn time64_nanosecond_single_column() {
+        required_and_optional::<Time64NanosecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "time64_nanosecond_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Converting Duration to parquet not supported")]
+    fn duration_second_single_column() {
+        required_and_optional::<DurationSecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "duration_second_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Converting Duration to parquet not supported")]
+    fn duration_millisecond_single_column() {
+        required_and_optional::<DurationMillisecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "duration_millisecond_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Converting Duration to parquet not supported")]
+    fn duration_microsecond_single_column() {
+        required_and_optional::<DurationMicrosecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "duration_microsecond_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Converting Duration to parquet not supported")]
+    fn duration_nanosecond_single_column() {
+        required_and_optional::<DurationNanosecondArray, _>(
+            0..SMALL_SIZE as i64,
+            "duration_nanosecond_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Currently unreachable because data type not supported")]
+    fn interval_year_month_single_column() {
+        required_and_optional::<IntervalYearMonthArray, _>(
+            0..SMALL_SIZE as i32,
+            "interval_year_month_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Currently unreachable because data type not supported")]
+    fn interval_day_time_single_column() {
+        required_and_optional::<IntervalDayTimeArray, _>(
+            0..SMALL_SIZE as i64,
+            "interval_day_time_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // Binary support isn't correct yet - null_bitmap doesn't match
+    fn binary_single_column() {
+        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
+        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
+        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
+
+        // BinaryArrays can't be built from Vec<Option<&[u8]>>, so only call `values_required`
+        values_required::<BinaryArray, _>(many_vecs_iter, "binary_single_column");
+    }
+
+    #[test]
+    #[ignore] // Large Binary support isn't correct yet
+    fn large_binary_single_column() {
+        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
+        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
+        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
+
+        // LargeBinaryArrays can't be built from Vec<Option<&[u8]>>, so only call `values_required`
+        values_required::<LargeBinaryArray, _>(
+            many_vecs_iter,
+            "large_binary_single_column",
+        );
+    }
+
+    #[test]
+    #[ignore] // String support isn't correct yet - null_bitmap doesn't match
+    fn string_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
+        let raw_strs = raw_values.iter().map(|s| s.as_str());
+
+        required_and_optional::<StringArray, _>(raw_strs, "string_single_column");
+    }
+
+    #[test]
+    #[ignore] // Large String support isn't correct yet - null_bitmap and buffers don't match
+    fn large_string_single_column() {
+        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
+        let raw_strs = raw_values.iter().map(|s| s.as_str());
+
+        required_and_optional::<LargeStringArray, _>(
+            raw_strs,
+            "large_string_single_column",
+        );
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "Reading parquet list array into arrow is not supported yet!"
+    )]
+    fn list_single_column() {
+        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+        let a_value_offsets =
+            arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+        let a_list_data = ArrayData::builder(DataType::List(Box::new(DataType::Int32)))
+            .len(5)
+            .add_buffer(a_value_offsets)
+            .add_child_data(a_values.data())
+            .build();
+        let a = ListArray::from(a_list_data);
+
+        let values = Arc::new(a);
+        one_column_roundtrip("list_single_column", values, false);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "Reading parquet list array into arrow is not supported yet!"
+    )]
+    fn large_list_single_column() {
+        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+        let a_value_offsets =
+            arrow::buffer::Buffer::from(&[0i64, 1, 3, 3, 6, 10].to_byte_slice());
+        let a_list_data =
+            ArrayData::builder(DataType::LargeList(Box::new(DataType::Int32)))
+                .len(5)
+                .add_buffer(a_value_offsets)
+                .add_child_data(a_values.data())
+                .build();
+        let a = LargeListArray::from(a_list_data);
+
+        let values = Arc::new(a);
+        one_column_roundtrip("large_list_single_column", values, false);
+    }
+
+    #[test]
+    #[ignore] // Struct support isn't correct yet - null_bitmap doesn't match
+    fn struct_single_column() {
+        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+        let struct_field_a = Field::new("f", DataType::Int32, false);
+        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);

Review comment:
    This construction might be the cause. See https://issues.apache.org/jira/browse/ARROW-5408.
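
    A minimal sketch of the mismatch I suspect, hedged (assumptions: `StructArray::from` leaves the struct-level validity bitmap unset, while the reader may materialize an all-set one for the same data):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array, StructArray};
use arrow::datatypes::{DataType, Field};

// If the writer-side StructArray has no struct-level null bitmap but the
// read-back array materializes one, the strict null_bitmap equality check
// in `roundtrip` fails even though the values are logically identical.
fn main() {
    let child = Int32Array::from(vec![1, 2, 3]);
    let field = Field::new("f", DataType::Int32, false);
    let s = StructArray::from(vec![(field, Arc::new(child) as ArrayRef)]);
    // Expectation (assumption): no struct-level bitmap on the writer side.
    println!("null_count = {}", s.null_count());
    println!("has struct-level bitmap = {}", s.data().null_bitmap().is_some());
}
```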




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org