You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2020/09/25 16:02:31 UTC

[arrow] branch rust-parquet-arrow-writer updated (228aec1 -> ac971db)

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a change to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git.


 discard 228aec1  ARROW-10095: [Rust] Update rust-parquet-arrow-writer branch's encode_arrow_schema with ipc changes
    omit 8f0ed91  ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata
    omit adea0c3  ARROW-8289: [Rust] Parquet Arrow writer with nested support
     add 728dec5  ARROW-10022: [C++] Fix divide by zero and overflow error for scalar arithmetic benchmark
     add 0b83c92  ARROW-7302: [C++] CSV: allow dictionary types in explicit column types
     add ca12cd1  ARROW-10024: [C++][Parquet] Create nested reading benchmarks
     add 7a532ed  ARROW-8678: [C++/Python][Parquet] Remove old writer code path
     add 4716cd3  ARROW-3757: [R] R bindings for Flight RPC client
     add eba7347  ARROW-9971: [Rust] Improve speed of `take` by 2x-3x (change scaling with batch size)
     add 6b3df45  ARROW-9990: [Rust] [DataFusion] Fixed the NOT operator
     add f09e7f7  ARROW-10028: [Rust] Simplified macro
     add 5e53d8c  ARROW-9977: [Rust] Added min/max of [Large]StringArray
     add 31025a0  ARROW-9987: [Rust] [DataFusion] Improved docs for `Expr`
     add 9b35e96  ARROW-10001 [Rust] [DataFusion] Added developer guide to README.
     add 84b1512  ARROW-10034: [Rust] Fix Rust build on master
     add 1f30466  ARROW-10002: [Rust] Remove `default fn` from `PrimitiveArrayOps`
     add 7ca8ee0  ARROW-9338: [Rust]  Add clippy instructions
     add 24a4a44  ARROW-9902: [Rust] [DataFusion] Add array() built-in function
     add e067dea  ARROW-9937: [Rust] [DataFusion] Improved aggregations
     add a17717f  ARROW-10017: [Java] Fix LargeMemoryUtil long conversion
     add 0b4fa2a  ARROW-9969: [C++] Fix RecordBatchBuilder with dictionary types
     add 8cb15e6  ARROW-10048: [Rust] Fixed error in computing min/max with null entries.
     add bc987cd  ARROW-9922: [Rust] Add StructArray::TryFrom (+40%)
     add 5e1344b  ARROW-10037: [C++] Workaround to force find AWS SDK to look for shared libraries
     add 9875d29  ARROW-9946: [R] Check `sink` argument class in `ParquetFileWriter`
     add 248803c  ARROW-9775: [C++] Automatic S3 region selection
     add 44f3de2  ARROW-8494: [C++][Parquet] Full support for reading mixed list and structs
     add 9dd18a1  ARROW-10055: [Rust] DoubleEndedIterator implementation for NullableIter
     add c47b58a  ARROW-10013: [FlightRPC][C++] fix setting generic client options
     add c557ac3  ARROW-10035: [C++] Update vendored libraries
     add 40d6475  ARROW-10049: [C++/Python] Sync conda recipe with conda-forge
     add 9546388  ARROW-4189: [Rust] Added coverage report.
     add de7cc0f  ARROW-10062: [Rust] Fix for null elems at key position in dictionary arrays
     add 8595406  ARROW-10060: [Rust] [DataFusion] Fixed error on which Err were discarded in MergeExec.
     add 69d57d4  ARROW-10064: [C++] Resolve compile warnings on Apple Clang 12
     add 02287b4  ARROW-9078: [C++] Parquet read / write extension type with nested storage type
     add 8563b42  PARQUET-1878: [C++] lz4 codec is not compatible with Hadoop Lz4Codec
     add 66aad9d  ARROW-9010: [Java] Framework and interface changes for RecordBatch IPC buffer compression
     add 697f141  ARROW-10016: [Rust] Implement is null / is not null kernels
     add e3a8d06  ARROW-10044: [Rust] Improved Arrow's README.
     add f1f4001  ARROW-9897: [C++][Gandiva] Added to_date function
     add 10e29a2  ARROW-10073: [Python] Don't rely on dict item order in test_parquet_nested_storage
     add 152f8b0  ARROW-10066: [C++] Make sure default AWS region selection algorithm is used
     add ac86123  ARROW-9970: [Go] fix checkptr failure in sum methods
     add f7f5baa  ARROW-10077: [C++] Fix possible integer multiplication overflow
     add 3a32019  ARROW-9934 [Rust] Shape and stride check in tensor
     add 8eef4fd  ARROW-10075: [C++] Use nullopt from arrow::util instead of vendored namespace
     add a4115ba  ARROW-10074: [C++] Use string constructor instead of string_view.to_string
     add ea4a405  ARROW-10051: [C++][Compute] Move kernel state when merging
     add 04eb733  ARROW-9603: [C++] Fix parquet write to not assume leaf-array validity bitmaps have the same values as parent structs
     add 171e8bf  ARROW-10027: [C++] Fix Take array kernel for NullType
     add f358a29  ARROW-10076: [C++] Use temporary directory facility in all unit tests
     add c3e399c  ARROW-10065: [Rust] Simplify code (+500, -1k)
     add 5e150bb  ARROW-9557: [R] Iterating over parquet columns is slow in R
     add 6c31940  ARROW-10083: [C++] Improve Parquet fuzz seed corpus
     add 512e4d1  ARROW-10085: [C++] Fix S3 region resolution on Windows
     add cadaaa9  ARROW-10003: [C++] Create parent dir for any destination fs in CopyFiles
     add c0dd2e2  ARROW-8601: [Go][Flight] Implementations Flight RPC server and client
     add 6a35f8a  ARROW-10081: [C++/Python] Fix bash syntax in drone.io conda builds
     add b470be3  ARROW-10063: [Archery][CI] Fetch main branch in archery build only when it is a pull request
     add 3176548  ARROW-10092: [Dev][Go] Add grpc generated go files to rat exclusion list
     add a67d30c  ARROW-10087: [CI] Fix nightly docs job
     new 32d328b  ARROW-8289: [Rust] Parquet Arrow writer with nested support
     new 28b075d  ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata
     new ac971db  ARROW-10095: [Rust] Update rust-parquet-arrow-writer branch's encode_arrow_schema with ipc changes

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (228aec1)
            \
             N -- N -- N   refs/heads/rust-parquet-arrow-writer (ac971db)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/archery.yml                      |    1 +
 .github/workflows/rust_cron.yml                    |   56 +
 LICENSE.txt                                        |    4 +-
 ci/appveyor-cpp-build.bat                          |    2 +-
 ci/appveyor-cpp-setup.bat                          |    1 -
 ci/docker/linux-apt-docs.dockerfile                |    8 +
 ci/scripts/rust_coverage.sh                        |   39 +
 cpp/cmake_modules/SetupCxxFlags.cmake              |    4 +-
 cpp/cmake_modules/ThirdpartyToolchain.cmake        |   20 +
 cpp/src/arrow/array/array_dict_test.cc             |   17 +-
 cpp/src/arrow/array/builder_dict.cc                |   41 +-
 cpp/src/arrow/array/builder_dict.h                 |   76 +-
 cpp/src/arrow/array/dict_internal.h                |   11 -
 cpp/src/arrow/buffer.cc                            |    2 +-
 cpp/src/arrow/builder.cc                           |    6 +
 cpp/src/arrow/compute/exec.cc                      |    2 +-
 cpp/src/arrow/compute/kernel.h                     |    2 +-
 cpp/src/arrow/compute/kernels/aggregate_basic.cc   |    6 +-
 .../compute/kernels/aggregate_basic_internal.h     |    6 +-
 cpp/src/arrow/compute/kernels/aggregate_mode.cc    |   10 +-
 .../compute/kernels/scalar_arithmetic_benchmark.cc |   21 +-
 cpp/src/arrow/compute/kernels/vector_selection.cc  |    4 +-
 .../arrow/compute/kernels/vector_selection_test.cc |    1 +
 cpp/src/arrow/csv/converter.cc                     |  763 +--
 cpp/src/arrow/csv/converter.h                      |    3 +
 cpp/src/arrow/csv/converter_test.cc                |   89 +-
 cpp/src/arrow/dataset/discovery.cc                 |    2 +-
 cpp/src/arrow/dataset/file_base.cc                 |    2 +-
 cpp/src/arrow/dataset/file_csv.cc                  |    2 +-
 cpp/src/arrow/dataset/filter.cc                    |    2 +-
 cpp/src/arrow/dataset/partition.cc                 |    2 +-
 cpp/src/arrow/extension_type.cc                    |   26 +
 cpp/src/arrow/extension_type.h                     |   10 +
 cpp/src/arrow/filesystem/filesystem.cc             |   44 +-
 cpp/src/arrow/filesystem/filesystem.h              |   24 +-
 cpp/src/arrow/filesystem/filesystem_test.cc        |   47 +-
 cpp/src/arrow/filesystem/path_util.cc              |   38 +-
 cpp/src/arrow/filesystem/path_util.h               |    6 +
 cpp/src/arrow/filesystem/s3fs.cc                   |  247 +-
 cpp/src/arrow/filesystem/s3fs.h                    |   23 +-
 cpp/src/arrow/filesystem/s3fs_narrative_test.cc    |    6 +-
 cpp/src/arrow/filesystem/s3fs_test.cc              |  100 +-
 cpp/src/arrow/flight/client.cc                     |   16 +-
 cpp/src/arrow/flight/flight_test.cc                |    2 +-
 cpp/src/arrow/io/buffered_test.cc                  |   10 +-
 cpp/src/arrow/io/file_test.cc                      |   64 +-
 cpp/src/arrow/io/memory_test.cc                    |    4 +-
 cpp/src/arrow/ipc/metadata_internal.cc             |    2 +-
 cpp/src/arrow/ipc/read_write_test.cc               |   41 +-
 cpp/src/arrow/json/parser.cc                       |    2 +-
 cpp/src/arrow/scalar.cc                            |    7 +-
 cpp/src/arrow/scalar_test.cc                       |    2 +-
 cpp/src/arrow/table_builder.cc                     |   16 +-
 cpp/src/arrow/table_builder_test.cc                |   27 +
 cpp/src/arrow/testing/gtest_util.cc                |   21 +
 cpp/src/arrow/testing/gtest_util.h                 |   11 +
 cpp/src/arrow/testing/random.cc                    |   35 +-
 cpp/src/arrow/testing/random.h                     |   49 +-
 cpp/src/arrow/type_traits.h                        |   28 +-
 cpp/src/arrow/util/atomic_shared_ptr.h             |   34 +-
 cpp/src/arrow/util/bit_stream_utils.h              |    3 +-
 cpp/src/arrow/util/compression.cc                  |   15 +
 cpp/src/arrow/util/compression.h                   |   13 +-
 cpp/src/arrow/util/compression_internal.h          |    3 +
 cpp/src/arrow/util/compression_lz4.cc              |  107 +
 cpp/src/arrow/util/compression_test.cc             |   70 +-
 cpp/src/arrow/util/cpu_info.h                      |    5 +
 cpp/src/arrow/util/hashing.h                       |    9 -
 cpp/src/arrow/util/io_util_test.cc                 |   11 +-
 cpp/src/arrow/util/simd.h                          |    4 +
 cpp/src/arrow/util/string.cc                       |    2 +-
 cpp/src/arrow/util/uri.cc                          |    2 +-
 cpp/src/arrow/util/utf8.cc                         |    7 +-
 cpp/src/arrow/vendored/datetime.h                  |    5 +
 cpp/src/arrow/vendored/datetime/README.md          |    2 +-
 cpp/src/arrow/vendored/datetime/date.h             |  101 +-
 cpp/src/arrow/vendored/datetime/ios.h              |    4 +-
 cpp/src/arrow/vendored/datetime/ios.mm             |  526 +--
 cpp/src/arrow/vendored/datetime/tz.cpp             |  145 +-
 cpp/src/arrow/vendored/datetime/tz.h               |  269 +-
 cpp/src/arrow/vendored/datetime/tz_private.h       |    6 +-
 cpp/src/arrow/vendored/string_view.hpp             |   49 +-
 cpp/src/arrow/vendored/utfcpp/README.md            |   28 +
 .../arrow/vendored/{utf8cpp => utfcpp}/checked.h   |   64 +-
 cpp/src/arrow/vendored/{utf8cpp => utfcpp}/core.h  |   45 +-
 cpp/src/arrow/vendored/utfcpp/cpp11.h              |  103 +
 cpp/src/arrow/vendored/xxhash.h                    |   10 -
 cpp/src/arrow/vendored/xxhash/README.md            |    3 +-
 cpp/src/arrow/vendored/xxhash/xxh3.h               | 1583 -------
 cpp/src/arrow/vendored/xxhash/xxhash.c             | 1141 +----
 cpp/src/arrow/vendored/xxhash/xxhash.h             | 4854 ++++++++++++++++++--
 cpp/src/gandiva/function_registry_datetime.cc      |    7 +
 cpp/src/gandiva/gdv_function_stubs.cc              |   25 +
 cpp/src/gandiva/tests/projector_test.cc            |   49 +-
 cpp/src/gandiva/tests/test_util.h                  |    2 +
 cpp/src/gandiva/to_date_holder.cc                  |   35 +-
 cpp/src/parquet/CMakeLists.txt                     |   22 +
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc  |   56 +-
 cpp/src/parquet/arrow/generate_fuzz_corpus.cc      |   83 +-
 cpp/src/parquet/arrow/path_internal.cc             |    3 +
 cpp/src/parquet/arrow/path_internal.h              |    3 +
 cpp/src/parquet/arrow/reader.cc                    |  524 ++-
 cpp/src/parquet/arrow/reader_internal.cc           |  136 -
 cpp/src/parquet/arrow/reader_internal.h            |   10 -
 cpp/src/parquet/arrow/reader_writer_benchmark.cc   |  142 +-
 cpp/src/parquet/arrow/reconstruct_internal_test.cc |  149 +-
 cpp/src/parquet/arrow/schema.cc                    |   84 +-
 cpp/src/parquet/arrow/schema.h                     |   11 +-
 cpp/src/parquet/arrow/writer.cc                    |  381 +-
 cpp/src/parquet/arrow/writer.h                     |    2 -
 cpp/src/parquet/column_reader.cc                   |  135 +-
 cpp/src/parquet/column_reader.h                    |   23 +-
 cpp/src/parquet/column_writer.cc                   |  307 +-
 cpp/src/parquet/column_writer.h                    |   11 +-
 cpp/src/parquet/column_writer_test.cc              |   10 +-
 cpp/src/parquet/exception.h                        |    7 +
 cpp/src/parquet/file_deserialize_test.cc           |    8 +-
 cpp/src/parquet/file_serialize_test.cc             |   15 +-
 cpp/src/parquet/level_comparison.cc                |   82 +
 .../util/simd.h => parquet/level_comparison.h}     |   38 +-
 .../src/parquet/level_comparison_avx2.cc           |   26 +-
 cpp/src/parquet/level_comparison_inc.h             |   64 +
 cpp/src/parquet/level_conversion.cc                |  306 +-
 cpp/src/parquet/level_conversion.h                 |   97 +-
 cpp/src/parquet/level_conversion_benchmark.cc      |   16 +-
 .../src/parquet/level_conversion_bmi2.cc           |   29 +-
 cpp/src/parquet/level_conversion_inc.h             |  140 +
 cpp/src/parquet/level_conversion_test.cc           |  305 +-
 cpp/src/parquet/reader_test.cc                     |   74 +-
 cpp/src/parquet/statistics.cc                      |    8 +-
 cpp/src/parquet/thrift_internal.h                  |    5 +-
 cpp/src/parquet/types.cc                           |   41 +-
 cpp/src/parquet/types.h                            |    9 -
 cpp/src/plasma/client.cc                           |    1 -
 cpp/submodules/parquet-testing                     |    2 +-
 dev/release/rat_exclude_files.txt                  |    1 +
 .../linux_aarch64_python3.6.____cpython.yaml       |   16 +-
 .../linux_aarch64_python3.7.____cpython.yaml       |   16 +-
 .../linux_aarch64_python3.8.____cpython.yaml       |   16 +-
 ...a_compiler_version9.2python3.6.____cpython.yaml |   10 +-
 ...a_compiler_version9.2python3.7.____cpython.yaml |   10 +-
 ...a_compiler_version9.2python3.8.____cpython.yaml |   10 +-
 ..._compiler_versionNonepython3.6.____cpython.yaml |   10 +-
 ..._compiler_versionNonepython3.7.____cpython.yaml |   10 +-
 ..._compiler_versionNonepython3.8.____cpython.yaml |   10 +-
 .../.ci_support/osx_python3.6.____cpython.yaml     |   14 +-
 .../.ci_support/osx_python3.7.____cpython.yaml     |   14 +-
 .../.ci_support/osx_python3.8.____cpython.yaml     |   14 +-
 .../.ci_support/win_python3.6.____cpython.yaml     |   12 +-
 .../.ci_support/win_python3.7.____cpython.yaml     |   12 +-
 .../.ci_support/win_python3.8.____cpython.yaml     |   12 +-
 dev/tasks/conda-recipes/arrow-cpp/bld-arrow.bat    |    3 +-
 dev/tasks/conda-recipes/arrow-cpp/build-arrow.sh   |    1 +
 dev/tasks/conda-recipes/arrow-cpp/meta.yaml        |   25 +-
 dev/tasks/conda-recipes/drone-steps.sh             |    2 +-
 docs/source/developers/cpp/conventions.rst         |   21 +
 format/Flight.proto                                |    2 +
 go/arrow/flight/Flight.pb.go                       | 1473 ++++++
 go/arrow/flight/Flight_grpc.pb.go                  |  877 ++++
 go/arrow/flight/client.go                          |   89 +
 go/arrow/flight/client_auth.go                     |   91 +
 go/arrow/flight/example_flight_server_test.go      |   86 +
 go/arrow/flight/flight_test.go                     |  305 ++
 go/arrow/{go.mod => flight/gen.go}                 |   12 +-
 go/arrow/flight/server.go                          |  118 +
 go/arrow/flight/server_auth.go                     |  145 +
 go/arrow/go.mod                                    |    8 +
 go/arrow/go.sum                                    |   94 +
 go/arrow/ipc/flight_data_reader.go                 |  210 +
 go/arrow/ipc/flight_data_writer.go                 |  150 +
 go/arrow/math/float64_avx2_amd64.go                |    4 +-
 go/arrow/math/float64_sse4_amd64.go                |    4 +-
 go/arrow/math/int64_avx2_amd64.go                  |    4 +-
 go/arrow/math/int64_sse4_amd64.go                  |    4 +-
 go/arrow/math/type_simd_amd64.go.tmpl              |    4 +-
 go/arrow/math/uint64_avx2_amd64.go                 |    4 +-
 go/arrow/math/uint64_sse4_amd64.go                 |    4 +-
 .../java/org/apache/arrow/flight/ArrowMessage.java |    9 +
 .../apache/arrow/memory/util/LargeMemoryUtil.java  |    4 +-
 .../arrow/memory/util/TestLargeMemoryUtil.java     |  105 +
 .../java/org/apache/arrow/vector/VectorLoader.java |   13 +-
 .../org/apache/arrow/vector/VectorUnloader.java    |   30 +-
 .../arrow/vector/compression/CompressionCodec.java |   51 +
 .../arrow/vector/compression/CompressionUtil.java  |   60 +
 .../vector/compression/NoCompressionCodec.java     |   54 +
 .../vector/ipc/message/ArrowBodyCompression.java}  |   59 +-
 .../arrow/vector/ipc/message/ArrowRecordBatch.java |   41 +-
 .../vector/ipc/message/MessageSerializer.java      |    8 +-
 python/pyarrow/_dataset.pyx                        |    3 +-
 python/pyarrow/_flight.pyx                         |    3 +-
 python/pyarrow/_parquet.pyx                        |    1 +
 python/pyarrow/_s3fs.pyx                           |    7 +
 python/pyarrow/includes/libarrow_fs.pxd            |    1 +
 python/pyarrow/parquet.py                          |    7 +-
 python/pyarrow/tests/test_compute.py               |   28 +
 python/pyarrow/tests/test_csv.py                   |   32 +
 python/pyarrow/tests/test_extension_type.py        |   75 +-
 python/pyarrow/tests/test_flight.py                |    2 +
 python/pyarrow/tests/test_fs.py                    |   29 +-
 python/pyarrow/tests/test_parquet.py               |   24 +-
 r/DESCRIPTION                                      |    1 +
 r/NAMESPACE                                        |    5 +
 r/R/arrow-package.R                                |    2 +-
 r/R/arrowExports.R                                 |   32 +-
 r/R/filesystem.R                                   |   19 +-
 r/R/flight.R                                       |   81 +
 r/R/parquet.R                                      |   77 +-
 r/_pkgdown.yml                                     |    6 +
 r/inst/demo_flight_server.py                       |  120 +
 r/man/ParquetFileReader.Rd                         |   21 +-
 r/man/ParquetFileWriter.Rd                         |    9 +
 r/man/flight_connect.Rd                            |   21 +
 r/man/flight_get.Rd                                |   19 +
 r/man/load_flight_server.Rd                        |   17 +
 r/man/push_data.Rd                                 |   21 +
 r/src/arrowExports.cpp                             |  135 +-
 r/src/filesystem.cpp                               |   20 +-
 r/src/parquet.cpp                                  |   55 +
 r/tests/testthat/test-parquet.R                    |   35 +
 r/vignettes/flight.Rmd                             |   78 +
 rust/README.md                                     |   21 +
 rust/arrow/README.md                               |   56 +-
 rust/arrow/benches/array_from_vec.rs               |   53 +
 rust/arrow/benches/take_kernels.rs                 |   79 +-
 rust/arrow/examples/tensor_builder.rs              |   67 +
 rust/arrow/src/array/array.rs                      | 1470 +++---
 rust/arrow/src/array/equal.rs                      |  809 +---
 rust/arrow/src/array/mod.rs                        |    8 +-
 rust/arrow/src/compute/kernels/aggregate.rs        |   85 +-
 rust/arrow/src/compute/kernels/boolean.rs          |  240 +-
 rust/arrow/src/compute/kernels/concat.rs           |    2 +-
 rust/arrow/src/compute/kernels/filter.rs           |   34 +-
 rust/arrow/src/compute/kernels/sort.rs             |    2 +-
 rust/arrow/src/compute/kernels/take.rs             |  241 +-
 rust/arrow/src/datatypes.rs                        |  146 +-
 rust/arrow/src/tensor.rs                           |  307 +-
 rust/arrow/src/util/pretty.rs                      |    1 +
 rust/datafusion/Cargo.toml                         |    1 +
 rust/datafusion/README.md                          |  104 +
 rust/datafusion/benches/aggregate_query_sql.rs     |  137 +-
 rust/datafusion/examples/simple_udf.rs             |    4 +-
 rust/datafusion/src/datasource/parquet.rs          |    1 +
 rust/datafusion/src/execution/context.rs           |   57 +-
 rust/datafusion/src/lib.rs                         |    1 +
 rust/datafusion/src/logical_plan/mod.rs            |  189 +-
 rust/datafusion/src/logical_plan/operators.rs      |    3 -
 rust/datafusion/src/physical_plan/aggregates.rs    |   22 +-
 .../src/physical_plan/array_expressions.rs         |  108 +
 rust/datafusion/src/physical_plan/common.rs        |  159 +-
 .../src/physical_plan/datetime_expressions.rs      |    4 +-
 rust/datafusion/src/physical_plan/explain.rs       |    6 +-
 rust/datafusion/src/physical_plan/expressions.rs   | 1791 ++++----
 rust/datafusion/src/physical_plan/filter.rs        |    7 +-
 rust/datafusion/src/physical_plan/functions.rs     |   98 +-
 .../datafusion/src/physical_plan/hash_aggregate.rs |  852 ++--
 .../src/physical_plan/math_expressions.rs          |    4 +-
 rust/datafusion/src/physical_plan/merge.rs         |    6 +-
 rust/datafusion/src/physical_plan/mod.rs           |  100 +-
 rust/datafusion/src/physical_plan/planner.rs       |   69 +-
 rust/datafusion/src/prelude.rs                     |    2 +-
 rust/datafusion/src/scalar.rs                      |  305 ++
 rust/datafusion/src/sql/planner.rs                 |   21 +-
 rust/datafusion/src/test/mod.rs                    |    1 +
 rust/datafusion/src/test/variable.rs               |    6 +-
 rust/datafusion/src/variable/mod.rs                |    2 +-
 rust/datafusion/tests/sql.rs                       |  276 +-
 rust/datafusion/tests/user_defined_plan.rs         |    5 +-
 267 files changed, 18988 insertions(+), 9140 deletions(-)
 create mode 100644 .github/workflows/rust_cron.yml
 create mode 100755 ci/scripts/rust_coverage.sh
 create mode 100644 cpp/src/arrow/vendored/utfcpp/README.md
 rename cpp/src/arrow/vendored/{utf8cpp => utfcpp}/checked.h (87%)
 rename cpp/src/arrow/vendored/{utf8cpp => utfcpp}/core.h (93%)
 create mode 100644 cpp/src/arrow/vendored/utfcpp/cpp11.h
 delete mode 100644 cpp/src/arrow/vendored/xxhash/xxh3.h
 create mode 100644 cpp/src/parquet/level_comparison.cc
 copy cpp/src/{arrow/util/simd.h => parquet/level_comparison.h} (61%)
 copy rust/datafusion/src/variable/mod.rs => cpp/src/parquet/level_comparison_avx2.cc (63%)
 create mode 100644 cpp/src/parquet/level_comparison_inc.h
 copy rust/datafusion/src/variable/mod.rs => cpp/src/parquet/level_conversion_bmi2.cc (55%)
 create mode 100644 cpp/src/parquet/level_conversion_inc.h
 create mode 100644 go/arrow/flight/Flight.pb.go
 create mode 100644 go/arrow/flight/Flight_grpc.pb.go
 create mode 100644 go/arrow/flight/client.go
 create mode 100644 go/arrow/flight/client_auth.go
 create mode 100644 go/arrow/flight/example_flight_server_test.go
 create mode 100644 go/arrow/flight/flight_test.go
 copy go/arrow/{go.mod => flight/gen.go} (73%)
 create mode 100644 go/arrow/flight/server.go
 create mode 100644 go/arrow/flight/server_auth.go
 create mode 100644 go/arrow/ipc/flight_data_reader.go
 create mode 100644 go/arrow/ipc/flight_data_writer.go
 create mode 100755 java/memory/memory-core/src/test/java/org/apache/arrow/memory/util/TestLargeMemoryUtil.java
 create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java
 create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java
 create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java
 copy java/{memory/memory-core/src/main/java/org/apache/arrow/memory/util/LargeMemoryUtil.java => vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java} (50%)
 create mode 100644 r/R/flight.R
 create mode 100644 r/inst/demo_flight_server.py
 create mode 100644 r/man/flight_connect.Rd
 create mode 100644 r/man/flight_get.Rd
 create mode 100644 r/man/load_flight_server.Rd
 create mode 100644 r/man/push_data.Rd
 create mode 100644 r/vignettes/flight.Rmd
 create mode 100644 rust/arrow/examples/tensor_builder.rs
 create mode 100644 rust/datafusion/src/physical_plan/array_expressions.rs
 create mode 100644 rust/datafusion/src/scalar.rs


[arrow] 01/03: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by ne...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 32d328b816e6cb3b89af967e262bf10bd48eca76
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Thu Aug 13 18:47:34 2020 +0200

    ARROW-8289: [Rust] Parquet Arrow writer with nested support
    
    **Note**: I started making changes to #6785, and ended up deviating a lot, so I opted for making a new draft PR in case my approach is not suitable.
    ___
    
    This is a draft to implement an arrow writer for parquet. It supports the following (no complete test coverage yet):
    
    * writing primitives except for booleans and binary
    * nested structs
    * null values (via definition levels)
    
    It does not yet support:
    
    - Boolean arrays (have to be handled differently from numeric values)
    - Binary arrays
    - Dictionary arrays
    - Union arrays (are they even possible?)
    
    I have only added a test by creating a nested schema, which I tested on pyarrow.
    
    ```jupyter
    # schema of test_complex.parquet
    
    a: int32 not null
    b: int32
    c: struct<d: double, e: struct<f: float>> not null
      child 0, d: double
      child 1, e: struct<f: float>
          child 0, f: float
    ```
    
    This PR potentially addresses:
    
    * https://issues.apache.org/jira/browse/ARROW-8289
    * https://issues.apache.org/jira/browse/ARROW-8423
    * https://issues.apache.org/jira/browse/ARROW-8424
    * https://issues.apache.org/jira/browse/ARROW-8425
    
    And I would like to propose either opening new JIRAs for the above incomplete items, or renaming the last 3 above.
    
    ___
    
    **Help Needed**
    
    I'm implementing the definition and repetition levels on first principle from an old Parquet blog post from the Twitter engineering blog. It's likely that I'm not getting some concepts correct, so I would appreciate help with:
    
    * Checking if my logic is correct
    * Guidance or suggestions on how to more efficiently extract levels from arrays
    * Adding tests - I suspect we might need a lot of tests, so far we only test writing 1 batch, so I don't know how paging would work when writing a large enough file
    
    I also don't know if the various encoding levels (dictionary, RLE, etc.) and compression levels are applied automagically, or if that'd be something we need to explicitly enable.
    
    CC @sunchao @sadikovi @andygrove @paddyhoran
    
    Might be of interest to @mcassels @maxburke
    
    Closes #7319 from nevi-me/arrow-parquet-writer
    
    Lead-authored-by: Neville Dipale <ne...@gmail.com>
    Co-authored-by: Max Burke <ma...@urbanlogiq.com>
    Co-authored-by: Andy Grove <an...@gmail.com>
    Co-authored-by: Max Burke <ma...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
---
 rust/parquet/src/arrow/arrow_writer.rs | 682 +++++++++++++++++++++++++++++++++
 rust/parquet/src/arrow/mod.rs          |   5 +-
 rust/parquet/src/schema/types.rs       |   6 +-
 3 files changed, 691 insertions(+), 2 deletions(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
new file mode 100644
index 0000000..0c1c490
--- /dev/null
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -0,0 +1,682 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::rc::Rc;
+
+use arrow::array as arrow_array;
+use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+/// Arrow writer
+///
+/// Writes Arrow `RecordBatch`es to a Parquet writer
+pub struct ArrowWriter<W: ParquetWriter> {
+    /// Underlying Parquet writer
+    writer: SerializedFileWriter<W>,
+    /// A copy of the Arrow schema.
+    ///
+    /// The schema is used to verify that each record batch written has the correct schema
+    arrow_schema: SchemaRef,
+}
+
+impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+    /// Try to create a new Arrow writer
+    ///
+    /// The writer will fail if:
+    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    ///  * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new(
+        writer: W,
+        arrow_schema: SchemaRef,
+        props: Option<Rc<WriterProperties>>,
+    ) -> Result<Self> {
+        let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+        let props = match props {
+            Some(props) => props,
+            None => Rc::new(WriterProperties::builder().build()),
+        };
+        let file_writer = SerializedFileWriter::new(
+            writer.try_clone()?,
+            schema.root_schema_ptr(),
+            props,
+        )?;
+
+        Ok(Self {
+            writer: file_writer,
+            arrow_schema,
+        })
+    }
+
+    /// Write a RecordBatch to writer
+    ///
+    /// *NOTE:* The writer currently does not support all Arrow data types
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        // validate batch schema against writer's supplied schema
+        if self.arrow_schema != batch.schema() {
+            return Err(ParquetError::ArrowError(
+                "Record batch schema does not match writer schema".to_string(),
+            ));
+        }
+        // compute the definition and repetition levels of the batch
+        let mut levels = vec![];
+        batch.columns().iter().for_each(|array| {
+            let mut array_levels =
+                get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None);
+            levels.append(&mut array_levels);
+        });
+        // reverse levels so we can use Vec::pop(&mut self)
+        levels.reverse();
+
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        // write leaves
+        for column in batch.columns() {
+            write_leaves(&mut row_group_writer, column, &mut levels)?;
+        }
+
+        self.writer.close_row_group(row_group_writer)
+    }
+
+    /// Close and finalise the underlying Parquet writer
+    pub fn close(&mut self) -> Result<()> {
+        self.writer.close()
+    }
+}
+
+/// Convenience method to get the next ColumnWriter from the RowGroupWriter
+#[inline]
+#[allow(clippy::borrowed_box)]
+fn get_col_writer(
+    row_group_writer: &mut Box<dyn RowGroupWriter>,
+) -> Result<ColumnWriter> {
+    let col_writer = row_group_writer
+        .next_column()?
+        .expect("Unable to get column writer");
+    Ok(col_writer)
+}
+
+#[allow(clippy::borrowed_box)]
+fn write_leaves(
+    mut row_group_writer: &mut Box<dyn RowGroupWriter>,
+    array: &arrow_array::ArrayRef,
+    mut levels: &mut Vec<Levels>,
+) -> Result<()> {
+    match array.data_type() {
+        ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_)
+        | ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8 => {
+            let mut col_writer = get_col_writer(&mut row_group_writer)?;
+            write_leaf(
+                &mut col_writer,
+                array,
+                levels.pop().expect("Levels exhausted"),
+            )?;
+            row_group_writer.close_column(col_writer)?;
+            Ok(())
+        }
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            // write the child list
+            let data = array.data();
+            let child_array = arrow_array::make_array(data.child_data()[0].clone());
+            write_leaves(&mut row_group_writer, &child_array, &mut levels)?;
+            Ok(())
+        }
+        ArrowDataType::Struct(_) => {
+            let struct_array: &arrow_array::StructArray = array
+                .as_any()
+                .downcast_ref::<arrow_array::StructArray>()
+                .expect("Unable to get struct array");
+            for field in struct_array.columns() {
+                write_leaves(&mut row_group_writer, field, &mut levels)?;
+            }
+            Ok(())
+        }
+        ArrowDataType::FixedSizeList(_, _)
+        | ArrowDataType::Null
+        | ArrowDataType::Boolean
+        | ArrowDataType::FixedSizeBinary(_)
+        | ArrowDataType::Union(_)
+        | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI(
+            "Attempting to write an Arrow type that is not yet implemented".to_string(),
+        )),
+    }
+}
+
+fn write_leaf(
+    writer: &mut ColumnWriter,
+    column: &arrow_array::ArrayRef,
+    levels: Levels,
+) -> Result<i64> {
+    let written = match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+            let array = array
+                .as_any()
+                .downcast_ref::<arrow_array::Int32Array>()
+                .expect("Unable to get int32 array");
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = arrow_array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() {
+            ArrowDataType::Binary | ArrowDataType::Utf8 => {
+                let array = arrow_array::BinaryArray::from(column.data());
+                typed.write_batch(
+                    get_binary_array(&array).as_slice(),
+                    Some(levels.definition.as_slice()),
+                    levels.repetition.as_deref(),
+                )?
+            }
+            ArrowDataType::LargeBinary | ArrowDataType::LargeUtf8 => {
+                let array = arrow_array::LargeBinaryArray::from(column.data());
+                typed.write_batch(
+                    get_large_binary_array(&array).as_slice(),
+                    Some(levels.definition.as_slice()),
+                    levels.repetition.as_deref(),
+                )?
+            }
+            _ => unreachable!("Currently unreachable because data type not supported"),
+        },
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+    };
+    Ok(written as i64)
+}
+
+/// A struct that represents definition and repetition levels.
+/// Repetition levels are only populated if the parent or current leaf is repeated
+#[derive(Debug)]
+struct Levels {
+    definition: Vec<i16>,
+    repetition: Option<Vec<i16>>,
+}
+
+/// Compute nested levels of the Arrow array, recursing into lists and structs
+fn get_levels(
+    array: &arrow_array::ArrayRef,
+    level: i16,
+    parent_def_levels: &[i16],
+    parent_rep_levels: Option<&[i16]>,
+) -> Vec<Levels> {
+    match array.data_type() {
+        ArrowDataType::Null => unimplemented!(),
+        ArrowDataType::Boolean
+        | ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_)
+        | ArrowDataType::Binary
+        | ArrowDataType::LargeBinary => vec![Levels {
+            definition: get_primitive_def_levels(array, parent_def_levels),
+            repetition: None,
+        }],
+        ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            let array_data = array.data();
+            let child_data = array_data.child_data().get(0).unwrap();
+            // get offsets, accounting for large offsets if present
+            let offsets: Vec<i64> = {
+                if let ArrowDataType::LargeList(_) = array.data_type() {
+                    unsafe { array_data.buffers()[0].typed_data::<i64>() }.to_vec()
+                } else {
+                    let offsets = unsafe { array_data.buffers()[0].typed_data::<i32>() };
+                    offsets.to_vec().into_iter().map(|v| v as i64).collect()
+                }
+            };
+            let child_array = arrow_array::make_array(child_data.clone());
+
+            let mut list_def_levels = Vec::with_capacity(child_array.len());
+            let mut list_rep_levels = Vec::with_capacity(child_array.len());
+            let rep_levels: Vec<i16> = parent_rep_levels
+                .map(|l| l.to_vec())
+                .unwrap_or_else(|| vec![0i16; parent_def_levels.len()]);
+            parent_def_levels
+                .iter()
+                .zip(rep_levels)
+                .zip(offsets.windows(2))
+                .for_each(|((parent_def_level, parent_rep_level), window)| {
+                    if *parent_def_level == 0 {
+                        // parent is null, list element must also be null
+                        list_def_levels.push(0);
+                        list_rep_levels.push(0);
+                    } else {
+                        // parent is not null, check if list is empty or null
+                        let start = window[0];
+                        let end = window[1];
+                        let len = end - start;
+                        if len == 0 {
+                            list_def_levels.push(*parent_def_level - 1);
+                            list_rep_levels.push(parent_rep_level);
+                        } else {
+                            list_def_levels.push(*parent_def_level);
+                            list_rep_levels.push(parent_rep_level);
+                            for _ in 1..len {
+                                list_def_levels.push(*parent_def_level);
+                                list_rep_levels.push(parent_rep_level + 1);
+                            }
+                        }
+                    }
+                });
+
+            // if datatype is a primitive, we can construct levels of the child array
+            match child_array.data_type() {
+                ArrowDataType::Null => unimplemented!(),
+                ArrowDataType::Boolean => unimplemented!(),
+                ArrowDataType::Int8
+                | ArrowDataType::Int16
+                | ArrowDataType::Int32
+                | ArrowDataType::Int64
+                | ArrowDataType::UInt8
+                | ArrowDataType::UInt16
+                | ArrowDataType::UInt32
+                | ArrowDataType::UInt64
+                | ArrowDataType::Float16
+                | ArrowDataType::Float32
+                | ArrowDataType::Float64
+                | ArrowDataType::Timestamp(_, _)
+                | ArrowDataType::Date32(_)
+                | ArrowDataType::Date64(_)
+                | ArrowDataType::Time32(_)
+                | ArrowDataType::Time64(_)
+                | ArrowDataType::Duration(_)
+                | ArrowDataType::Interval(_) => {
+                    let def_levels =
+                        get_primitive_def_levels(&child_array, &list_def_levels[..]);
+                    vec![Levels {
+                        definition: def_levels,
+                        repetition: Some(list_rep_levels),
+                    }]
+                }
+                ArrowDataType::Binary
+                | ArrowDataType::Utf8
+                | ArrowDataType::LargeUtf8 => unimplemented!(),
+                ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
+                ArrowDataType::LargeBinary => unimplemented!(),
+                ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+                    // nested list
+                    unimplemented!()
+                }
+                ArrowDataType::FixedSizeList(_, _) => unimplemented!(),
+                ArrowDataType::Struct(_) => get_levels(
+                    array,
+                    level + 1, // indicates a nesting level of 2 (list + struct)
+                    &list_def_levels[..],
+                    Some(&list_rep_levels[..]),
+                ),
+                ArrowDataType::Union(_) => unimplemented!(),
+                ArrowDataType::Dictionary(_, _) => unimplemented!(),
+            }
+        }
+        ArrowDataType::FixedSizeList(_, _) => unimplemented!(),
+        ArrowDataType::Struct(_) => {
+            let struct_array: &arrow_array::StructArray = array
+                .as_any()
+                .downcast_ref::<arrow_array::StructArray>()
+                .expect("Unable to get struct array");
+            let mut struct_def_levels = Vec::with_capacity(struct_array.len());
+            for i in 0..array.len() {
+                struct_def_levels.push(level + struct_array.is_valid(i) as i16);
+            }
+            // trying to create levels for struct's fields
+            let mut struct_levels = vec![];
+            struct_array.columns().into_iter().for_each(|col| {
+                let mut levels =
+                    get_levels(col, level + 1, &struct_def_levels[..], parent_rep_levels);
+                struct_levels.append(&mut levels);
+            });
+            struct_levels
+        }
+        ArrowDataType::Union(_) => unimplemented!(),
+        ArrowDataType::Dictionary(_, _) => unimplemented!(),
+    }
+}
+
+/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null
+/// In the case where the array in question is a child of either a list or struct, the levels
+/// are incremented in accordance with the `level` parameter.
+/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+fn get_primitive_def_levels(
+    array: &arrow_array::ArrayRef,
+    parent_def_levels: &[i16],
+) -> Vec<i16> {
+    let mut array_index = 0;
+    let max_def_level = parent_def_levels.iter().max().unwrap();
+    let mut primitive_def_levels = vec![];
+    parent_def_levels.iter().for_each(|def_level| {
+        if def_level < max_def_level {
+            primitive_def_levels.push(*def_level);
+        } else {
+            primitive_def_levels.push(def_level - array.is_null(array_index) as i16);
+            array_index += 1;
+        }
+    });
+    primitive_def_levels
+}
+
+macro_rules! def_get_binary_array_fn {
+    ($name:ident, $ty:ty) => {
+        fn $name(array: &$ty) -> Vec<ByteArray> {
+            let mut values = Vec::with_capacity(array.len() - array.null_count());
+            for i in 0..array.len() {
+                if array.is_valid(i) {
+                    let bytes = ByteArray::from(array.value(i).to_vec());
+                    values.push(bytes);
+                }
+            }
+            values
+        }
+    };
+}
+
+def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray);
+def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray);
+
+/// Get the underlying numeric array slice, skipping any null values.
+/// If there are no null values, it might be quicker to get the slice directly instead of
+/// calling this function.
+fn get_numeric_array_slice<T, A>(array: &arrow_array::PrimitiveArray<A>) -> Vec<T::T>
+where
+    T: DataType,
+    A: arrow::datatypes::ArrowNumericType,
+    T::T: From<A::Native>,
+{
+    let mut values = Vec::with_capacity(array.len() - array.null_count());
+    for i in 0..array.len() {
+        if array.is_valid(i) {
+            values.push(array.value(i).into())
+        }
+    }
+    values
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::io::Seek;
+    use std::sync::Arc;
+
+    use arrow::array::*;
+    use arrow::datatypes::ToByteSlice;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::{RecordBatch, RecordBatchReader};
+
+    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use crate::file::reader::SerializedFileReader;
+    use crate::util::test_common::get_temp_file;
+
+    #[test]
+    fn arrow_writer() {
+        // define schema
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, true),
+        ]);
+
+        // create some data
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(a), Arc::new(b)],
+        )
+        .unwrap();
+
+        let file = get_temp_file("test_arrow_writer.parquet", &[]);
+        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
+
+    #[test]
+    fn arrow_writer_list() {
+        // define schema
+        let schema = Schema::new(vec![Field::new(
+            "a",
+            DataType::List(Box::new(DataType::Int32)),
+            false,
+        )]);
+
+        // create some data
+        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+
+        // Construct a buffer for value offsets, for the nested array:
+        //  [[false], [true, false], null, [true, false, true], [false, true, false, true]]
+        let a_value_offsets =
+            arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+
+        // Construct a list array from the above two
+        let a_list_data = ArrayData::builder(DataType::List(Box::new(DataType::Int32)))
+            .len(5)
+            .add_buffer(a_value_offsets)
+            .add_child_data(a_values.data())
+            .build();
+        let a = ListArray::from(a_list_data);
+
+        // build a record batch
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();
+
+        let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
+        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
+
+    #[test]
+    fn arrow_writer_binary() {
+        let string_field = Field::new("a", DataType::Utf8, false);
+        let binary_field = Field::new("b", DataType::Binary, false);
+        let schema = Schema::new(vec![string_field, binary_field]);
+
+        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
+        let raw_binary_values = vec![
+            b"foo".to_vec(),
+            b"bar".to_vec(),
+            b"baz".to_vec(),
+            b"quux".to_vec(),
+        ];
+        let raw_binary_value_refs = raw_binary_values
+            .iter()
+            .map(|x| x.as_slice())
+            .collect::<Vec<_>>();
+
+        let string_values = StringArray::from(raw_string_values.clone());
+        let binary_values = BinaryArray::from(raw_binary_value_refs);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(string_values), Arc::new(binary_values)],
+        )
+        .unwrap();
+
+        let mut file = get_temp_file("test_arrow_writer.parquet", &[]);
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
+                .unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        file.seek(std::io::SeekFrom::Start(0)).unwrap();
+        let file_reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let batch = record_batch_reader.next_batch().unwrap().unwrap();
+        let string_col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let binary_col = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+
+        for i in 0..batch.num_rows() {
+            assert_eq!(string_col.value(i), raw_string_values[i]);
+            assert_eq!(binary_col.value(i), raw_binary_values[i].as_slice());
+        }
+    }
+
+    #[test]
+    fn arrow_writer_complex() {
+        // define schema
+        let struct_field_d = Field::new("d", DataType::Float64, true);
+        let struct_field_f = Field::new("f", DataType::Float32, true);
+        let struct_field_g =
+            Field::new("g", DataType::List(Box::new(DataType::Int16)), false);
+        let struct_field_e = Field::new(
+            "e",
+            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]),
+            true,
+        );
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, true),
+            Field::new(
+                "c",
+                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
+                false,
+            ),
+        ]);
+
+        // create some data
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
+        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
+        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
+
+        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+
+        // Construct a buffer for value offsets, for the nested array:
+        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
+        let g_value_offsets =
+            arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+
+        // Construct a list array from the above two
+        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
+            .len(5)
+            .add_buffer(g_value_offsets)
+            .add_child_data(g_value.data())
+            .build();
+        let g = ListArray::from(g_list_data);
+
+        let e = StructArray::from(vec![
+            (struct_field_f, Arc::new(f) as ArrayRef),
+            (struct_field_g, Arc::new(g) as ArrayRef),
+        ]);
+
+        let c = StructArray::from(vec![
+            (struct_field_d, Arc::new(d) as ArrayRef),
+            (struct_field_e, Arc::new(e) as ArrayRef),
+        ]);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
+        )
+        .unwrap();
+
+        let file = get_temp_file("test_arrow_writer_complex.parquet", &[]);
+        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
+}
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index 02f50fd..c8739c2 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -51,10 +51,13 @@
 
 pub(in crate::arrow) mod array_reader;
 pub mod arrow_reader;
+pub mod arrow_writer;
 pub(in crate::arrow) mod converter;
 pub(in crate::arrow) mod record_reader;
 pub mod schema;
 
 pub use self::arrow_reader::ArrowReader;
 pub use self::arrow_reader::ParquetFileArrowReader;
-pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns};
+pub use self::schema::{
+    arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
+};
diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs
index 416073a..5799905 100644
--- a/rust/parquet/src/schema/types.rs
+++ b/rust/parquet/src/schema/types.rs
@@ -788,7 +788,7 @@ impl SchemaDescriptor {
         result.clone()
     }
 
-    fn column_root_of(&self, i: usize) -> &Rc<Type> {
+    fn column_root_of(&self, i: usize) -> &TypePtr {
         assert!(
             i < self.leaves.len(),
             "Index out of bound: {} not in [0, {})",
@@ -810,6 +810,10 @@ impl SchemaDescriptor {
         self.schema.as_ref()
     }
 
+    pub fn root_schema_ptr(&self) -> TypePtr {
+        self.schema.clone()
+    }
+
     /// Returns schema name.
     pub fn name(&self) -> &str {
         self.schema.name()


[arrow] 02/03: ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata

Posted by ne...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 28b075d2481d3cf47542d96bff42e843845be4c6
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Tue Aug 18 18:39:37 2020 +0200

    ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata
    
    This will allow preserving Arrow-specific metadata when writing or reading Parquet files created from C++ or Rust.
    If the schema can't be deserialised, the normal Parquet > Arrow schema conversion is performed.
    
    Closes #7917 from nevi-me/ARROW-8243
    
    Authored-by: Neville Dipale <ne...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
---
 rust/parquet/Cargo.toml                |   3 +-
 rust/parquet/src/arrow/arrow_writer.rs |  27 ++-
 rust/parquet/src/arrow/mod.rs          |   4 +
 rust/parquet/src/arrow/schema.rs       | 306 ++++++++++++++++++++++++++++-----
 rust/parquet/src/file/properties.rs    |   6 +-
 5 files changed, 290 insertions(+), 56 deletions(-)

diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 50d7c34..60e43c9 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -40,6 +40,7 @@ zstd = { version = "0.5", optional = true }
 chrono = "0.4"
 num-bigint = "0.3"
 arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT", optional = true }
+base64 = { version = "*", optional = true }
 
 [dev-dependencies]
 rand = "0.7"
@@ -52,4 +53,4 @@ arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT" }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 
 [features]
-default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 0c1c490..1ca8d50 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -24,6 +24,7 @@ use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::Array;
 
+use super::schema::add_encoded_arrow_schema_to_metadata;
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::WriterProperties;
@@ -53,17 +54,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
     pub fn try_new(
         writer: W,
         arrow_schema: SchemaRef,
-        props: Option<Rc<WriterProperties>>,
+        props: Option<WriterProperties>,
     ) -> Result<Self> {
         let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
-        let props = match props {
-            Some(props) => props,
-            None => Rc::new(WriterProperties::builder().build()),
-        };
+        // add serialized arrow schema
+        let mut props = props.unwrap_or_else(|| WriterProperties::builder().build());
+        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+
         let file_writer = SerializedFileWriter::new(
             writer.try_clone()?,
             schema.root_schema_ptr(),
-            props,
+            Rc::new(props),
         )?;
 
         Ok(Self {
@@ -495,7 +496,7 @@ mod tests {
     use arrow::record_batch::{RecordBatch, RecordBatchReader};
 
     use crate::arrow::{ArrowReader, ParquetFileArrowReader};
-    use crate::file::reader::SerializedFileReader;
+    use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
     use crate::util::test_common::get_temp_file;
 
     #[test]
@@ -584,7 +585,7 @@ mod tests {
         )
         .unwrap();
 
-        let mut file = get_temp_file("test_arrow_writer.parquet", &[]);
+        let mut file = get_temp_file("test_arrow_writer_binary.parquet", &[]);
         let mut writer =
             ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
                 .unwrap();
@@ -674,8 +675,16 @@ mod tests {
         )
         .unwrap();
 
+        let props = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![KeyValue {
+                key: "test_key".to_string(),
+                value: Some("test_value".to_string()),
+            }]))
+            .build();
+
         let file = get_temp_file("test_arrow_writer_complex.parquet", &[]);
-        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index c8739c2..2bdb07c 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -58,6 +58,10 @@ pub mod schema;
 
 pub use self::arrow_reader::ArrowReader;
 pub use self::arrow_reader::ParquetFileArrowReader;
+pub use self::arrow_writer::ArrowWriter;
 pub use self::schema::{
     arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
 };
+
+/// Schema metadata key used to store serialized Arrow IPC schema
+pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs
index aebb9e7..d4cfe1f 100644
--- a/rust/parquet/src/arrow/schema.rs
+++ b/rust/parquet/src/arrow/schema.rs
@@ -26,24 +26,33 @@
 use std::collections::{HashMap, HashSet};
 use std::rc::Rc;
 
+use arrow::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit};
+
 use crate::basic::{LogicalType, Repetition, Type as PhysicalType};
 use crate::errors::{ParquetError::ArrowError, Result};
-use crate::file::metadata::KeyValue;
+use crate::file::{metadata::KeyValue, properties::WriterProperties};
 use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr};
 
-use arrow::datatypes::TimeUnit;
-use arrow::datatypes::{DataType, DateUnit, Field, Schema};
-
-/// Convert parquet schema to arrow schema including optional metadata.
+/// Convert Parquet schema to Arrow schema including optional metadata.
+/// Attempts to decode any existing Arrow shcema metadata, falling back
+/// to converting the Parquet schema column-wise
 pub fn parquet_to_arrow_schema(
     parquet_schema: &SchemaDescriptor,
-    metadata: &Option<Vec<KeyValue>>,
+    key_value_metadata: &Option<Vec<KeyValue>>,
 ) -> Result<Schema> {
-    parquet_to_arrow_schema_by_columns(
-        parquet_schema,
-        0..parquet_schema.columns().len(),
-        metadata,
-    )
+    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
+    let arrow_schema_metadata = metadata
+        .remove(super::ARROW_SCHEMA_META_KEY)
+        .map(|encoded| get_arrow_schema_from_metadata(&encoded));
+
+    match arrow_schema_metadata {
+        Some(Some(schema)) => Ok(schema),
+        _ => parquet_to_arrow_schema_by_columns(
+            parquet_schema,
+            0..parquet_schema.columns().len(),
+            key_value_metadata,
+        ),
+    }
 }
 
 /// Convert parquet schema to arrow schema including optional metadata, only preserving some leaf columns.
@@ -81,6 +90,80 @@ where
         .map(|fields| Schema::new_with_metadata(fields, metadata))
 }
 
+/// Try to convert Arrow schema metadata into a schema
+fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option<Schema> {
+    let decoded = base64::decode(encoded_meta);
+    match decoded {
+        Ok(bytes) => {
+            let slice = if bytes[0..4] == [255u8; 4] {
+                &bytes[8..]
+            } else {
+                bytes.as_slice()
+            };
+            let message = arrow::ipc::get_root_as_message(slice);
+            message
+                .header_as_schema()
+                .map(arrow::ipc::convert::fb_to_schema)
+        }
+        Err(err) => {
+            // The C++ implementation returns an error if the schema can't be parsed.
+            // To prevent this, we explicitly log this, then compute the schema without the metadata
+            eprintln!(
+                "Unable to decode the encoded schema stored in {}, {:?}",
+                super::ARROW_SCHEMA_META_KEY,
+                err
+            );
+            None
+        }
+    }
+}
+
+/// Encodes the Arrow schema into the IPC format, and base64 encodes it
+fn encode_arrow_schema(schema: &Schema) -> String {
+    let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema);
+
+    // manually prepending the length to the schema as arrow uses the legacy IPC format
+    // TODO: change after addressing ARROW-9777
+    let schema_len = serialized_schema.len();
+    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
+    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
+    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
+    len_prefix_schema.append(&mut serialized_schema);
+
+    base64::encode(&len_prefix_schema)
+}
+
+/// Mutates writer metadata by storing the encoded Arrow schema.
+/// If there is an existing Arrow schema metadata, it is replaced.
+pub(crate) fn add_encoded_arrow_schema_to_metadata(
+    schema: &Schema,
+    props: &mut WriterProperties,
+) {
+    let encoded = encode_arrow_schema(schema);
+
+    let schema_kv = KeyValue {
+        key: super::ARROW_SCHEMA_META_KEY.to_string(),
+        value: Some(encoded),
+    };
+
+    let mut meta = props.key_value_metadata.clone().unwrap_or_default();
+    // check if ARROW:schema exists, and overwrite it
+    let schema_meta = meta
+        .iter()
+        .enumerate()
+        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
+    match schema_meta {
+        Some((i, _)) => {
+            meta.remove(i);
+            meta.push(schema_kv);
+        }
+        None => {
+            meta.push(schema_kv);
+        }
+    }
+    props.key_value_metadata = Some(meta);
+}
+
 /// Convert arrow schema to parquet schema
 pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
     let fields: Result<Vec<TypePtr>> = schema
@@ -215,42 +298,48 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_logical_type(LogicalType::INTERVAL)
                 .with_repetition(repetition)
-                .with_length(3)
+                .with_length(12)
+                .build()
+        }
+        DataType::Binary | DataType::LargeBinary => {
+            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                .with_repetition(repetition)
                 .build()
         }
-        DataType::Binary => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
-            .with_repetition(repetition)
-            .build(),
         DataType::FixedSizeBinary(length) => {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_repetition(repetition)
                 .with_length(*length)
                 .build()
         }
-        DataType::Utf8 => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
-            .with_logical_type(LogicalType::UTF8)
-            .with_repetition(repetition)
-            .build(),
-        DataType::List(dtype) | DataType::FixedSizeList(dtype, _) => {
-            Type::group_type_builder(name)
-                .with_fields(&mut vec![Rc::new(
-                    Type::group_type_builder("list")
-                        .with_fields(&mut vec![Rc::new({
-                            let list_field = Field::new(
-                                "element",
-                                *dtype.clone(),
-                                field.is_nullable(),
-                            );
-                            arrow_to_parquet_type(&list_field)?
-                        })])
-                        .with_repetition(Repetition::REPEATED)
-                        .build()?,
-                )])
-                .with_logical_type(LogicalType::LIST)
-                .with_repetition(Repetition::REQUIRED)
+        DataType::Utf8 | DataType::LargeUtf8 => {
+            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                .with_logical_type(LogicalType::UTF8)
+                .with_repetition(repetition)
                 .build()
         }
+        DataType::List(dtype)
+        | DataType::FixedSizeList(dtype, _)
+        | DataType::LargeList(dtype) => Type::group_type_builder(name)
+            .with_fields(&mut vec![Rc::new(
+                Type::group_type_builder("list")
+                    .with_fields(&mut vec![Rc::new({
+                        let list_field =
+                            Field::new("element", *dtype.clone(), field.is_nullable());
+                        arrow_to_parquet_type(&list_field)?
+                    })])
+                    .with_repetition(Repetition::REPEATED)
+                    .build()?,
+            )])
+            .with_logical_type(LogicalType::LIST)
+            .with_repetition(Repetition::REQUIRED)
+            .build(),
         DataType::Struct(fields) => {
+            if fields.is_empty() {
+                return Err(ArrowError(
+                    "Parquet does not support writing empty structs".to_string(),
+                ));
+            }
             // recursively convert children to types/nodes
             let fields: Result<Vec<TypePtr>> = fields
                 .iter()
@@ -267,9 +356,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             let dict_field = Field::new(name, *value.clone(), field.is_nullable());
             arrow_to_parquet_type(&dict_field)
         }
-        DataType::LargeUtf8 | DataType::LargeBinary | DataType::LargeList(_) => {
-            Err(ArrowError("Large arrays not supported".to_string()))
-        }
     }
 }
 /// This struct is used to group methods and data structures used to convert parquet
@@ -555,12 +641,16 @@ impl ParquetTypeConverter<'_> {
 mod tests {
     use super::*;
 
-    use std::collections::HashMap;
+    use std::{collections::HashMap, convert::TryFrom, sync::Arc};
 
-    use arrow::datatypes::{DataType, DateUnit, Field, TimeUnit};
+    use arrow::datatypes::{DataType, DateUnit, Field, IntervalUnit, TimeUnit};
 
-    use crate::file::metadata::KeyValue;
-    use crate::schema::{parser::parse_message_type, types::SchemaDescriptor};
+    use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
+    use crate::{
+        arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
+        schema::{parser::parse_message_type, types::SchemaDescriptor},
+        util::test_common::get_temp_file,
+    };
 
     #[test]
     fn test_flat_primitives() {
@@ -1195,6 +1285,17 @@ mod tests {
     }
 
     #[test]
+    #[should_panic(expected = "Parquet does not support writing empty structs")]
+    fn test_empty_struct_field() {
+        let arrow_fields = vec![Field::new("struct", DataType::Struct(vec![]), false)];
+        let arrow_schema = Schema::new(arrow_fields);
+        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema);
+
+        assert!(converted_arrow_schema.is_err());
+        converted_arrow_schema.unwrap();
+    }
+
+    #[test]
     fn test_metadata() {
         let message_type = "
         message test_schema {
@@ -1216,4 +1317,123 @@ mod tests {
 
         assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
     }
+
+    #[test]
+    fn test_arrow_schema_roundtrip() -> Result<()> {
+        // This tests the roundtrip of an Arrow schema
+        // Fields that are commented out fail roundtrip tests or are unsupported by the writer
+        let metadata: HashMap<String, String> =
+            [("Key".to_string(), "Value".to_string())]
+                .iter()
+                .cloned()
+                .collect();
+
+        let schema = Schema::new_with_metadata(
+            vec![
+                Field::new("c1", DataType::Utf8, false),
+                Field::new("c2", DataType::Binary, false),
+                Field::new("c3", DataType::FixedSizeBinary(3), false),
+                Field::new("c4", DataType::Boolean, false),
+                Field::new("c5", DataType::Date32(DateUnit::Day), false),
+                Field::new("c6", DataType::Date64(DateUnit::Millisecond), false),
+                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
+                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
+                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
+                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
+                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
+                Field::new(
+                    "c16",
+                    DataType::Timestamp(
+                        TimeUnit::Millisecond,
+                        Some(Arc::new("UTC".to_string())),
+                    ),
+                    false,
+                ),
+                Field::new(
+                    "c17",
+                    DataType::Timestamp(
+                        TimeUnit::Microsecond,
+                        Some(Arc::new("Africa/Johannesburg".to_string())),
+                    ),
+                    false,
+                ),
+                Field::new(
+                    "c18",
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                    false,
+                ),
+                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
+                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
+                Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false),
+                Field::new(
+                    "c22",
+                    DataType::FixedSizeList(Box::new(DataType::Boolean), 5),
+                    false,
+                ),
+                Field::new(
+                    "c23",
+                    DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
+                        vec![
+                            Field::new("a", DataType::Int16, true),
+                            Field::new("b", DataType::Float64, false),
+                        ],
+                    ))))),
+                    true,
+                ),
+                Field::new(
+                    "c24",
+                    DataType::Struct(vec![
+                        Field::new("a", DataType::Utf8, false),
+                        Field::new("b", DataType::UInt16, false),
+                    ]),
+                    false,
+                ),
+                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
+                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
+                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
+                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
+                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
+                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
+                // Field::new_dict(
+                //     "c31",
+                //     DataType::Dictionary(
+                //         Box::new(DataType::Int32),
+                //         Box::new(DataType::Utf8),
+                //     ),
+                //     true,
+                //     123,
+                //     true,
+                // ),
+                Field::new("c32", DataType::LargeBinary, true),
+                Field::new("c33", DataType::LargeUtf8, true),
+                Field::new(
+                    "c34",
+                    DataType::LargeList(Box::new(DataType::LargeList(Box::new(
+                        DataType::Struct(vec![
+                            Field::new("a", DataType::Int16, true),
+                            Field::new("b", DataType::Float64, true),
+                        ]),
+                    )))),
+                    true,
+                ),
+            ],
+            metadata,
+        );
+
+        // write to an empty parquet file so that schema is serialized
+        let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]);
+        let mut writer = ArrowWriter::try_new(
+            file.try_clone().unwrap(),
+            Arc::new(schema.clone()),
+            None,
+        )?;
+        writer.close()?;
+
+        // read file back
+        let parquet_reader = SerializedFileReader::try_from(file)?;
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader));
+        let read_schema = arrow_reader.get_schema()?;
+        assert_eq!(schema, read_schema);
+        Ok(())
+    }
 }
diff --git a/rust/parquet/src/file/properties.rs b/rust/parquet/src/file/properties.rs
index 188d6ec..b62ce7b 100644
--- a/rust/parquet/src/file/properties.rs
+++ b/rust/parquet/src/file/properties.rs
@@ -89,8 +89,8 @@ pub type WriterPropertiesPtr = Rc<WriterProperties>;
 
 /// Writer properties.
 ///
-/// It is created as an immutable data structure, use [`WriterPropertiesBuilder`] to
-/// assemble the properties.
+/// All properties except the key-value metadata are immutable,
+/// use [`WriterPropertiesBuilder`] to assemble these properties.
 #[derive(Debug, Clone)]
 pub struct WriterProperties {
     data_pagesize_limit: usize,
@@ -99,7 +99,7 @@ pub struct WriterProperties {
     max_row_group_size: usize,
     writer_version: WriterVersion,
     created_by: String,
-    key_value_metadata: Option<Vec<KeyValue>>,
+    pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
     default_column_properties: ColumnProperties,
     column_properties: HashMap<ColumnPath, ColumnProperties>,
 }


[arrow] 03/03: ARROW-10095: [Rust] Update rust-parquet-arrow-writer branch's encode_arrow_schema with ipc changes

Posted by ne...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit ac971db48245d18cf879981358cd37d5d034a429
Author: Carol (Nichols || Goulding) <ca...@gmail.com>
AuthorDate: Fri Sep 25 17:54:11 2020 +0200

    ARROW-10095: [Rust] Update rust-parquet-arrow-writer branch's encode_arrow_schema with ipc changes
    
    Note that this PR is deliberately filed against the rust-parquet-arrow-writer branch, not master!!
    
    Hi! 👋 I'm looking to help out with the rust-parquet-arrow-writer branch, and I just pulled it down and it wasn't compiling because in 75f804efbfe367175fef5a2238d9cd2d30ed3afe, `schema_to_bytes` was changed to take `IpcWriteOptions` and to return `EncodedData`. This updates `encode_arrow_schema` to use those changes, which should get this branch compiling and passing tests again.
    
    I'm kind of guessing which JIRA ticket this should be associated with; honestly I think this commit can just be squashed with https://github.com/apache/arrow/commit/8f0ed91469f2e569472edaa3b69ffde051088555 next time this branch gets rebased.
    
    Please let me know if I should change anything, I'm happy to!
    
    Closes #8274 from carols10cents/update-with-ipc-changes
    
    Authored-by: Carol (Nichols || Goulding) <ca...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
---
 rust/parquet/src/arrow/schema.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs
index d4cfe1f..d5a0ff9 100644
--- a/rust/parquet/src/arrow/schema.rs
+++ b/rust/parquet/src/arrow/schema.rs
@@ -27,6 +27,7 @@ use std::collections::{HashMap, HashSet};
 use std::rc::Rc;
 
 use arrow::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit};
+use arrow::ipc::writer;
 
 use crate::basic::{LogicalType, Repetition, Type as PhysicalType};
 use crate::errors::{ParquetError::ArrowError, Result};
@@ -120,15 +121,16 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option<Schema> {
 
 /// Encodes the Arrow schema into the IPC format, and base64 encodes it
 fn encode_arrow_schema(schema: &Schema) -> String {
-    let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema);
+    let options = writer::IpcWriteOptions::default();
+    let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema, &options);
 
     // manually prepending the length to the schema as arrow uses the legacy IPC format
     // TODO: change after addressing ARROW-9777
-    let schema_len = serialized_schema.len();
+    let schema_len = serialized_schema.ipc_message.len();
     let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
     len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
     len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
-    len_prefix_schema.append(&mut serialized_schema);
+    len_prefix_schema.append(&mut serialized_schema.ipc_message);
 
     base64::encode(&len_prefix_schema)
 }