You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/09/11 22:07:33 UTC
[arrow] branch ARROW-6313-flatbuffer-alignment updated (4f9b887 ->
0352456)
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a change to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git.
omit 4f9b887 ARROW-6315: [Java] Make change to ensure flatbuffer reads are aligned
omit 8bcef56 ARROW-6316: [Go] implement new ARROW format with 32b-aligned buffers
omit 397d0f8 ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format
add 1c8c218 ARROW-6113: [Java] Support vector deduplicate function
add e0fedf6 ARROW-6304: [Java][Doc] Add a description to each module
add 6f7cc9b ARROW-6136: [FlightRPC][Java] don't double-close response stream
add 4e77035 ARROW-6297: [Java] Compare ArrowBufPointers by unsigned integers
add b4eeab0 ARROW-6306: [Java] Support stable sort by stable comparators
add d4d4a12 ARROW-6334: [Java] Improve the dictionary builder API to return the position of the value in the dictionary
add e29732b ARROW-6351: [Ruby] Improve Arrow#values performance
add a1dbba8 ARROW-6354: [C++] Fix failing build when ARROW_PARQUET=OFF
add 67d46c7 ARROW-4511: [Format][Docs] Revamp Format documentation, consolidate columnar format docs into a more coherent single document. Add Versioning/Stability page
add e5ccef5 ARROW-5522: [Packaging][Documentation] Comments out of date in python/manylinux1/build_arrow.sh
add 443ac07 ARROW-4648: [Doc] Add documentation about C++ file naming
add 63dbc12 ARROW-6376: [Developer] Use target ref of PR when merging instead of hard-coding "master"
add 05bc63c ARROW-6263: [Python] Use RecordBatch::Validate in RecordBatch.from_arrays. Normalize API vs. Table.from_arrays. Add record_batch factory function
add 115de28 ARROW-6373: [C++] Make FixedWidthBinaryBuilder consistent with other fixed width builders in zeroing memory when appending null batches
add 53384de ARROW-6372: [Rust][Datafusion] Casting from Un-signed to Signed Integers not supported
add 7ec1731 ARROW-453: [C++] Filesystem implementation for Amazon S3
add b9d8cd5 ARROW-6381: [C++] BufferOutputStream::Write does extra work that slows down small writes
add ab712d2 ARROW-6384: [C++] Bump dependency versions
add bcf5897 ARROW-6348: [R] arrow::read_csv_arrow namespace error when package not loaded
add 407973b ARROW-6371: [Doc] Row to columnar conversion example mentions arrow::Column in comments
add 99cdb7e ARROW-6144: [C++][Gandiva] Implement random functions in Gandiva
add 99b0e30 ARROW-6387: [Archery] Errors with make
add beea8f9 ARROW-6231: [C++] Allow generating CSV column names
add 157b179 ARROW-6078: [Java] Implement dictionary-encoded subfields for List type
add 2f3ea96 ARROW-6094: [FlightRPC] Add Flight RPC method getFlightSchema
add c2762a6 ARROW-2769: [Python] Deprecate and rename add_metadata methods
add f104f2d ARROW-6265: [Java] Avro adapter implement Array/Map/Fixed type
add 5a8285d ARROW-4095: [C++] Optimize DictionaryArray::Transpose() for trivial transpositions
add 0b41e53 ARROW-6397: [C++][CI] Generate minio server connect string
add 8cacb2f ARROW-6031: [Java] Support iterating a vector by ArrowBufPointer
add 7c46c27 ARROW-6247: [Java] Provide a common interface for float4 and float8 vectors
add 7d7e25e ARROW-6402: [C++] Suppress sign-compare warning with g++ 9.2.1
add a1b477f ARROW-5300: [C++] Remove the ARROW_NO_DEFAULT_MEMORY_POOL macro
add 2164e3b ARROW-4398: [C++][Python][Parquet] Improve BYTE_ARRAY PLAIN encoding write performance. Add BYTE_ARRAY write benchmarks
add 95640a0 ARROW-6347: [GLib] Add garrow_array_diff()
add 7d63dfe ARROW-6406: [C++] Fix jemalloc URL for offline build in thirdparty/versions.txt
add a179933 ARROW-6392: [FlightRPC][Python] check type of list_flights result
add b74b027 ARROW-6403: [Python] Expose FileReader::ReadRowGroups() to Python
add a985483 ARROW-4752: [Rust] Add explicit SIMD vectorization for the divide kernel
add 149d4cb ARROW-6383: [Java] Report outstanding child allocators on close
add 32ef12c ARROW-6063: [FlightRPC] implement half-closed semantics for DoPut
add 1c42e6f ARROW-6141: [C++] Enable memory-mapping a file region
add ab908cc ARROW-6411: [Python][Parquet] Improve performance of DictEncoder::PutIndices
add 9517ade ARROW-6269: [C++] check decimal precision in IPC code
add c39e350 ARROW-5610: [Python] define extension types in Python
add 48d9069 ARROW-6422: [Gandiva] Fix double-conversion linker issue
add 561f86d ARROW-6412: [C++] Improve TCP port allocation in tests
add 327057e ARROW-6424: [C++] Fix IPC fuzzing test name
add 6b714a1 ARROW-6423: [C++] Fix crash when trying to instantiate Snappy CompressedOutputStream
add 4a7dd43 ARROW-6415: [R] Remove usage of R CMD config CXXCPP
add d818299 ARROW-6358: [C++] Add FileSystem::DeleteDirContents
add 96928d5 ARROW-6296: [Java] Cleanup JDBC interfaces and eliminate one memcopy for binary/varchar fields
add 26f631f ARROW-4836: [C++] Support Tell() on compressed streams
add ea309dd ARROW-6450: [C++] Use 2x reallocation strategy in BufferBuilder instead of 1.5x
add 6330b2f ARROW-6355: [Java] Make range equal visitor reusable
add c0656af ARROW-6432: [CI][Crossbow] Remove alpine nightly crossbow jobs
add a6e0599 ARROW-6451: [Format] Add clarifications to Columnar.rst about the contents of "null" slots in Varbinary or List arrays
add eebae5f ARROW-6416: [Python] Improve API & documentation regarding chunksizes
add 131ae4d ARROW-6454: [LICENSE] Add LLVM's license due to static linkage
add ab8ee21 ARROW-6418: [C++][Plasma] Remove cmake project directive for plasma
add 552820a ARROW-5876: [C++][Python] add basic auth flight proto message to C++ and Python
add 243d488 ARROW-6431: [Python] Test suite fails without pandas installed
add 314e9f0 ARROW-6457: [C++] Always set CMAKE_BUILD_TYPE if it is not defined
add d0abe12 ARROW-6441: [Packaging][RPM] Follow plasma-store-server name change
add 1138b9b ARROW-6440: [Packaging][deb] Follow plasma-store-server name change
add 934c18e ARROW-6462: [C++] Fix build error on CentOS 6 x86_64 with bundled double-conversion
add d2be6a5 ARROW-6453: [C++] More informative error messages with S3
add 8cfa163 ARROW-6447: [C++] Allow rest of arrow_objlib to build in parallel while memory_pool.cc is waiting on jemalloc_ep
add 4a5d10e ARROW-5558: [C++] Support Array::View on arrays with non-zero offset
add 26d72f3 ARROW-6318: [Integration] Run tests against pregenerated files
add 45e41ca ARROW-6417: [C++][Parquet] Miscellaneous optimizations yielding slightly better Parquet binary read performance
add b829f53 ARROW-6385: [C++] Use xxh3 instead of custom hashing code for non-tiny strings
add 5931d59 ARROW-6443: [CI][Crossbow] Nightly conda osx builds fail
add 2620ed1 ARROW-6242: [C++][Dataset] Implement Dataset, Scanner and ScannerBuilder
add f0efc3b ARROW-6461: [Java] Prevent EchoServer from closing the client socket after writing
add 40d08a7 ARROW-6433: [Java][CI] Fix java docker image
add c0dbf71 ARROW-6369: [C++] Handle Array.to_pandas case for type=list<bool>
add 200e308 ARROW-6120: [C++] Forbid use of <iostream> in public header files
add b8ebc9d ARROW-6475: [C++] Don't try to dictionary encode dictionary arrays
add 53c5af0 ARROW-6478: [C++] Revert to jemalloc stable-4 until we understand 5.2.x performance issues
add e29e267 ARROW-6435: [Python] Use pandas null coding consistently on List and Struct types
add a89300b ARROW-6476: [Java][CI] Fix java docker build script
add 1137de9 ARROW-5292: [C++] Work around symbol visibility issues so building static libraries is not necessary when building unit tests on WIN32 platform
add d7ef11f ARROW-6171: [R][CI] Fix R library search path
add fb51ecf ARROW-6356: [Java] Avro adapter implement Enum type and nested Record
add 03e6c0b ARROW-4880: [Python] Rehabilitate ASV benchmark build scripts
add 1f893a8 ARROW-3933: [C++][Parquet] Handle non-nullable struct children when reading Parquet file, better error messages
add 9ca682b ARROW-5125: [Python] Round-trip extreme dates on windows
add 6ed87b1 ARROW-6477: [Packaging][Crossbow] Use Azure Pipelines to build linux packages
add 0158ae1 ARROW-6446: [OSX][Python][Wheel] Turn off ORC feature in the wheel building scripts
add d6bc0b6 ARROW-6326: [C++] Nullable fields when converting std::tuple to Table
add 72f4cac ARROW-6100: [Rust] Pin to specific nightly rust for reproducible/stable builds
add 3145e9b ARROW-6408: [Rust] use "if cfg!" pattern
add dadfd48 ARROW-6487: [Rust] [DataFusion] Introduce common test module
add 7acada9 ARROW-6427: [GLib] Add support for column names autogeneration CSV read option
add e9f35a8 ARROW-6331: [Java] Incorporate ErrorProne into the java build
add 7749d88 ARROW-6489: [Developer][Documentation] Fix merge script and readme
add 1f57ba1 ARROW-6491: [Java][Hotfix] fix master fail caused by ErrorProne
add 04c8e89 ARROW-6490: [Java][Memory] Log error for leak in allocator close
add 92f16e3 ARROW-6368: [C++][Dataset] Add interface for "projecting" RecordBatch from one schema to another, inserting null values where needed
add 9c2694e ARROW-5374: [Python][C++] Improve ipc.read_record_batch docstring, fix IPC message type error messages generated in C++
add 62403aa ARROW-3643: [Rust] optimize BooleanBufferBuilder::append_slice
add 1bdb4a0 ARROW-5722: [Rust] Implement Debug for List/Struct/BinaryArray
add ef426a7 ARROW-6365: [R] Should be able to coerce numeric to integer with schema
add 44e7f1d ARROW-6492: [Python] Handle pandas_metadata created by fastparquet with missing field_name
add 19681ff ARROW-6465: [Python] Improvement to Windows build instructions
add 74d8296 ARROW-3762: [Python] Add large_memory unit test exercising BYTE_ARRAY overflow edge cases from ARROW-3762
add 4f7ead8 ARROW-6413: [R] Support autogenerating column names
add c97c64b ARROW-3651: [Python] Handle 'datetime' logical type when reconstructing pandas columns from custom metadata
add dd29b04 ARROW-6300: [C++] Add Abort() method to streams
add 165b02d ARROW-5471: [C++][Gandiva] Array offset is ignored in Gandiva projector
add a32112f ARROW-6292: [C++] Add option to use the mimalloc allocator
add c0baff4 ARROW-5743: [C++] Add cmake option and macros for enabling large memory tests
add 1c746c5 ARROW-6484: [Java] Enable create indexType for DictionaryEncoding according to dictionary value count
add 1dfa258 ARROW-6502: [GLib][CI] Pin gobject-introspection gem to 3.3.7
add 3f2a33f ARROW-6480: [Crossbow] Summary report e-mailer with polling logic
add b1025c2 ARROW-6481: [C++] Avoid copying large ConvertOptions
add 0fbaff6 ARROW-5736: [Format][C++] Support small bit-width indices in sparse tensor
add 9dec79b ARROW-5505: [R] Normalize file and class names, stop masking base R functions, add vignette, improve documentation
add 6d25dfd ARROW-5646: [Crossbow][Documentation] Move the user guide to the Sphinx documentation
add 3437c97 ARROW-6524: [Developer][Packaging] Nightly build report's subject should contain Arrow
add 1d27386 ARROW-6506: [C++] Fix validation of ExtensionArray with struct storage type
add 28bfd2b ARROW-6101: [Rust] [DataFusion] Parallel execution of physical query plan
add b23c989 ARROW-6426: [FlightRPC][C++][Java] Expose gRPC configuration knobs
add 6f72457 ARROW-5450: [Python] Always return datetime.datetime in TimestampValue.as_py for units other than nanoseconds
add 54dfc00 ARROW-1324: [C++] Add support for bundled Boost with MSVC
add 2cedd6b ARROW-6015: [Python] Add note to python/README.md about installing Visual C++ Redistributable on Windows when using pip
add a6b118b ARROW-6522: [Python] Fix failing pandas tests on older pandas / older python
add d2b6d11 ARROW-6243: [C++][Dataset] Filter expressions
add ddfebf6 ARROW-6360: [R] Update support for compression
new 9514e2e ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format
new fa96d4f ARROW-6316: [Go] implement new ARROW format with 32b-aligned buffers
new 0352456 ARROW-6315: [Java] Make change to ensure flatbuffer reads are aligned
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (4f9b887)
\
N -- N -- N refs/heads/ARROW-6313-flatbuffer-alignment (0352456)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.github/workflows/windows-msvc-cpp.yml | 52 +
.travis.yml | 1 +
LICENSE.txt | 74 +
appveyor.yml | 3 +
c_glib/Gemfile | 2 +-
c_glib/arrow-glib/basic-array.cpp | 29 +
c_glib/arrow-glib/basic-array.h | 5 +-
c_glib/arrow-glib/reader.cpp | 29 +-
c_glib/arrow-glib/record-batch.cpp | 15 +-
c_glib/test/test-array.rb | 27 +
c_glib/test/test-csv-reader.rb | 17 +
ci/appveyor-cpp-setup.bat | 5 +
ci/conda_env_cpp.yml | 1 +
c_glib/Gemfile => ci/conda_env_crossbow.txt | 14 +-
ci/cpp-msvc-build-main.bat | 2 +
ci/docker_build_cpp.sh | 4 +-
ci/docker_build_java.sh | 6 +-
ci/travis_before_script_c_glib.sh | 4 +-
ci/travis_before_script_cpp.sh | 6 +
ci/travis_install_linux.sh | 8 +
ci/travis_script_integration.sh | 3 +-
cpp/CMakeLists.txt | 28 +-
cpp/build-support/lint_cpp_cli.py | 1 +
cpp/build-support/run_cpplint.py | 1 +
cpp/cmake_modules/BuildUtils.cmake | 3 +
cpp/cmake_modules/DefineOptions.cmake | 6 +
cpp/cmake_modules/ThirdpartyToolchain.cmake | 202 ++-
cpp/examples/arrow/row-wise-conversion-example.cc | 12 +-
cpp/examples/parquet/CMakeLists.txt | 12 +-
cpp/src/arrow/CMakeLists.txt | 33 +-
cpp/src/arrow/allocator.h | 3 +
cpp/src/arrow/array.cc | 198 ++-
cpp/src/arrow/array.h | 31 +-
cpp/src/arrow/array/builder_binary.cc | 13 +-
cpp/src/arrow/array/builder_binary.h | 2 +-
cpp/src/arrow/array/builder_union.cc | 8 +-
cpp/src/arrow/array_dict_test.cc | 102 +-
cpp/src/arrow/array_test.cc | 38 +-
cpp/src/arrow/array_view_test.cc | 30 +-
cpp/src/arrow/buffer_builder.h | 13 +-
cpp/src/arrow/builder_benchmark.cc | 48 +
cpp/src/arrow/compute/kernels/compare.cc | 5 +-
cpp/src/arrow/csv/column_builder.cc | 8 +-
cpp/src/arrow/csv/column_builder_test.cc | 92 +-
cpp/src/arrow/csv/converter.h | 4 +-
cpp/src/arrow/csv/converter_benchmark.cc | 12 +
cpp/src/arrow/csv/options.h | 7 +-
cpp/src/arrow/csv/reader.cc | 40 +-
cpp/src/arrow/dataset/CMakeLists.txt | 55 +-
cpp/src/arrow/dataset/api.h | 1 +
cpp/src/arrow/dataset/dataset.cc | 15 +
cpp/src/arrow/dataset/dataset.h | 21 +-
cpp/src/arrow/dataset/dataset_internal.h | 196 +++
cpp/src/arrow/dataset/dataset_test.cc | 149 +-
cpp/src/arrow/dataset/file_parquet_test.cc | 82 +
cpp/src/arrow/dataset/filter.cc | 942 ++++++++++++
cpp/src/arrow/dataset/filter.h | 372 ++++-
cpp/src/arrow/dataset/filter_test.cc | 241 +++
cpp/src/arrow/dataset/scanner.cc | 79 +-
cpp/src/arrow/dataset/scanner.h | 52 +-
.../dataset/{dataset_test.cc => scanner_test.cc} | 46 +-
cpp/src/arrow/dataset/test_util.h | 182 +--
cpp/src/arrow/dataset/type_fwd.h | 15 +
cpp/src/arrow/extension_type_test.cc | 59 +
cpp/src/arrow/filesystem/CMakeLists.txt | 11 +
cpp/src/arrow/filesystem/filesystem.cc | 26 +
cpp/src/arrow/filesystem/filesystem.h | 10 +
cpp/src/arrow/filesystem/filesystem_test.cc | 12 +-
cpp/src/arrow/filesystem/localfs.cc | 12 +
cpp/src/arrow/filesystem/localfs.h | 1 +
cpp/src/arrow/filesystem/mockfs.cc | 41 +-
cpp/src/arrow/filesystem/mockfs.h | 1 +
cpp/src/arrow/filesystem/path_util.cc | 7 +
cpp/src/arrow/filesystem/path_util.h | 9 +
cpp/src/arrow/filesystem/s3_internal.h | 186 +++
cpp/src/arrow/filesystem/s3fs.cc | 1264 ++++++++++++++++
cpp/src/arrow/filesystem/{mockfs.h => s3fs.h} | 87 +-
cpp/src/arrow/filesystem/s3fs_narrative_test.cc | 231 +++
cpp/src/arrow/filesystem/s3fs_test.cc | 740 +++++++++
cpp/src/arrow/filesystem/test_util.cc | 192 ++-
cpp/src/arrow/filesystem/test_util.h | 42 +-
cpp/src/arrow/flight/client.cc | 71 +-
cpp/src/arrow/flight/client.h | 25 +
cpp/src/arrow/flight/flight_benchmark.cc | 185 ++-
cpp/src/arrow/flight/flight_test.cc | 172 ++-
cpp/src/arrow/flight/internal.cc | 27 +-
cpp/src/arrow/flight/internal.h | 4 +
cpp/src/arrow/flight/perf_server.cc | 24 +
cpp/src/arrow/flight/server.cc | 38 +-
cpp/src/arrow/flight/server.h | 23 +
cpp/src/arrow/flight/test_util.cc | 71 +-
cpp/src/arrow/flight/test_util.h | 34 +-
cpp/src/arrow/flight/types.cc | 19 +
cpp/src/arrow/flight/types.h | 28 +
cpp/src/arrow/io/buffered.cc | 29 +-
cpp/src/arrow/io/buffered.h | 2 +
cpp/src/arrow/io/compressed.cc | 56 +-
cpp/src/arrow/io/compressed.h | 2 +
cpp/src/arrow/io/compressed_test.cc | 37 +-
cpp/src/arrow/io/file.cc | 108 +-
cpp/src/arrow/io/file.h | 5 +
cpp/src/arrow/io/file_test.cc | 58 +-
cpp/src/arrow/io/interfaces.cc | 14 +
cpp/src/arrow/io/interfaces.h | 22 +
cpp/src/arrow/io/memory.cc | 26 +-
cpp/src/arrow/io/memory.h | 2 +
cpp/src/arrow/io/memory_benchmark.cc | 48 +-
.../{dataset/scanner.cc => io/util_internal.h} | 15 +-
cpp/src/arrow/ipc/metadata_internal.cc | 53 +-
cpp/src/arrow/ipc/metadata_internal.h | 9 +
cpp/src/arrow/ipc/read_write_test.cc | 184 ++-
cpp/src/arrow/ipc/reader.cc | 55 +-
cpp/src/arrow/memory_pool.cc | 337 +++--
cpp/src/arrow/memory_pool.h | 25 +-
cpp/src/arrow/memory_pool_test.cc | 54 +-
cpp/src/arrow/python/arrow_to_pandas.cc | 6 +
cpp/src/arrow/python/extension_type.cc | 26 +-
cpp/src/arrow/python/extension_type.h | 11 +-
cpp/src/arrow/python/flight.cc | 31 +
cpp/src/arrow/python/flight.h | 19 +
cpp/src/arrow/python/helpers.cc | 7 +
cpp/src/arrow/python/helpers.h | 3 +
cpp/src/arrow/python/io.cc | 63 +-
cpp/src/arrow/python/io.h | 2 +
cpp/src/arrow/python/numpy_to_arrow.cc | 2 -
cpp/src/arrow/python/pyarrow_lib.h | 2 -
cpp/src/arrow/python/python_to_arrow.cc | 263 ++--
cpp/src/arrow/python/util/datetime.h | 15 +-
cpp/src/arrow/record_batch.cc | 54 +-
cpp/src/arrow/record_batch.h | 16 +-
cpp/src/arrow/scalar.cc | 27 +
cpp/src/arrow/scalar.h | 18 +
cpp/src/arrow/sparse_tensor.cc | 189 ++-
cpp/src/arrow/sparse_tensor.h | 30 +-
cpp/src/arrow/sparse_tensor_test.cc | 389 +++--
cpp/src/arrow/stl.h | 26 +-
cpp/src/arrow/stl_test.cc | 172 +++
cpp/src/arrow/table.cc | 2 +-
cpp/src/arrow/testing/generator.cc | 94 ++
cpp/src/arrow/testing/generator.h | 234 +++
cpp/src/arrow/testing/gtest_util.h | 71 +-
cpp/src/arrow/testing/util.cc | 79 +-
cpp/src/arrow/testing/util.h | 26 +-
cpp/src/arrow/type.cc | 28 +-
cpp/src/arrow/type.h | 19 +-
cpp/src/arrow/type_benchmark.cc | 6 +-
cpp/src/arrow/type_test.cc | 8 +-
cpp/src/arrow/type_traits.h | 83 +-
cpp/src/arrow/util/CMakeLists.txt | 1 +
cpp/src/arrow/util/bit_util.cc | 2 +
cpp/src/arrow/util/functional.h | 3 +-
cpp/src/arrow/util/hashing.h | 44 +-
cpp/src/arrow/util/io_util.cc | 28 +-
cpp/src/arrow/util/io_util.h | 3 +
cpp/src/arrow/util/io_util_test.cc | 41 +
cpp/src/arrow/util/iterator.h | 145 +-
cpp/src/arrow/util/iterator_test.cc | 124 ++
cpp/src/arrow/util/print.h | 51 +
cpp/src/arrow/vendored/string_view.hpp | 32 +-
cpp/src/arrow/{dataset/api.h => vendored/xxhash.h} | 17 +-
.../src/arrow/vendored/xxhash/README.md | 14 +-
cpp/src/arrow/vendored/xxhash/xxh3.h | 1583 ++++++++++++++++++++
cpp/src/arrow/vendored/xxhash/xxhash.c | 821 +++++-----
cpp/src/arrow/vendored/xxhash/xxhash.h | 328 +++-
cpp/src/arrow/visitor_inline.h | 72 +-
cpp/src/gandiva/CMakeLists.txt | 4 +-
cpp/src/gandiva/annotator.cc | 10 +-
cpp/src/gandiva/bitmap_accumulator.cc | 45 +-
cpp/src/gandiva/bitmap_accumulator.h | 14 +-
cpp/src/gandiva/bitmap_accumulator_test.cc | 57 +-
cpp/src/gandiva/compiled_expr.h | 2 +-
cpp/src/gandiva/decimal_scalar.h | 1 -
cpp/src/gandiva/eval_batch.h | 15 +-
cpp/src/gandiva/filter.cc | 2 +-
cpp/src/gandiva/function_holder_registry.h | 3 +
cpp/src/gandiva/function_registry_math_ops.cc | 10 +-
cpp/src/gandiva/gdv_function_stubs.cc | 22 +
cpp/src/gandiva/in_holder.h | 6 +-
cpp/src/gandiva/llvm_generator.cc | 57 +-
cpp/src/gandiva/llvm_generator.h | 8 +-
cpp/src/gandiva/llvm_generator_test.cc | 4 +-
cpp/src/gandiva/random_generator_holder.cc | 45 +
cpp/src/gandiva/random_generator_holder.h | 60 +
cpp/src/gandiva/random_generator_holder_test.cc | 103 ++
cpp/src/gandiva/tests/filter_test.cc | 45 +
cpp/src/gandiva/tests/huge_table_test.cc | 8 +-
cpp/src/gandiva/tests/projector_test.cc | 38 +
cpp/src/gandiva/tree_expr_builder.cc | 1 +
cpp/src/parquet/CMakeLists.txt | 51 +-
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 2 +-
cpp/src/parquet/arrow/reader.cc | 58 +-
cpp/src/parquet/column_io_benchmark.cc | 2 +-
cpp/src/parquet/column_reader.cc | 27 +-
cpp/src/parquet/column_writer.cc | 2 +-
cpp/src/parquet/column_writer_test.cc | 2 +-
cpp/src/parquet/encoding.cc | 497 +++---
cpp/src/parquet/encoding.h | 17 +-
cpp/src/parquet/encoding_benchmark.cc | 223 ++-
cpp/src/parquet/encoding_test.cc | 74 +-
cpp/src/parquet/file_deserialize_test.cc | 2 +-
cpp/src/parquet/metadata.cc | 2 +-
cpp/src/parquet/metadata_test.cc | 2 +-
cpp/src/parquet/schema.cc | 2 +-
cpp/src/parquet/schema_internal.h | 6 +-
cpp/src/parquet/schema_test.cc | 10 +-
cpp/src/parquet/statistics_test.cc | 2 +-
cpp/src/parquet/{thrift.h => thrift_internal.h} | 0
cpp/src/plasma/CMakeLists.txt | 1 -
cpp/src/plasma/client.cc | 2 +-
...17c897976c60b0e6e4f4a365c751027244dada7a.tar.gz | Bin 0 -> 454719 bytes
cpp/thirdparty/versions.txt | 25 +-
dev/README.md | 16 +-
dev/archery/archery/utils/cmake.py | 23 +-
dev/merge_arrow_pr.py | 10 +-
dev/release/rat_exclude_files.txt | 1 +
dev/tasks/README.md | 189 +--
.../conda-recipes/.ci_support/linux_python2.7.yaml | 22 +
.../conda-recipes/.ci_support/linux_python3.6.yaml | 22 +
.../conda-recipes/.ci_support/linux_python3.7.yaml | 23 +-
.../conda-recipes/.ci_support/osx_python2.7.yaml | 22 +
.../conda-recipes/.ci_support/osx_python3.6.yaml | 22 +
.../conda-recipes/.ci_support/osx_python3.7.yaml | 22 +
..._compilervs2015cxx_compilervs2015python3.6.yaml | 22 -
...pilervs2015cxx_compilervs2015python3.6vc14.yaml | 51 +
..._compilervs2015cxx_compilervs2015python3.7.yaml | 22 -
...pilervs2015cxx_compilervs2015python3.7vc14.yaml | 51 +
dev/tasks/conda-recipes/arrow-cpp/bld.bat | 6 +-
dev/tasks/conda-recipes/arrow-cpp/build.sh | 12 +-
dev/tasks/conda-recipes/arrow-cpp/meta.yaml | 32 +-
dev/tasks/conda-recipes/azure.linux.yml | 27 +-
dev/tasks/conda-recipes/azure.osx.yml | 30 +-
dev/tasks/conda-recipes/azure.win.yml | 26 +-
dev/tasks/conda-recipes/parquet-cpp/meta.yaml | 8 +-
dev/tasks/conda-recipes/pyarrow/bld.bat | 4 +-
dev/tasks/conda-recipes/pyarrow/build.sh | 13 +-
dev/tasks/conda-recipes/pyarrow/meta.yaml | 30 +-
dev/tasks/crossbow.py | 845 +++++++----
dev/tasks/gandiva-jars/build-cpp-linux.sh | 1 -
dev/tasks/linux-packages/azure.linux.arm64.yml | 82 +
dev/tasks/linux-packages/azure.linux.yml | 66 +
.../linux-packages/debian.ubuntu-xenial/control | 6 +-
.../plasma-store-server.install | 2 +-
dev/tasks/linux-packages/debian/control | 6 +-
.../debian/plasma-store-server.install | 2 +-
dev/tasks/linux-packages/package-task.rb | 17 +-
dev/tasks/linux-packages/travis.linux.arm64.yml | 75 -
dev/tasks/linux-packages/travis.linux.yml | 59 -
dev/tasks/linux-packages/yum/arrow.spec.in | 2 +-
dev/tasks/python-wheels/osx-build.sh | 5 +-
dev/tasks/tasks.yml | 500 ++++++-
dev/tasks/tests.yml | 411 -----
dev/tasks/upload-assets.py | 64 -
docker-compose.yml | 4 +-
docs/source/conf.py | 2 +-
docs/source/cpp/datatypes.rst | 2 +-
docs/source/developers/cpp.rst | 42 +-
docs/source/developers/crossbow.rst | 273 ++++
docs/source/developers/python.rst | 139 +-
docs/source/format/Columnar.rst | 1167 +++++++++++++++
docs/source/format/Guidelines.rst | 27 +-
docs/source/format/IPC.rst | 252 +---
docs/source/format/Integration.rst | 179 +++
docs/source/format/Layout.rst | 782 +---------
docs/source/format/Metadata.rst | 328 +---
docs/source/format/Other.rst | 63 +
r/NEWS.md => docs/source/format/README.md | 16 +-
docs/source/format/README.rst | 53 -
docs/source/format/Versioning.rst | 70 +
docs/source/index.rst | 18 +-
docs/source/python/api/arrays.rst | 1 +
docs/source/python/api/datatypes.rst | 11 +
docs/source/python/api/tables.rst | 3 +-
docs/source/python/benchmarks.rst | 8 +-
docs/source/python/extending_types.rst | 180 +++
format/Flight.proto | 16 +
format/SparseTensor.fbs | 65 +-
integration/hdfs/Dockerfile | 11 +-
integration/integration_test.py | 170 ++-
java/adapter/avro/pom.xml | 1 +
.../main/java/org/apache/arrow/AvroToArrow.java | 33 +-
.../java/org/apache/arrow/AvroToArrowConfig.java | 80 +
.../java/org/apache/arrow/AvroToArrowUtils.java | 358 ++++-
.../apache/arrow/AvroToArrowVectorIterator.java | 165 ++
...leTypeConsumer.java => AvroArraysConsumer.java} | 46 +-
.../arrow/consumers/AvroBooleanConsumer.java | 25 +-
.../apache/arrow/consumers/AvroBytesConsumer.java | 44 +-
.../apache/arrow/consumers/AvroDoubleConsumer.java | 25 +-
...{AvroIntConsumer.java => AvroEnumConsumer.java} | 32 +-
...oDoubleConsumer.java => AvroFixedConsumer.java} | 38 +-
.../apache/arrow/consumers/AvroFloatConsumer.java | 25 +-
.../apache/arrow/consumers/AvroIntConsumer.java | 25 +-
.../apache/arrow/consumers/AvroLongConsumer.java | 25 +-
...lableTypeConsumer.java => AvroMapConsumer.java} | 46 +-
.../apache/arrow/consumers/AvroNullConsumer.java | 9 +-
.../apache/arrow/consumers/AvroStringConsumer.java | 43 +-
...ooleanConsumer.java => AvroStructConsumer.java} | 49 +-
.../apache/arrow/consumers/AvroUnionsConsumer.java | 36 +-
.../arrow/consumers/CompositeAvroConsumer.java | 39 +-
.../java/org/apache/arrow/consumers/Consumer.java | 10 +-
.../arrow/consumers/NullableTypeConsumer.java | 6 +
.../test/java/org/apache/arrow/AvroTestBase.java | 202 +++
.../org/apache/arrow/AvroToArrowIteratorTest.java | 161 ++
.../java/org/apache/arrow/AvroToArrowTest.java | 197 ++-
.../src/test/resources/schema/test_array.avsc} | 30 +-
.../src/test/resources/schema/test_fixed.avsc} | 30 +-
.../avro/src/test/resources/schema/test_map.avsc} | 30 +-
.../test/resources/schema/test_nested_record.avsc} | 38 +-
.../resources/schema/test_primitive_enum.avsc} | 30 +-
java/adapter/jdbc/pom.xml | 7 +
.../arrow/adapter/jdbc/ArrowVectorIterator.java | 72 +-
.../arrow/adapter/jdbc/JdbcToArrowUtils.java | 15 +-
.../arrow/adapter/jdbc/consumer/ArrayConsumer.java | 18 +-
.../adapter/jdbc/consumer/BigIntConsumer.java | 19 +-
.../adapter/jdbc/consumer/BinaryConsumer.java | 56 +-
.../arrow/adapter/jdbc/consumer/BitConsumer.java | 19 +-
.../arrow/adapter/jdbc/consumer/BlobConsumer.java | 2 +-
.../arrow/adapter/jdbc/consumer/ClobConsumer.java | 46 +-
.../jdbc/consumer/CompositeJdbcConsumer.java | 11 +-
.../arrow/adapter/jdbc/consumer/DateConsumer.java | 19 +-
.../adapter/jdbc/consumer/DecimalConsumer.java | 19 +-
.../adapter/jdbc/consumer/DoubleConsumer.java | 19 +-
.../arrow/adapter/jdbc/consumer/FloatConsumer.java | 19 +-
.../arrow/adapter/jdbc/consumer/IntConsumer.java | 19 +-
.../arrow/adapter/jdbc/consumer/JdbcConsumer.java | 2 +-
.../adapter/jdbc/consumer/SmallIntConsumer.java | 19 +-
.../arrow/adapter/jdbc/consumer/TimeConsumer.java | 19 +-
.../adapter/jdbc/consumer/TimestampConsumer.java | 19 +-
.../adapter/jdbc/consumer/TinyIntConsumer.java | 19 +-
.../adapter/jdbc/consumer/VarCharConsumer.java | 34 +-
.../arrow/adapter/jdbc/JdbcFieldInfoTest.java | 1 +
.../arrow/adapter/jdbc/JdbcToArrowConfigTest.java | 1 +
java/adapter/orc/pom.xml | 1 +
java/algorithm/pom.xml | 1 +
.../algorithm/deduplicate/DeduplicationUtils.java | 97 ++
.../deduplicate/VectorRunDeduplicator.java | 109 ++
.../HashTableBasedDictionaryBuilder.java | 20 +-
.../SearchTreeBasedDictionaryBuilder.java | 26 +-
.../algorithm/sort/StableVectorComparator.java | 63 +
.../deduplicate/TestDeduplicationUtils.java | 136 ++
.../deduplicate/TestVectorRunDeduplicator.java | 132 ++
.../algorithm/sort/TestStableVectorComparator.java | 132 ++
java/flight/pom.xml | 1 +
.../org/apache/arrow/flight/AsyncPutListener.java | 6 +-
.../java/org/apache/arrow/flight/FlightClient.java | 9 +
.../org/apache/arrow/flight/FlightDescriptor.java | 2 +-
.../org/apache/arrow/flight/FlightProducer.java | 13 +
.../java/org/apache/arrow/flight/FlightServer.java | 15 +-
.../org/apache/arrow/flight/FlightService.java | 24 +-
.../java/org/apache/arrow/flight/SchemaResult.java | 85 ++
.../java/org/apache/arrow/flight/StreamPipe.java | 13 +-
.../org/apache/arrow/flight/SyncPutListener.java | 6 +-
.../arrow/flight/TestApplicationMetadata.java | 38 +-
.../apache/arrow/flight/TestBasicOperation.java | 8 +
.../org/apache/arrow/flight/TestServerOptions.java | 21 +
.../apache/arrow/flight/auth/TestBasicAuth.java | 2 +-
java/format/pom.xml | 1 +
java/gandiva/pom.xml | 1 +
.../arrow/gandiva/evaluator/ProjectorTest.java | 58 +
java/memory/pom.xml | 1 +
.../java/org/apache/arrow/memory/Accountant.java | 3 +-
.../org/apache/arrow/memory/BaseAllocator.java | 15 +-
.../apache/arrow/memory/util/ArrowBufPointer.java | 13 +-
.../arrow/memory/util/TestArrowBufPointer.java | 5 +
java/performance/pom.xml | 4 +-
.../memory/util/ArrowBufPointerBenchmarks.java | 108 ++
java/plasma/pom.xml | 1 +
java/pom.xml | 37 +
java/tools/pom.xml | 1 +
.../java/org/apache/arrow/tools/EchoServer.java | 2 +-
java/vector/pom.xml | 6 +
.../org/apache/arrow/vector/BaseIntVector.java | 2 +-
.../java/org/apache/arrow/vector/Float4Vector.java | 16 +-
.../java/org/apache/arrow/vector/Float8Vector.java | 16 +-
...BaseIntVector.java => FloatingPointVector.java} | 27 +-
.../java/org/apache/arrow/vector/UInt4Vector.java | 2 +-
.../arrow/vector/compare/ApproxEqualsVisitor.java | 196 +--
.../org/apache/arrow/vector/compare/Range.java | 85 ++
.../arrow/vector/compare/RangeEqualsVisitor.java | 262 ++--
.../arrow/vector/compare/VectorEqualsVisitor.java | 33 +-
.../arrow/vector/complex/BaseListVector.java} | 42 +-
.../vector/complex/BaseRepeatedValueVector.java | 3 +-
.../arrow/vector/complex/FixedSizeListVector.java | 12 +-
.../apache/arrow/vector/complex/ListVector.java | 12 +-
.../apache/arrow/vector/dictionary/Dictionary.java | 2 +-
.../arrow/vector/dictionary/DictionaryEncoder.java | 101 +-
.../vector/dictionary/DictionaryHashTable.java | 15 +-
.../vector/dictionary/ListSubfieldEncoder.java | 131 ++
.../apache/arrow/vector/types/pojo/FieldType.java | 5 +
.../util/ElementAddressableVectorIterator.java | 86 ++
.../apache/arrow/vector/TestDictionaryVector.java | 141 ++
.../org/apache/arrow/vector/TestValueVector.java | 79 +-
.../vector/compare/TestRangeEqualsVisitor.java | 66 +-
.../org/apache/arrow/vector/ipc/BaseFileTest.java | 2 +-
.../arrow/vector/ipc/MessageSerializerTest.java | 2 +-
.../util/TestElementAddressableVectorIterator.java | 135 ++
python/README.md | 7 +-
python/asv-build.sh | 19 +-
python/asv.conf.json | 16 +-
python/benchmarks/parquet.py | 76 +
python/manylinux1/build_arrow.sh | 11 +-
python/manylinux2010/build_arrow.sh | 11 +-
python/pyarrow/__init__.py | 5 +-
python/pyarrow/_csv.pyx | 31 +-
python/pyarrow/_flight.pyx | 126 ++
python/pyarrow/_parquet.pxd | 6 +
python/pyarrow/_parquet.pyx | 13 +-
python/pyarrow/flight.py | 2 +
python/pyarrow/includes/libarrow.pxd | 14 +-
python/pyarrow/includes/libarrow_flight.pxd | 33 +-
python/pyarrow/io.pxi | 10 +-
python/pyarrow/ipc.pxi | 38 +-
python/pyarrow/lib.pxd | 4 +
python/pyarrow/pandas_compat.py | 8 +-
python/pyarrow/parquet.py | 30 +
python/pyarrow/public-api.pxi | 9 +-
python/pyarrow/scalar.pxi | 93 +-
python/pyarrow/table.pxi | 154 +-
python/pyarrow/tests/conftest.py | 4 +-
python/pyarrow/tests/test_array.py | 12 +
python/pyarrow/tests/test_convert_builtin.py | 6 +-
python/pyarrow/tests/test_csv.py | 68 +
python/pyarrow/tests/test_cuda.py | 2 +-
python/pyarrow/tests/test_extension_type.py | 145 +-
python/pyarrow/tests/test_flight.py | 131 +-
python/pyarrow/tests/test_io.py | 42 +
python/pyarrow/tests/test_ipc.py | 15 +-
python/pyarrow/tests/test_pandas.py | 104 +-
python/pyarrow/tests/test_parquet.py | 69 +-
python/pyarrow/tests/test_scalars.py | 49 +
python/pyarrow/tests/test_schema.py | 8 +-
python/pyarrow/tests/test_table.py | 62 +-
python/pyarrow/tests/test_types.py | 8 +-
python/pyarrow/types.pxi | 143 +-
r/DESCRIPTION | 32 +-
r/Dockerfile | 7 +
r/Makefile | 2 +-
r/NAMESPACE | 122 +-
r/NEWS.md | 2 +-
r/R/RecordBatchReader.R | 138 --
r/R/RecordBatchWriter.R | 178 ---
r/R/{ArrayData.R => array-data.R} | 22 +-
r/R/array.R | 105 +-
r/R/arrow-package.R | 2 +-
r/R/arrowExports.R | 24 +-
r/R/buffer.R | 71 +-
r/R/{ChunkedArray.R => chunked-array.R} | 54 +-
r/R/compression.R | 110 +-
r/R/compute.R | 4 +-
r/R/csv.R | 299 ++--
r/R/dictionary.R | 22 +-
r/R/enums.R | 14 +-
r/R/feather.R | 228 ++-
r/R/{Field.R => field.R} | 60 +-
r/R/io.R | 358 ++---
r/R/json.R | 156 +-
r/R/{List.R => list.R} | 10 +-
r/R/{memory_pool.R => memory-pool.R} | 14 +-
r/R/message.R | 59 +-
r/R/parquet.R | 203 +--
r/R/{read_record_batch.R => read-record-batch.R} | 26 +-
r/R/read-table.R | 83 +
r/R/read_table.R | 83 -
r/R/record-batch-reader.R | 96 ++
r/R/record-batch-writer.R | 103 ++
r/R/{RecordBatch.R => record-batch.R} | 60 +-
r/R/{Schema.R => schema.R} | 60 +-
r/R/{Struct.R => struct.R} | 8 +-
r/R/{Table.R => table.R} | 46 +-
r/R/type.R | 241 +--
r/{tests/testthat/test-arrow.R => R/util.R} | 22 +-
r/R/{write_arrow.R => write-arrow.R} | 34 +-
r/README.Rmd | 2 +-
r/README.md | 6 +-
r/_pkgdown.yml | 86 +-
r/configure | 2 +-
r/data-raw/codegen.R | 3 +-
r/man/ArrayData.Rd | 28 +
r/man/BufferOutputStream.Rd | 17 -
r/man/BufferReader.Rd | 14 -
r/man/ChunkedArray.Rd | 44 +
r/man/Codec.Rd | 21 +
r/man/CompressedInputStream.Rd | 16 -
r/man/CompressedOutputStream.Rd | 19 -
r/man/{arrow__DataType.Rd => DataType.Rd} | 5 +-
...{arrow__DictionaryType.Rd => DictionaryType.Rd} | 9 +-
r/man/FeatherTableReader.Rd | 39 +-
r/man/FeatherTableWriter.Rd | 35 +-
r/man/Field.Rd | 39 +
r/man/FileOutputStream.Rd | 17 -
r/man/FixedSizeBufferWriter.Rd | 17 -
...{arrow__FixedWidthType.Rd => FixedWidthType.Rd} | 5 +-
r/man/InputStream.Rd | 46 +
r/man/JsonTableReader.Rd | 17 +
r/man/{arrow___MemoryPool.Rd => MemoryPool.Rd} | 7 +-
r/man/{arrow__ipc__Message.Rd => Message.Rd} | 9 +-
r/man/MessageReader.Rd | 18 +-
r/man/MockOutputStream.Rd | 14 -
r/man/OutputStream.Rd | 47 +
r/man/ParquetFileReader.Rd | 43 +
r/man/ParquetReaderProperties.Rd | 30 +
r/man/ReadableFile.Rd | 17 -
r/man/{arrow__RecordBatch.Rd => RecordBatch.Rd} | 7 +-
r/man/RecordBatchFileReader.Rd | 14 -
r/man/RecordBatchFileWriter.Rd | 23 -
r/man/RecordBatchReader.Rd | 39 +
r/man/RecordBatchStreamReader.Rd | 14 -
r/man/RecordBatchStreamWriter.Rd | 23 -
r/man/RecordBatchWriter.Rd | 46 +
r/man/Schema.Rd | 38 +
r/man/Table.Rd | 26 +
r/man/array.Rd | 52 +-
r/man/arrow__Array.Rd | 57 -
r/man/arrow__ArrayData.Rd | 28 -
r/man/arrow__Buffer.Rd | 21 -
r/man/arrow__ChunkedArray.Rd | 17 -
r/man/arrow__Column.Rd | 17 -
r/man/arrow__Field.Rd | 17 -
r/man/arrow__RecordBatchReader.Rd | 17 -
r/man/arrow__Schema.Rd | 29 -
r/man/arrow__Table.Rd | 17 -
r/man/arrow__io__BufferOutputStream.Rd | 18 -
r/man/arrow__io__BufferReader.Rd | 17 -
r/man/arrow__io__FileOutputStream.Rd | 17 -
r/man/arrow__io__FixedSizeBufferWriter.Rd | 17 -
r/man/arrow__io__InputStream.Rd | 17 -
r/man/arrow__io__MemoryMappedFile.Rd | 20 -
r/man/arrow__io__MockOutputStream.Rd | 17 -
r/man/arrow__io__OutputStream.Rd | 19 -
r/man/arrow__io__RandomAccessFile.Rd | 17 -
r/man/arrow__io__Readable.Rd | 17 -
r/man/arrow__io__ReadableFile.Rd | 17 -
r/man/arrow__ipc__MessageReader.Rd | 17 -
r/man/arrow__ipc__RecordBatchFileReader.Rd | 17 -
r/man/arrow__ipc__RecordBatchFileWriter.Rd | 40 -
r/man/arrow__ipc__RecordBatchStreamReader.Rd | 17 -
r/man/arrow__ipc__RecordBatchStreamWriter.Rd | 40 -
r/man/arrow__ipc__RecordBatchWriter.Rd | 28 -
r/man/arrow__json__TableReader.Rd | 18 -
r/man/buffer.Rd | 25 +-
r/man/chunked_array.Rd | 16 -
r/man/compression.Rd | 31 +
r/man/compression_codec.Rd | 14 -
r/man/csv_convert_options.Rd | 10 +-
r/man/csv_read_options.Rd | 9 +-
r/man/csv_table_reader.Rd | 2 +-
r/man/data-type.Rd | 4 +-
r/man/default_memory_pool.Rd | 8 +-
r/man/dictionary.Rd | 2 +-
r/man/enums.Rd | 2 +-
r/man/field.Rd | 25 -
r/man/make_readable_file.Rd | 20 +
r/man/mmap_create.Rd | 2 +-
r/man/parquet_arrow_reader_properties.Rd | 15 -
r/man/parquet_file_reader.Rd | 18 -
r/man/read_delim_arrow.Rd | 35 +-
r/man/read_feather.Rd | 8 +-
r/man/read_json_arrow.Rd | 8 +-
r/man/read_parquet.Rd | 17 +-
r/man/read_record_batch.Rd | 12 +-
r/man/read_schema.Rd | 2 +-
r/man/read_table.Rd | 24 +-
r/man/record_batch.Rd | 10 +-
r/man/schema.Rd | 20 -
r/man/table.Rd | 19 -
r/man/write_arrow.Rd | 14 +-
r/man/write_feather.Rd | 4 +-
r/man/write_feather_RecordBatch.Rd | 17 -
r/man/write_parquet.Rd | 10 +-
r/src/array_from_vector.cpp | 53 +-
.../{array__to_vector.cpp => array_to_vector.cpp} | 0
r/src/arrowExports.cpp | 108 +-
r/src/csv.cpp | 14 +
r/src/datatype.cpp | 4 +-
r/src/recordbatch.cpp | 2 +-
r/src/table.cpp | 14 +-
r/tests/testthat/helper-arrow.R | 2 +-
r/tests/testthat/test-Array.R | 152 +-
r/tests/testthat/test-RecordBatch.R | 25 +-
r/tests/testthat/test-Table.R | 38 +-
.../{test-arraydata.R => test-array-data.R} | 4 +-
r/tests/testthat/test-arrow.R | 17 +
.../{test-bufferreader.R => test-buffer-reader.R} | 18 +-
r/tests/testthat/test-buffer.R | 41 +-
.../{test-chunkedarray.R => test-chunked-array.R} | 38 +-
r/tests/testthat/test-compressed.R | 23 +-
r/tests/testthat/{test-arrow-csv.R => test-csv.R} | 41 +-
r/tests/testthat/test-data-type.R | 2 +-
r/tests/testthat/test-feather.R | 6 +-
r/tests/testthat/test-field.R | 2 +-
r/tests/testthat/test-json.R | 12 +-
...{test-messagereader.R => test-message-reader.R} | 46 +-
r/tests/testthat/test-message.R | 22 +-
...ead_record_batch.R => test-read-record-batch.R} | 16 +-
r/tests/testthat/test-read-write.R | 14 +-
...ordbatchreader.R => test-record-batch-reader.R} | 36 +-
r/tests/testthat/test-schema.R | 24 +-
...-cputhreadpoolcapacity.R => test-thread-pool.R} | 0
r/tests/testthat/test-type.R | 6 +-
r/vignettes/arrow.Rmd | 86 ++
.../red-arrow/benchmark/raw-records/decimal128.yml | 4 +-
.../red-arrow/benchmark/raw-records/dictionary.yml | 22 +-
ruby/red-arrow/benchmark/raw-records/int64.yml | 4 +-
ruby/red-arrow/benchmark/raw-records/list.yml | 10 +-
ruby/red-arrow/benchmark/raw-records/timestamp.yml | 15 +-
.../red-arrow/benchmark/values/boolean.yml | 27 +-
.../red-arrow/benchmark/values/decimal128.yml | 42 +-
.../int64.yml => values/dictionary.yml} | 47 +-
.../red-arrow/benchmark/values/int64.yml | 27 +-
.../{raw-records/int64.yml => values/list.yml} | 41 +-
.../red-arrow/benchmark/values/string.yml | 28 +-
.../int64.yml => values/timestamp.yml} | 48 +-
ruby/red-arrow/ext/arrow/arrow.cpp | 13 +
ruby/red-arrow/ext/arrow/converters.cpp | 42 +
ruby/red-arrow/ext/arrow/converters.hpp | 626 ++++++++
ruby/red-arrow/ext/arrow/raw-records.cpp | 635 +-------
ruby/red-arrow/ext/arrow/red-arrow.hpp | 11 +
ruby/red-arrow/ext/arrow/values.cpp | 154 ++
ruby/red-arrow/lib/arrow/array.rb | 4 +
ruby/red-arrow/lib/arrow/loader.rb | 8 +
ruby/red-arrow/red-arrow.gemspec | 2 +-
ruby/red-arrow/test/values/test-basic-arrays.rb | 284 ++++
.../test/values/test-dense-union-array.rb | 487 ++++++
ruby/red-arrow/test/values/test-list-array.rb | 497 ++++++
.../test/values/test-sparse-union-array.rb | 477 ++++++
ruby/red-arrow/test/values/test-struct-array.rb | 452 ++++++
rust/arrow/benches/arithmetic_kernels.rs | 20 +
rust/arrow/examples/builders.rs | 91 +-
rust/arrow/src/array/array.rs | 143 +-
rust/arrow/src/array/builder.rs | 38 +-
rust/arrow/src/buffer.rs | 41 +-
rust/arrow/src/compute/kernels/arithmetic.rs | 135 +-
rust/arrow/src/compute/kernels/comparison.rs | 101 +-
rust/arrow/src/compute/util.rs | 119 +-
rust/arrow/src/datatypes.rs | 40 +
rust/datafusion/Cargo.toml | 2 +-
rust/datafusion/src/datasource/csv.rs | 19 +-
rust/datafusion/src/execution/aggregate.rs | 36 +-
rust/datafusion/src/execution/context.rs | 173 ++-
rust/datafusion/src/execution/physical_plan/csv.rs | 44 +-
.../src/execution/physical_plan/datasource.rs | 73 +
rust/datafusion/src/execution/physical_plan/mod.rs | 1 +
.../src/execution/physical_plan/projection.rs | 57 +-
rust/datafusion/src/execution/projection.rs | 4 +-
rust/datafusion/src/execution/table_impl.rs | 24 +-
rust/datafusion/src/lib.rs | 3 +
rust/datafusion/src/logicalplan.rs | 6 +-
rust/datafusion/src/optimizer/type_coercion.rs | 14 +
rust/datafusion/src/test/mod.rs | 101 ++
rust/rust-toolchain | 2 +-
testing | 2 +-
650 files changed, 29238 insertions(+), 10784 deletions(-)
create mode 100644 .github/workflows/windows-msvc-cpp.yml
copy c_glib/Gemfile => ci/conda_env_crossbow.txt (89%)
create mode 100644 cpp/src/arrow/dataset/dataset_internal.h
create mode 100644 cpp/src/arrow/dataset/filter.cc
create mode 100644 cpp/src/arrow/dataset/filter_test.cc
copy cpp/src/arrow/dataset/{dataset_test.cc => scanner_test.cc} (55%)
create mode 100644 cpp/src/arrow/filesystem/s3_internal.h
create mode 100644 cpp/src/arrow/filesystem/s3fs.cc
copy cpp/src/arrow/filesystem/{mockfs.h => s3fs.h} (50%)
create mode 100644 cpp/src/arrow/filesystem/s3fs_narrative_test.cc
create mode 100644 cpp/src/arrow/filesystem/s3fs_test.cc
copy cpp/src/arrow/{dataset/scanner.cc => io/util_internal.h} (79%)
create mode 100644 cpp/src/arrow/testing/generator.cc
create mode 100644 cpp/src/arrow/testing/generator.h
create mode 100644 cpp/src/arrow/util/iterator_test.cc
create mode 100644 cpp/src/arrow/util/print.h
copy cpp/src/arrow/{dataset/api.h => vendored/xxhash.h} (77%)
copy r/NEWS.md => cpp/src/arrow/vendored/xxhash/README.md (50%)
create mode 100644 cpp/src/arrow/vendored/xxhash/xxh3.h
create mode 100644 cpp/src/gandiva/random_generator_holder.cc
create mode 100644 cpp/src/gandiva/random_generator_holder.h
create mode 100644 cpp/src/gandiva/random_generator_holder_test.cc
rename cpp/src/parquet/{thrift.h => thrift_internal.h} (100%)
create mode 100644 cpp/thirdparty/jemalloc/17c897976c60b0e6e4f4a365c751027244dada7a.tar.gz
delete mode 100644 dev/tasks/conda-recipes/.ci_support/win_c_compilervs2015cxx_compilervs2015python3.6.yaml
create mode 100644 dev/tasks/conda-recipes/.ci_support/win_c_compilervs2015cxx_compilervs2015python3.6vc14.yaml
delete mode 100644 dev/tasks/conda-recipes/.ci_support/win_c_compilervs2015cxx_compilervs2015python3.7.yaml
create mode 100644 dev/tasks/conda-recipes/.ci_support/win_c_compilervs2015cxx_compilervs2015python3.7vc14.yaml
create mode 100644 dev/tasks/linux-packages/azure.linux.arm64.yml
create mode 100644 dev/tasks/linux-packages/azure.linux.yml
delete mode 100644 dev/tasks/linux-packages/travis.linux.arm64.yml
delete mode 100644 dev/tasks/linux-packages/travis.linux.yml
delete mode 100644 dev/tasks/tests.yml
delete mode 100755 dev/tasks/upload-assets.py
create mode 100644 docs/source/developers/crossbow.rst
create mode 100644 docs/source/format/Columnar.rst
create mode 100644 docs/source/format/Integration.rst
create mode 100644 docs/source/format/Other.rst
copy r/NEWS.md => docs/source/format/README.md (50%)
delete mode 100644 docs/source/format/README.rst
create mode 100644 docs/source/format/Versioning.rst
create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java
create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java
copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{NullableTypeConsumer.java => AvroArraysConsumer.java} (56%)
copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{AvroIntConsumer.java => AvroEnumConsumer.java} (69%)
copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{AvroDoubleConsumer.java => AvroFixedConsumer.java} (62%)
copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{NullableTypeConsumer.java => AvroMapConsumer.java} (56%)
copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{AvroBooleanConsumer.java => AvroStructConsumer.java} (54%)
create mode 100644 java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java
create mode 100644 java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java
copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_array.avsc} (52%)
copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_fixed.avsc} (52%)
copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_map.avsc} (52%)
copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_nested_record.avsc} (53%)
copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_primitive_enum.avsc} (52%)
create mode 100644 java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java
create mode 100644 java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java
create mode 100644 java/algorithm/src/main/java/org/apache/arrow/algorithm/sort/StableVectorComparator.java
create mode 100644 java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestDeduplicationUtils.java
create mode 100644 java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestVectorRunDeduplicator.java
create mode 100644 java/algorithm/src/test/java/org/apache/arrow/algorithm/sort/TestStableVectorComparator.java
create mode 100644 java/flight/src/main/java/org/apache/arrow/flight/SchemaResult.java
create mode 100644 java/performance/src/test/java/org/apache/arrow/memory/util/ArrowBufPointerBenchmarks.java
copy java/vector/src/main/java/org/apache/arrow/vector/{BaseIntVector.java => FloatingPointVector.java} (54%)
create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compare/Range.java
copy java/{adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java => vector/src/main/java/org/apache/arrow/vector/complex/BaseListVector.java} (53%)
create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/dictionary/ListSubfieldEncoder.java
create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/util/ElementAddressableVectorIterator.java
create mode 100644 java/vector/src/test/java/org/apache/arrow/vector/util/TestElementAddressableVectorIterator.java
delete mode 100644 r/R/RecordBatchReader.R
delete mode 100644 r/R/RecordBatchWriter.R
rename r/R/{ArrayData.R => array-data.R} (78%)
rename r/R/{ChunkedArray.R => chunked-array.R} (50%)
rename r/R/{Field.R => field.R} (70%)
rename r/R/{List.R => list.R} (72%)
rename r/R/{memory_pool.R => memory-pool.R} (79%)
rename r/R/{read_record_batch.R => read-record-batch.R} (54%)
create mode 100644 r/R/read-table.R
delete mode 100644 r/R/read_table.R
create mode 100644 r/R/record-batch-reader.R
create mode 100644 r/R/record-batch-writer.R
rename r/R/{RecordBatch.R => record-batch.R} (61%)
rename r/R/{Schema.R => schema.R} (65%)
rename r/R/{Struct.R => struct.R} (81%)
rename r/R/{Table.R => table.R} (65%)
copy r/{tests/testthat/test-arrow.R => R/util.R} (62%)
rename r/R/{write_arrow.R => write-arrow.R} (67%)
create mode 100644 r/man/ArrayData.Rd
delete mode 100644 r/man/BufferOutputStream.Rd
delete mode 100644 r/man/BufferReader.Rd
create mode 100644 r/man/ChunkedArray.Rd
create mode 100644 r/man/Codec.Rd
delete mode 100644 r/man/CompressedInputStream.Rd
delete mode 100644 r/man/CompressedOutputStream.Rd
rename r/man/{arrow__DataType.Rd => DataType.Rd} (75%)
rename r/man/{arrow__DictionaryType.Rd => DictionaryType.Rd} (52%)
create mode 100644 r/man/Field.Rd
delete mode 100644 r/man/FileOutputStream.Rd
delete mode 100644 r/man/FixedSizeBufferWriter.Rd
rename r/man/{arrow__FixedWidthType.Rd => FixedWidthType.Rd} (72%)
create mode 100644 r/man/InputStream.Rd
create mode 100644 r/man/JsonTableReader.Rd
rename r/man/{arrow___MemoryPool.Rd => MemoryPool.Rd} (59%)
rename r/man/{arrow__ipc__Message.Rd => Message.Rd} (54%)
delete mode 100644 r/man/MockOutputStream.Rd
create mode 100644 r/man/OutputStream.Rd
create mode 100644 r/man/ParquetFileReader.Rd
create mode 100644 r/man/ParquetReaderProperties.Rd
delete mode 100644 r/man/ReadableFile.Rd
rename r/man/{arrow__RecordBatch.Rd => RecordBatch.Rd} (59%)
delete mode 100644 r/man/RecordBatchFileReader.Rd
delete mode 100644 r/man/RecordBatchFileWriter.Rd
create mode 100644 r/man/RecordBatchReader.Rd
delete mode 100644 r/man/RecordBatchStreamReader.Rd
delete mode 100644 r/man/RecordBatchStreamWriter.Rd
create mode 100644 r/man/RecordBatchWriter.Rd
create mode 100644 r/man/Schema.Rd
create mode 100644 r/man/Table.Rd
delete mode 100644 r/man/arrow__Array.Rd
delete mode 100644 r/man/arrow__ArrayData.Rd
delete mode 100644 r/man/arrow__Buffer.Rd
delete mode 100644 r/man/arrow__ChunkedArray.Rd
delete mode 100644 r/man/arrow__Column.Rd
delete mode 100644 r/man/arrow__Field.Rd
delete mode 100644 r/man/arrow__RecordBatchReader.Rd
delete mode 100644 r/man/arrow__Schema.Rd
delete mode 100644 r/man/arrow__Table.Rd
delete mode 100644 r/man/arrow__io__BufferOutputStream.Rd
delete mode 100644 r/man/arrow__io__BufferReader.Rd
delete mode 100644 r/man/arrow__io__FileOutputStream.Rd
delete mode 100644 r/man/arrow__io__FixedSizeBufferWriter.Rd
delete mode 100644 r/man/arrow__io__InputStream.Rd
delete mode 100644 r/man/arrow__io__MemoryMappedFile.Rd
delete mode 100644 r/man/arrow__io__MockOutputStream.Rd
delete mode 100644 r/man/arrow__io__OutputStream.Rd
delete mode 100644 r/man/arrow__io__RandomAccessFile.Rd
delete mode 100644 r/man/arrow__io__Readable.Rd
delete mode 100644 r/man/arrow__io__ReadableFile.Rd
delete mode 100644 r/man/arrow__ipc__MessageReader.Rd
delete mode 100644 r/man/arrow__ipc__RecordBatchFileReader.Rd
delete mode 100644 r/man/arrow__ipc__RecordBatchFileWriter.Rd
delete mode 100644 r/man/arrow__ipc__RecordBatchStreamReader.Rd
delete mode 100644 r/man/arrow__ipc__RecordBatchStreamWriter.Rd
delete mode 100644 r/man/arrow__ipc__RecordBatchWriter.Rd
delete mode 100644 r/man/arrow__json__TableReader.Rd
delete mode 100644 r/man/chunked_array.Rd
create mode 100644 r/man/compression.Rd
delete mode 100644 r/man/compression_codec.Rd
delete mode 100644 r/man/field.Rd
create mode 100644 r/man/make_readable_file.Rd
delete mode 100644 r/man/parquet_arrow_reader_properties.Rd
delete mode 100644 r/man/parquet_file_reader.Rd
delete mode 100644 r/man/schema.Rd
delete mode 100644 r/man/table.Rd
delete mode 100644 r/man/write_feather_RecordBatch.Rd
rename r/src/{array__to_vector.cpp => array_to_vector.cpp} (100%)
copy r/tests/testthat/{test-arraydata.R => test-array-data.R} (95%)
rename r/tests/testthat/{test-bufferreader.R => test-buffer-reader.R} (75%)
rename r/tests/testthat/{test-chunkedarray.R => test-chunked-array.R} (91%)
rename r/tests/testthat/{test-arrow-csv.R => test-csv.R} (77%)
rename r/tests/testthat/{test-messagereader.R => test-message-reader.R} (64%)
rename r/tests/testthat/{test-read_record_batch.R => test-read-record-batch.R} (86%)
rename r/tests/testthat/{test-recordbatchreader.R => test-record-batch-reader.R} (62%)
rename r/tests/testthat/{test-cputhreadpoolcapacity.R => test-thread-pool.R} (100%)
create mode 100644 r/vignettes/arrow.Rmd
copy r/tests/testthat/test-arraydata.R => ruby/red-arrow/benchmark/values/boolean.yml (61%)
rename r/R/Column.R => ruby/red-arrow/benchmark/values/decimal128.yml (58%)
copy ruby/red-arrow/benchmark/{raw-records/int64.yml => values/dictionary.yml} (59%)
copy r/tests/testthat/test-arraydata.R => ruby/red-arrow/benchmark/values/int64.yml (61%)
copy ruby/red-arrow/benchmark/{raw-records/int64.yml => values/list.yml} (59%)
rename r/tests/testthat/test-arraydata.R => ruby/red-arrow/benchmark/values/string.yml (62%)
copy ruby/red-arrow/benchmark/{raw-records/int64.yml => values/timestamp.yml} (59%)
create mode 100644 ruby/red-arrow/ext/arrow/converters.cpp
create mode 100644 ruby/red-arrow/ext/arrow/converters.hpp
create mode 100644 ruby/red-arrow/ext/arrow/values.cpp
create mode 100644 ruby/red-arrow/test/values/test-basic-arrays.rb
create mode 100644 ruby/red-arrow/test/values/test-dense-union-array.rb
create mode 100644 ruby/red-arrow/test/values/test-list-array.rb
create mode 100644 ruby/red-arrow/test/values/test-sparse-union-array.rb
create mode 100644 ruby/red-arrow/test/values/test-struct-array.rb
create mode 100644 rust/datafusion/src/execution/physical_plan/datasource.rs
create mode 100644 rust/datafusion/src/test/mod.rs
[arrow] 03/03: ARROW-6315: [Java] Make change to ensure flatbuffer
reads are aligned
Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 0352456387e0d02036029de4fbb6d49324eb779e
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Fri Sep 6 20:36:38 2019 -0700
ARROW-6315: [Java] Make change to ensure flatbuffer reads are aligned
Implements the IPC message format alignment changes for [ARROW-6315](https://issues.apache.org/jira/browse/ARROW-6315).
i. MessageReader can read messages with the old alignment
ii. ArrowWriter can choose to produce messages with the new alignment or the old format.
Closes #5229 from tianchen92/ARROW-align-java and squashes the following commits:
1eb71d27c <Bryan Cutler> ARROW-6461: Prevent EchoServer from closing the client socket after writing
cd4fd050e <tianchen> fix small bugs
9a690e47d <tianchen> fix comments and styles
5ee858c56 <tianchen> Make change to ensure flatbuffer reads are aligned
Lead-authored-by: tianchen <ni...@alibaba-inc.com>
Co-authored-by: Bryan Cutler <cu...@gmail.com>
Signed-off-by: Micah Kornfield <em...@gmail.com>
---
.../org/apache/arrow/tools/EchoServerTest.java | 2 +-
.../apache/arrow/vector/ipc/ArrowFileWriter.java | 13 ++-
.../apache/arrow/vector/ipc/ArrowStreamWriter.java | 22 ++++-
.../org/apache/arrow/vector/ipc/ArrowWriter.java | 17 +++-
.../org/apache/arrow/vector/ipc/WriteChannel.java | 9 ++
.../apache/arrow/vector/ipc/message/IpcOption.java | 28 ++++++
.../vector/ipc/message/MessageSerializer.java | 108 ++++++++++++++++++---
.../arrow/vector/ipc/MessageSerializerTest.java | 14 +--
.../arrow/vector/ipc/TestArrowReaderWriter.java | 52 +++++++++-
.../apache/arrow/vector/ipc/TestArrowStream.java | 2 +-
.../arrow/vector/ipc/TestArrowStreamPipe.java | 2 +-
11 files changed, 231 insertions(+), 38 deletions(-)
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 219926a..bfb136c 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -127,7 +127,7 @@ public class EchoServerTest {
}
Assert.assertFalse(reader.loadNextBatch());
assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
- assertEquals(reader.bytesRead(), writer.bytesWritten());
+ assertEquals(reader.bytesRead() + 4, writer.bytesWritten());
}
}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
index 395a617..936ab6d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
@@ -29,6 +29,7 @@ import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowFooter;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,11 @@ public class ArrowFileWriter extends ArrowWriter {
super(root, provider, out);
}
+ public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+ IpcOption option) {
+ super(root, provider, out, option);
+ }
+
@Override
protected void startInternal(WriteChannel out) throws IOException {
ArrowMagic.writeMagic(out, true);
@@ -68,7 +74,12 @@ public class ArrowFileWriter extends ArrowWriter {
@Override
protected void endInternal(WriteChannel out) throws IOException {
- out.writeIntLittleEndian(0);
+ if (option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(0);
+ } else {
+ out.writeLongLittleEndian(0);
+ }
+
long footerStart = out.getCurrentPosition();
out.write(new ArrowFooter(schema, dictionaryBlocks, recordBlocks), false);
int footerLength = (int) (out.getCurrentPosition() - footerStart);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index ec0f42e..e74323b 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -24,6 +24,7 @@ import java.nio.channels.WritableByteChannel;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.message.IpcOption;
/**
* Writer for the Arrow stream format to send ArrowRecordBatches over a WriteChannel.
@@ -44,14 +45,23 @@ public class ArrowStreamWriter extends ArrowWriter {
/**
* Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
+ */
+ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+ this(root, provider, out, new IpcOption());
+ }
+
+ /**
+ * Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
*
* @param root Existing VectorSchemaRoot with vectors to be written.
* @param provider DictionaryProvider for any vectors that are dictionary encoded.
* (Optional, can be null)
+ * @param option IPC write options
* @param out WritableByteChannel for writing.
*/
- public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
- super(root, provider, out);
+ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+ IpcOption option) {
+ super(root, provider, out, option);
}
/**
@@ -60,8 +70,12 @@ public class ArrowStreamWriter extends ArrowWriter {
* @param out Open WriteChannel with an active Arrow stream.
* @throws IOException on error
*/
- public static void writeEndOfStream(WriteChannel out) throws IOException {
- out.writeIntLittleEndian(0);
+ public void writeEndOfStream(WriteChannel out) throws IOException {
+ if (option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(0);
+ } else {
+ out.writeLongLittleEndian(0);
+ }
}
@Override
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 6366f2f..52ab3de 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -33,6 +33,7 @@ import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -59,16 +60,24 @@ public abstract class ArrowWriter implements AutoCloseable {
private boolean dictWritten = false;
+ protected IpcOption option;
+
+ protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+ this (root, provider, out, new IpcOption());
+ }
+
/**
* Note: fields are not closed when the writer is closed.
*
* @param root the vectors to write to the output
* @param provider where to find the dictionaries
* @param out the output where to write
+ * @param option IPC write options
*/
- protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+ protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
this.unloader = new VectorUnloader(root);
this.out = new WriteChannel(out);
+ this.option = option;
List<Field> fields = new ArrayList<>(root.getSchema().getFields().size());
Set<Long> dictionaryIdsUsed = new HashSet<>();
@@ -112,14 +121,14 @@ public abstract class ArrowWriter implements AutoCloseable {
}
protected ArrowBlock writeDictionaryBatch(ArrowDictionaryBatch batch) throws IOException {
- ArrowBlock block = MessageSerializer.serialize(out, batch);
+ ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
return block;
}
protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
- ArrowBlock block = MessageSerializer.serialize(out, batch);
+ ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
return block;
@@ -140,7 +149,7 @@ public abstract class ArrowWriter implements AutoCloseable {
startInternal(out);
// write the schema - for file formats this is duplicated in the footer, but matches
// the streaming format
- MessageSerializer.serialize(out, schema);
+ MessageSerializer.serialize(out, schema, option);
}
}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
index eef36d3..2d36c93 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
@@ -102,6 +102,15 @@ public class WriteChannel implements AutoCloseable {
}
/**
+ * Writes <code>v</code> in little-endian format to the underlying channel.
+ */
+ public long writeLongLittleEndian(long v) throws IOException {
+ byte[] outBuffer = new byte[8];
+ MessageSerializer.longToBytes(v, outBuffer);
+ return write(outBuffer);
+ }
+
+ /**
* Writes the buffer to the underlying channel.
*/
public void write(ArrowBuf buffer) throws IOException {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
new file mode 100644
index 0000000..81a0603
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+/**
+ * IPC options, currently only used for writing.
+ */
+public class IpcOption {
+
+ // Write the pre-0.15.0 encapsulated IPC message format
+ // consisting of a 4-byte prefix instead of 8 byte
+ public boolean write_legacy_ipc_format = false;
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index 4016802..34ea077 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -56,6 +56,9 @@ import io.netty.buffer.ArrowBuf;
*/
public class MessageSerializer {
+ // This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
+ public static final int IPC_CONTINUATION_TOKEN = -1;
+
/**
* Convert an array of 4 bytes to a little endian i32 value.
*
@@ -83,6 +86,28 @@ public class MessageSerializer {
}
/**
+ * Convert a long to a 8 byte array.
+ *
+ * @param value long value input
+ * @param bytes existing byte array with minimum length of 8 to contain the conversion output
+ */
+ public static void longToBytes(long value, byte[] bytes) {
+ bytes[7] = (byte) (value >>> 56);
+ bytes[6] = (byte) (value >>> 48);
+ bytes[5] = (byte) (value >>> 40);
+ bytes[4] = (byte) (value >>> 32);
+ bytes[3] = (byte) (value >>> 24);
+ bytes[2] = (byte) (value >>> 16);
+ bytes[1] = (byte) (value >>> 8);
+ bytes[0] = (byte) (value);
+ }
+
+ public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
+ throws IOException {
+ return writeMessageBuffer(out, messageLength, messageBuffer, new IpcOption());
+ }
+
+ /**
* Write the serialized Message metadata, prefixed by the length, to the output Channel. This
* ensures that it aligns to an 8 byte boundary and will adjust the message length to include
* any padding used for alignment.
@@ -91,22 +116,36 @@ public class MessageSerializer {
* @param messageLength Number of bytes in the message buffer, written as little Endian prefix
* @param messageBuffer Message metadata buffer to be written, this does not include any
* message body data which should be subsequently written to the Channel
+ * @param option IPC write options
* @return Number of bytes written
* @throws IOException on error
*/
- public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
+ public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer, IpcOption option)
throws IOException {
- // ensure that message aligns to 8 byte padding - 4 bytes for size, then message body
- if ((messageLength + 4) % 8 != 0) {
- messageLength += 8 - (messageLength + 4) % 8;
+ // if write the pre-0.15.0 encapsulated IPC message format consisting of a 4-byte prefix instead of 8 byte
+ int prefixSize = option.write_legacy_ipc_format ? 4 : 8;
+
+ // ensure that message aligns to 8 byte padding - prefix_size bytes, then message body
+ if ((messageLength + prefixSize ) % 8 != 0) {
+ messageLength += 8 - (messageLength + prefixSize) % 8;
+ }
+ if (!option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
}
out.writeIntLittleEndian(messageLength);
out.write(messageBuffer);
out.align();
// any bytes written are already captured by our size modification above
- return messageLength + 4;
+ return messageLength + prefixSize;
+ }
+
+ /**
+ * Serialize a schema object.
+ */
+ public static long serialize(WriteChannel out, Schema schema) throws IOException {
+ return serialize(out, schema, new IpcOption());
}
/**
@@ -117,7 +156,7 @@ public class MessageSerializer {
* @return the number of bytes written
* @throws IOException if something went wrong
*/
- public static long serialize(WriteChannel out, Schema schema) throws IOException {
+ public static long serialize(WriteChannel out, Schema schema, IpcOption option) throws IOException {
long start = out.getCurrentPosition();
assert start % 8 == 0;
@@ -125,7 +164,7 @@ public class MessageSerializer {
int messageLength = serializedMessage.remaining();
- int bytesWritten = writeMessageBuffer(out, messageLength, serializedMessage);
+ int bytesWritten = writeMessageBuffer(out, messageLength, serializedMessage, option);
assert bytesWritten % 8 == 0;
return bytesWritten;
}
@@ -182,13 +221,20 @@ public class MessageSerializer {
/**
* Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
+ */
+ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
+ return serialize(out, batch, new IpcOption());
+ }
+
+ /**
+ * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
*
* @param out where to write the batch
* @param batch the object to serialize to out
* @return the serialized block metadata
* @throws IOException if something went wrong
*/
- public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
+ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch, IpcOption option) throws IOException {
long start = out.getCurrentPosition();
int bodyLength = batch.computeBodyLength();
@@ -198,8 +244,14 @@ public class MessageSerializer {
int metadataLength = serializedMessage.remaining();
+ int prefixSize = 4;
+ if (!option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
+ prefixSize = 8;
+ }
+
// calculate alignment bytes so that metadata length points to the correct location after alignment
- int padding = (int) ((start + metadataLength + 4) % 8);
+ int padding = (int) ((start + metadataLength + prefixSize) % 8);
if (padding != 0) {
metadataLength += (8 - padding);
}
@@ -214,7 +266,7 @@ public class MessageSerializer {
assert bufferLength % 8 == 0;
// Metadata size in the Block account for the size prefix
- return new ArrowBlock(start, metadataLength + 4, bufferLength);
+ return new ArrowBlock(start, metadataLength + prefixSize, bufferLength);
}
/**
@@ -305,7 +357,7 @@ public class MessageSerializer {
*/
public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block, BufferAllocator alloc)
throws IOException {
- // Metadata length contains integer prefix plus byte padding
+ // Metadata length contains prefix_size bytes plus byte padding
long totalLen = block.getMetadataLength() + block.getBodyLength();
if (totalLen > Integer.MAX_VALUE) {
@@ -317,7 +369,9 @@ public class MessageSerializer {
throw new IOException("Unexpected end of input trying to read batch.");
}
- ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
+ int prefixSize = buffer.getInt(0) == IPC_CONTINUATION_TOKEN ? 8 : 4;
+
+ ArrowBuf metadataBuffer = buffer.slice(prefixSize, block.getMetadataLength() - prefixSize);
Message messageFB =
Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
@@ -375,15 +429,21 @@ public class MessageSerializer {
return deserializeRecordBatch(serializedMessage.getMessage(), underlying);
}
+ public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
+ return serialize(out, batch, new IpcOption());
+ }
+
/**
* Serializes a dictionary ArrowRecordBatch. Returns the offset and length of the written batch.
*
* @param out where to serialize
* @param batch the batch to serialize
+ * @param option options for IPC
* @return the metadata of the serialized block
* @throws IOException if something went wrong
*/
- public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
+ public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch, IpcOption option)
+ throws IOException {
long start = out.getCurrentPosition();
int bodyLength = batch.computeBodyLength();
assert bodyLength % 8 == 0;
@@ -392,8 +452,14 @@ public class MessageSerializer {
int metadataLength = serializedMessage.remaining();
+ int prefixSize = 4;
+ if (!option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
+ prefixSize = 8;
+ }
+
// calculate alignment bytes so that metadata length points to the correct location after alignment
- int padding = (int) ((start + metadataLength + 4) % 8);
+ int padding = (int) ((start + metadataLength + prefixSize) % 8);
if (padding != 0) {
metadataLength += (8 - padding);
}
@@ -409,7 +475,7 @@ public class MessageSerializer {
assert bufferLength % 8 == 0;
// Metadata size in the Block account for the size prefix
- return new ArrowBlock(start, metadataLength + 4, bufferLength);
+ return new ArrowBlock(start, metadataLength + prefixSize, bufferLength);
}
/**
@@ -491,7 +557,9 @@ public class MessageSerializer {
throw new IOException("Unexpected end of input trying to read batch.");
}
- ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
+ int prefixSize = buffer.getInt(0) == IPC_CONTINUATION_TOKEN ? 8 : 4;
+
+ ArrowBuf metadataBuffer = buffer.slice(prefixSize, block.getMetadataLength() - prefixSize);
Message messageFB =
Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
@@ -584,7 +652,15 @@ public class MessageSerializer {
// Read the message size. There is an i32 little endian prefix.
ByteBuffer buffer = ByteBuffer.allocate(4);
if (in.readFully(buffer) == 4) {
+
int messageLength = MessageSerializer.bytesToInt(buffer.array());
+ if (messageLength == IPC_CONTINUATION_TOKEN) {
+ buffer.clear();
+ // ARROW-6313, if the first 4 bytes are the continuation token, read the next 4 for the length
+ if (in.readFully(buffer) == 4) {
+ messageLength = MessageSerializer.bytesToInt(buffer.array());
+ }
+ }
// Length of 0 indicates end of stream
if (messageLength != 0) {
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
index 789da1f..1cbd5bb 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
@@ -95,7 +95,7 @@ public class MessageSerializerTest {
buffer.putInt(3);
buffer.flip();
bytesWritten = MessageSerializer.writeMessageBuffer(out, 4, buffer);
- assertEquals(8, bytesWritten);
+ assertEquals(16, bytesWritten);
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
ReadChannel in = new ReadChannel(Channels.newChannel(inputStream));
@@ -103,15 +103,17 @@ public class MessageSerializerTest {
in.readFully(result);
result.rewind();
- // First message size, 2 int values, 4 bytes of zero padding
- assertEquals(12, result.getInt());
+ // First message continuation, size, and 2 int values
+ assertEquals(MessageSerializer.IPC_CONTINUATION_TOKEN, result.getInt());
+ assertEquals(8, result.getInt());
assertEquals(1, result.getInt());
assertEquals(2, result.getInt());
- assertEquals(0, result.getInt());
- // Second message size and 1 int value
- assertEquals(4, result.getInt());
+ // Second message continuation, size, 1 int value and 4 bytes padding
+ assertEquals(MessageSerializer.IPC_CONTINUATION_TOKEN, result.getInt());
+ assertEquals(8, result.getInt());
assertEquals(3, result.getInt());
+ assertEquals(0, result.getInt());
}
@Test
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index 5d1f792..58ad669 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -42,6 +43,7 @@ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Collections2;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TestUtils;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
@@ -54,6 +56,7 @@ import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -75,7 +78,7 @@ public class TestArrowReaderWriter {
private Dictionary dictionary2;
private Schema schema;
- private Schema encodedchema;
+ private Schema encodedSchema;
@Before
public void init() {
@@ -161,7 +164,8 @@ public class TestArrowReaderWriter {
// deserialize the buffer.
ByteBuffer headerBuffer = ByteBuffer.allocate(recordBatches.get(0).getMetadataLength());
headerBuffer.put(byteArray, (int) recordBatches.get(0).getOffset(), headerBuffer.capacity());
- headerBuffer.position(4);
+ // new format prefix_size ==8
+ headerBuffer.position(8);
Message messageFB = Message.getRootAsMessage(headerBuffer);
RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch());
assertEquals(2, recordBatchFB.buffersLength());
@@ -335,7 +339,7 @@ public class TestArrowReaderWriter {
try (ArrowStreamReader reader = new ArrowStreamReader(
new ByteArrayReadableSeekableByteChannel(outStream.toByteArray()), allocator)) {
Schema readSchema = reader.getVectorSchemaRoot().getSchema();
- assertEquals(encodedchema, readSchema);
+ assertEquals(encodedSchema, readSchema);
assertEquals(2, reader.getDictionaryVectors().size());
assertTrue(reader.loadNextBatch());
assertTrue(reader.loadNextBatch());
@@ -401,8 +405,48 @@ public class TestArrowReaderWriter {
schemaFields.add(DictionaryUtility.toMessageFormat(encodedVectorA2.getField(), provider, new HashSet<>()));
schema = new Schema(schemaFields);
- encodedchema = new Schema(Arrays.asList(encodedVectorA1.getField(), encodedVectorA2.getField()));
+ encodedSchema = new Schema(Arrays.asList(encodedVectorA1.getField(), encodedVectorA2.getField()));
return batches;
}
+
+ @Test
+ public void testLegacyIpcBackwardsCompatibility() throws Exception {
+ Schema schema = new Schema(asList(Field.nullable("field", new ArrowType.Int(32, true))));
+ IntVector vector = new IntVector("vector", allocator);
+ final int valueCount = 2;
+ vector.setValueCount(valueCount);
+ vector.setSafe(0, 1);
+ vector.setSafe(1, 2);
+ ArrowRecordBatch batch = new ArrowRecordBatch(valueCount, asList(new ArrowFieldNode(valueCount, 0)),
+ asList(vector.getValidityBuffer(), vector.getDataBuffer()));
+
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ WriteChannel out = new WriteChannel(newChannel(outStream));
+
+ // write legacy ipc format
+ IpcOption option = new IpcOption();
+ option.write_legacy_ipc_format = true;
+ MessageSerializer.serialize(out, schema, option);
+ MessageSerializer.serialize(out, batch);
+
+ ReadChannel in = new ReadChannel(newChannel(new ByteArrayInputStream(outStream.toByteArray())));
+ Schema readSchema = MessageSerializer.deserializeSchema(in);
+ assertEquals(schema, readSchema);
+ ArrowRecordBatch readBatch = MessageSerializer.deserializeRecordBatch(in, allocator);
+ assertEquals(batch.getLength(), readBatch.getLength());
+ assertEquals(batch.computeBodyLength(), readBatch.computeBodyLength());
+
+ // write ipc format with continuation
+ option.write_legacy_ipc_format = false;
+ MessageSerializer.serialize(out, schema, option);
+ MessageSerializer.serialize(out, batch);
+
+ ReadChannel in2 = new ReadChannel(newChannel(new ByteArrayInputStream(outStream.toByteArray())));
+ Schema readSchema2 = MessageSerializer.deserializeSchema(in2);
+ assertEquals(schema, readSchema2);
+ ArrowRecordBatch readBatch2 = MessageSerializer.deserializeRecordBatch(in2, allocator);
+ assertEquals(batch.getLength(), readBatch2.getLength());
+ assertEquals(batch.computeBodyLength(), readBatch2.computeBodyLength());
+ }
}
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
index 92e5276..5d8f5df 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
@@ -117,7 +117,7 @@ public class TestArrowStream extends BaseFileTest {
assertTrue(reader.loadNextBatch());
}
// TODO figure out why reader isn't getting padding bytes
- assertEquals(bytesWritten, reader.bytesRead() + 4);
+ assertEquals(bytesWritten, reader.bytesRead() + 8);
assertFalse(reader.loadNextBatch());
assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
}
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
index 422a63f..07f4017 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
@@ -156,6 +156,6 @@ public class TestArrowStreamPipe {
writer.join();
assertEquals(NUM_BATCHES, reader.getBatchesRead());
- assertEquals(writer.bytesWritten(), reader.bytesRead());
+ assertEquals(writer.bytesWritten(), reader.bytesRead() + 4);
}
}
[arrow] 01/03: ARROW-6314: [C++] Implement IPC message format
alignment changes,
provide backwards compatibility and "legacy" option to emit old message
format
Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 9514e2e225cdb84da09476cb5707dbb9443239eb
Author: Wes McKinney <we...@apache.org>
AuthorDate: Wed Aug 28 11:58:52 2019 -0500
ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format
This also moves the alignment multiple to `IpcOptions` and adds the `IpcOptions` argument to more functions.
Closes #5211 from wesm/ARROW-6314 and squashes the following commits:
df3b910bd <Wes McKinney> Fix MSVC narrowing warning
62758b614 <Wes McKinney> Code review comments. Copy metadata always in prefix_length==4 legacy case
857a57155 <Antoine Pitrou> Fix CUDA IPC
71b2fad4f <Wes McKinney> Add tests exercising backwards compatibility write and read path
4777d2bc8 <Wes McKinney> Implement backwards compatibility and compatibility mode, pass IpcOptions in more APIs
1a3843215 <Wes McKinney> Revert changes to submodule
69883cf63 <Micah Kornfield> verify 8 bytes alignment fixes ubsan for ipc
Lead-authored-by: Wes McKinney <we...@apache.org>
Co-authored-by: Antoine Pitrou <an...@python.org>
Co-authored-by: Micah Kornfield <em...@gmail.com>
Signed-off-by: Wes McKinney <we...@apache.org>
---
cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 26 +-----
cpp/src/arrow/ipc/message.cc | 154 ++++++++++++++++++++++++++++-----
cpp/src/arrow/ipc/message.h | 47 +++++++---
cpp/src/arrow/ipc/metadata_internal.cc | 32 -------
cpp/src/arrow/ipc/metadata_internal.h | 16 ----
cpp/src/arrow/ipc/options.h | 8 ++
cpp/src/arrow/ipc/read_write_test.cc | 119 ++++++++++++++++++-------
cpp/src/arrow/ipc/writer.cc | 69 +++++++--------
cpp/src/arrow/ipc/writer.h | 22 ++---
python/pyarrow/includes/libarrow.pxd | 7 +-
python/pyarrow/ipc.pxi | 5 +-
11 files changed, 309 insertions(+), 196 deletions(-)
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
index 34488a1..0fb81bc 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -63,31 +63,7 @@ Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
std::unique_ptr<ipc::Message>* out) {
- int32_t message_length = 0;
- int64_t bytes_read = 0;
-
- RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read,
- reinterpret_cast<uint8_t*>(&message_length)));
- if (bytes_read != sizeof(int32_t)) {
- *out = nullptr;
- return Status::OK();
- }
-
- if (message_length == 0) {
- // Optional 0 EOS control message
- *out = nullptr;
- return Status::OK();
- }
-
- std::shared_ptr<Buffer> metadata;
- RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata));
- RETURN_NOT_OK(reader->Read(message_length, &bytes_read, metadata->mutable_data()));
- if (bytes_read != message_length) {
- return Status::IOError("Expected ", message_length, " metadata bytes, but only got ",
- bytes_read);
- }
-
- return ipc::Message::ReadFrom(metadata, reader, out);
+ return ipc::ReadMessageCopy(reader, pool, out);
}
Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index dad1f98..a281b0d 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -32,10 +32,14 @@
#include "arrow/ipc/util.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
+#include "arrow/util/ubsan.h"
namespace arrow {
namespace ipc {
+// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
+constexpr int32_t kIpcContinuationToken = -1;
+
class Message::MessageImpl {
public:
explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
@@ -142,12 +146,19 @@ bool Message::Equals(const Message& other) const {
}
}
-Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
- std::unique_ptr<Message>* out) {
+Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_length) {
+ // Check metadata memory alignment in debug builds
+ DCHECK_EQ(0, reinterpret_cast<uintptr_t>(metadata.data()) % 8);
const flatbuf::Message* fb_message;
- RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message));
+ RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &fb_message));
+ *body_length = fb_message->bodyLength();
+ return Status::OK();
+}
- int64_t body_length = fb_message->bodyLength();
+Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
+ std::unique_ptr<Message>* out) {
+ int64_t body_length = -1;
+ RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
std::shared_ptr<Buffer> body;
RETURN_NOT_OK(stream->Read(body_length, &body));
@@ -161,9 +172,8 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea
Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
- const flatbuf::Message* fb_message;
- RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message));
- int64_t body_length = fb_message->bodyLength();
+ int64_t body_length = -1;
+ RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
std::shared_ptr<Buffer> body;
RETURN_NOT_OK(file->ReadAt(offset, body_length, &body));
@@ -184,10 +194,10 @@ Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
return Status::OK();
}
-Status Message::SerializeTo(io::OutputStream* stream, int32_t alignment,
+Status Message::SerializeTo(io::OutputStream* stream, const IpcOptions& options,
int64_t* output_length) const {
int32_t metadata_length = 0;
- RETURN_NOT_OK(internal::WriteMessage(*metadata(), alignment, stream, &metadata_length));
+ RETURN_NOT_OK(WriteMessage(*metadata(), options, stream, &metadata_length));
*output_length = metadata_length;
@@ -237,15 +247,47 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
" metadata bytes but got ", buffer->size());
}
- int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
+ const int32_t continuation = util::SafeLoadAs<int32_t>(buffer->data());
+
+ // The size of the Flatbuffer including padding
+ int32_t flatbuffer_length = -1;
+ int32_t prefix_size = -1;
+ if (continuation == kIpcContinuationToken) {
+ if (metadata_length < 8) {
+ return Status::Invalid(
+ "Corrupted IPC message, had continuation token "
+ " but length ",
+ metadata_length);
+ }
- if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
- return Status::Invalid("flatbuffer size ", metadata_length,
+ // Valid IPC message, parse the message length now
+ flatbuffer_length = util::SafeLoadAs<int32_t>(buffer->data() + 4);
+ prefix_size = 8;
+ } else if (continuation == 0) {
+ // EOS
+ *message = nullptr;
+ return Status::OK();
+ } else {
+ // ARROW-6314: Backwards compatibility for reading old IPC
+ // messages produced prior to version 0.15.0
+ flatbuffer_length = continuation;
+ prefix_size = 4;
+ }
+
+ if (flatbuffer_length + prefix_size != metadata_length) {
+ return Status::Invalid("flatbuffer size ", flatbuffer_length,
" invalid. File offset: ", offset,
", metadata length: ", metadata_length);
}
- auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
+ std::shared_ptr<Buffer> metadata =
+ SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size);
+ if (prefix_size == 4) {
+ // ARROW-6314: For old messages we copy the metadata to fix UBSAN
+ // issues with Flatbuffers. For new messages, they are already
+ // aligned
+ RETURN_NOT_OK(metadata->Copy(0, metadata->size(), &metadata));
+ }
return Message::ReadFrom(offset + metadata_length, metadata, file, message);
}
@@ -269,39 +311,105 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
int64_t current_position;
ARROW_RETURN_NOT_OK(stream->Tell(&current_position));
if (current_position % alignment != 0) {
- return Status::Invalid("Stream is not aligned");
+ return Status::Invalid("Stream is not aligned pos: ", current_position,
+ " alignment: ", alignment);
} else {
return Status::OK();
}
}
-Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
- int32_t message_length = 0;
+namespace {
+
+Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata,
+ std::unique_ptr<Message>* message) {
+ int32_t continuation = 0;
int64_t bytes_read = 0;
RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
- reinterpret_cast<uint8_t*>(&message_length)));
+ reinterpret_cast<uint8_t*>(&continuation)));
if (bytes_read != sizeof(int32_t)) {
+ // EOS
*message = nullptr;
return Status::OK();
}
- if (message_length == 0) {
- // Optional 0 EOS control message
+ int32_t flatbuffer_length = -1;
+ bool legacy_format = false;
+ if (continuation == kIpcContinuationToken) {
+ // Valid IPC message, read the message length now
+ RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
+ reinterpret_cast<uint8_t*>(&flatbuffer_length)));
+ } else if (continuation == 0) {
+ // EOS
*message = nullptr;
return Status::OK();
+ } else {
+ // ARROW-6314: Backwards compatibility for reading old IPC
+ // messages produced prior to version 0.15.0
+ flatbuffer_length = continuation;
+ legacy_format = true;
}
std::shared_ptr<Buffer> metadata;
- RETURN_NOT_OK(file->Read(message_length, &metadata));
- if (metadata->size() != message_length) {
- return Status::Invalid("Expected to read ", message_length, " metadata bytes, but ",
- "only read ", metadata->size());
+ if (legacy_format || copy_metadata) {
+ DCHECK_NE(pool, nullptr);
+ RETURN_NOT_OK(AllocateBuffer(pool, flatbuffer_length, &metadata));
+ RETURN_NOT_OK(file->Read(flatbuffer_length, &bytes_read, metadata->mutable_data()));
+ } else {
+ RETURN_NOT_OK(file->Read(flatbuffer_length, &metadata));
+ bytes_read = metadata->size();
+ }
+ if (bytes_read != flatbuffer_length) {
+ return Status::Invalid("Expected to read ", flatbuffer_length,
+ " metadata bytes, but ", "only read ", bytes_read);
}
return Message::ReadFrom(metadata, file, message);
}
+} // namespace
+
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
+ return ReadMessage(file, default_memory_pool(), /*copy_metadata=*/false, out);
+}
+
+Status ReadMessageCopy(io::InputStream* file, MemoryPool* pool,
+ std::unique_ptr<Message>* out) {
+ return ReadMessage(file, pool, /*copy_metadata=*/true, out);
+}
+
+Status WriteMessage(const Buffer& message, const IpcOptions& options,
+ io::OutputStream* file, int32_t* message_length) {
+ const int32_t prefix_size = options.write_legacy_ipc_format ? 4 : 8;
+ const int32_t flatbuffer_size = static_cast<int32_t>(message.size());
+
+ int32_t padded_message_length = static_cast<int32_t>(
+ PaddedLength(flatbuffer_size + prefix_size, options.alignment));
+
+ int32_t padding = padded_message_length - flatbuffer_size - prefix_size;
+
+ // The returned message size includes the length prefix, the flatbuffer,
+ // plus padding
+ *message_length = padded_message_length;
+
+ // ARROW-6314: Write continuation / padding token
+ if (!options.write_legacy_ipc_format) {
+ RETURN_NOT_OK(file->Write(&kIpcContinuationToken, sizeof(int32_t)));
+ }
+
+ // Write the flatbuffer size prefix including padding
+ int32_t padded_flatbuffer_size = padded_message_length - prefix_size;
+ RETURN_NOT_OK(file->Write(&padded_flatbuffer_size, sizeof(int32_t)));
+
+ // Write the flatbuffer
+ RETURN_NOT_OK(file->Write(message.data(), flatbuffer_size));
+ if (padding > 0) {
+ RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
+ }
+
+ return Status::OK();
+}
+
// ----------------------------------------------------------------------
// Implement InputStream message reader
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 9c152d7..89be45e 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -17,8 +17,7 @@
// C++ object model and user API for interprocess schema messaging
-#ifndef ARROW_IPC_MESSAGE_H
-#define ARROW_IPC_MESSAGE_H
+#pragma once
#include <cstdint>
#include <memory>
@@ -32,6 +31,7 @@
namespace arrow {
class Buffer;
+class MemoryPool;
namespace io {
@@ -137,13 +137,10 @@ class ARROW_EXPORT Message {
/// \brief Write length-prefixed metadata and body to output stream
///
/// \param[in] file output stream to write to
- /// \param[in] alignment byte alignment for metadata, usually 8 or
- /// 64. Whether the body is padded depends on the metadata; if the body
- /// buffer is smaller than the size indicated in the metadata, then extra
- /// padding bytes will be written
+ /// \param[in] options IPC writing options including alignment
/// \param[out] output_length the number of bytes written
/// \return Status
- Status SerializeTo(io::OutputStream* file, int32_t alignment,
+ Status SerializeTo(io::OutputStream* file, const IpcOptions& options,
int64_t* output_length) const;
/// \brief Return true if the Message metadata passes Flatbuffer validation
@@ -223,15 +220,39 @@ Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
ARROW_EXPORT
Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
-/// \brief Read encapsulated RPC message (metadata and body) from InputStream
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
///
-/// Read length-prefixed message with as-yet unknown length. Returns null if
-/// there are not enough bytes available or the message length is 0 (e.g. EOS
-/// in a stream)
+/// Returns null if there are not enough bytes available or the
+/// message length is 0 (e.g. EOS in a stream)
ARROW_EXPORT
Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
+///
+/// Like ReadMessage, except that the metadata is copied into a new buffer.
+/// This is necessary if the stream returns non-CPU buffers.
+ARROW_EXPORT
+Status ReadMessageCopy(io::InputStream* stream, MemoryPool* pool,
+ std::unique_ptr<Message>* message);
+
+/// Write encapsulated IPC message. Does not make assumptions about
+/// whether the stream is aligned already. Can write legacy (pre
+/// version 0.15.0) IPC message if option set
+///
+/// continuation: 0xFFFFFFFF
+/// message_size: int32
+/// message: const void*
+/// padding
+///
+/// \param[in] message a buffer containing the metadata to write
+/// \param[in] options IPC writing options, including alignment and
+/// legacy message support
+/// \param[in,out] file the OutputStream to write to
+/// \param[out] message_length the total size of the payload written including
+/// padding
+/// \return Status
+Status WriteMessage(const Buffer& message, const IpcOptions& options,
+ io::OutputStream* file, int32_t* message_length);
+
} // namespace ipc
} // namespace arrow
-
-#endif // ARROW_IPC_MESSAGE_H
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 6810351..dff3369 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -1282,38 +1282,6 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type);
}
-// ----------------------------------------------------------------------
-// Implement message writing
-
-Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
- int32_t* message_length) {
- // ARROW-3212: We do not make assumptions that the output stream is aligned
- int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
- const int32_t remainder = padded_message_length % alignment;
- if (remainder != 0) {
- padded_message_length += alignment - remainder;
- }
-
- // The returned message size includes the length prefix, the flatbuffer,
- // plus padding
- *message_length = padded_message_length;
-
- // Write the flatbuffer size prefix including padding
- int32_t flatbuffer_size = padded_message_length - 4;
- RETURN_NOT_OK(file->Write(&flatbuffer_size, sizeof(int32_t)));
-
- // Write the flatbuffer
- RETURN_NOT_OK(file->Write(message.data(), message.size()));
-
- // Write any padding
- int32_t padding = padded_message_length - static_cast<int32_t>(message.size()) - 4;
- if (padding > 0) {
- RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
- }
-
- return Status::OK();
-}
-
} // namespace internal
} // namespace ipc
} // namespace arrow
diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h
index 828affd..420cfb1 100644
--- a/cpp/src/arrow/ipc/metadata_internal.h
+++ b/cpp/src/arrow/ipc/metadata_internal.h
@@ -128,22 +128,6 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size,
return Status::OK();
}
-/// Write a serialized message metadata with a length-prefix and padding to an
-/// 8-byte offset. Does not make assumptions about whether the stream is
-/// aligned already
-///
-/// <message_size: int32><message: const void*><padding>
-///
-/// \param[in] message a buffer containing the metadata to write
-/// \param[in] alignment the size multiple of the total message size including
-/// length prefix, metadata, and padding. Usually 8 or 64
-/// \param[in,out] file the OutputStream to write to
-/// \param[out] message_length the total size of the payload written including
-/// padding
-/// \return Status
-Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
- int32_t* message_length);
-
// Serialize arrow::Schema as a Flatbuffer
//
// \param[in] schema a Schema instance
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index d380402..3570c06 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -36,6 +36,14 @@ struct ARROW_EXPORT IpcOptions {
// The maximum permitted schema nesting depth.
int max_recursion_depth = kMaxNestingDepth;
+ // Write padding after memory buffers to this multiple of
+ // bytes. Generally 8 or 64
+ int32_t alignment = 8;
+
+ /// \brief Write the pre-0.15.0 encapsulated IPC message format
+ /// consisting of a 4-byte prefix instead of an 8-byte one
+ bool write_legacy_ipc_format = false;
+
static IpcOptions Defaults();
};
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 9cbeacf..efd0a88 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -106,27 +106,69 @@ TEST(TestMessage, SerializeTo) {
std::shared_ptr<io::BufferOutputStream> stream;
- {
- const int32_t alignment = 8;
-
+ auto CheckWithAlignment = [&](int32_t alignment) {
+ IpcOptions options;
+ options.alignment = alignment;
+ const int32_t prefix_size = 8;
ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
- ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
+ ASSERT_OK(message->SerializeTo(stream.get(), options, &output_length));
ASSERT_OK(stream->Tell(&position));
- ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
+ ASSERT_EQ(BitUtil::RoundUp(metadata->size() + prefix_size, alignment) + body_length,
output_length);
ASSERT_EQ(output_length, position);
- }
+ };
- {
- const int32_t alignment = 64;
+ CheckWithAlignment(8);
+ CheckWithAlignment(64);
+}
- ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
- ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
- ASSERT_OK(stream->Tell(&position));
- ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
- output_length);
- ASSERT_EQ(output_length, position);
- }
+void BuffersOverlapEquals(const Buffer& left, const Buffer& right) {
+ ASSERT_GT(left.size(), 0);
+ ASSERT_GT(right.size(), 0);
+ ASSERT_TRUE(left.Equals(right, std::min(left.size(), right.size())));
+}
+
+TEST(TestMessage, LegacyIpcBackwardsCompatibility) {
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_OK(MakeIntBatchSized(36, &batch));
+
+ auto RoundtripWithOptions = [&](const IpcOptions& arg_options,
+ std::shared_ptr<Buffer>* out_serialized,
+ std::unique_ptr<Message>* out) {
+ internal::IpcPayload payload;
+ ASSERT_OK(internal::GetRecordBatchPayload(*batch, arg_options, default_memory_pool(),
+ &payload));
+
+ std::shared_ptr<io::BufferOutputStream> stream;
+ ASSERT_OK(io::BufferOutputStream::Create(1 << 20, default_memory_pool(), &stream));
+
+ int32_t metadata_length = -1;
+ ASSERT_OK(
+ internal::WriteIpcPayload(payload, arg_options, stream.get(), &metadata_length));
+
+ ASSERT_OK(stream->Finish(out_serialized));
+ io::BufferReader io_reader(*out_serialized);
+ ASSERT_OK(ReadMessage(&io_reader, out));
+ };
+
+ std::shared_ptr<Buffer> serialized, legacy_serialized;
+ std::unique_ptr<Message> message, legacy_message;
+
+ IpcOptions options;
+ RoundtripWithOptions(options, &serialized, &message);
+
+ // First 4 bytes 0xFFFFFFFF Continuation marker
+ ASSERT_EQ(-1, util::SafeLoadAs<int32_t>(serialized->data()));
+
+ options.write_legacy_ipc_format = true;
+ RoundtripWithOptions(options, &legacy_serialized, &legacy_message);
+
+ // Check that the continuation marker is not written
+ ASSERT_NE(-1, util::SafeLoadAs<int32_t>(legacy_serialized->data()));
+
+ // Have to use the smaller size to exclude padding
+ BuffersOverlapEquals(*legacy_message->metadata(), *message->metadata());
+ ASSERT_TRUE(legacy_message->body()->Equals(*message->body()));
}
TEST(TestMessage, Verify) {
@@ -635,13 +677,14 @@ TEST_F(RecursionLimits, StressLimit) {
#endif // !defined(_WIN32) || defined(NDEBUG)
struct FileWriterHelper {
- Status Init(const std::shared_ptr<Schema>& schema) {
+ Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& options) {
num_batches_written_ = 0;
RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_));
sink_.reset(new io::BufferOutputStream(buffer_));
-
- return RecordBatchFileWriter::Open(sink_.get(), schema, &writer_);
+ ARROW_ASSIGN_OR_RAISE(writer_,
+ RecordBatchFileWriter::Open(sink_.get(), schema, options));
+ return Status::OK();
}
Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
@@ -680,11 +723,12 @@ struct FileWriterHelper {
};
struct StreamWriterHelper {
- Status Init(const std::shared_ptr<Schema>& schema) {
+ Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& options) {
RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_));
sink_.reset(new io::BufferOutputStream(buffer_));
-
- return RecordBatchStreamWriter::Open(sink_.get(), schema, &writer_);
+ ARROW_ASSIGN_OR_RAISE(writer_,
+ RecordBatchStreamWriter::Open(sink_.get(), schema, options));
+ return Status::OK();
}
Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
@@ -718,7 +762,7 @@ class ReaderWriterMixin {
// Check simple RecordBatch roundtripping
template <typename Param>
- void TestRoundTrip(Param&& param) {
+ void TestRoundTrip(Param&& param, const IpcOptions& options) {
std::shared_ptr<RecordBatch> batch1;
std::shared_ptr<RecordBatch> batch2;
ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue
@@ -727,7 +771,7 @@ class ReaderWriterMixin {
BatchVector in_batches = {batch1, batch2};
BatchVector out_batches;
- ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
+ ASSERT_OK(RoundTripHelper(in_batches, options, &out_batches));
ASSERT_EQ(out_batches.size(), in_batches.size());
// Compare batches
@@ -741,7 +785,7 @@ class ReaderWriterMixin {
ASSERT_OK(MakeDictionary(&batch));
BatchVector out_batches;
- ASSERT_OK(RoundTripHelper({batch}, &out_batches));
+ ASSERT_OK(RoundTripHelper({batch}, IpcOptions::Defaults(), &out_batches));
ASSERT_EQ(out_batches.size(), 1);
// TODO(wesm): This was broken in ARROW-3144. I'm not sure how to
@@ -764,7 +808,7 @@ class ReaderWriterMixin {
schema = schema->WithMetadata(key_value_metadata({"some_key"}, {"some_value"}));
WriterHelper writer_helper;
- ASSERT_OK(writer_helper.Init(schema));
+ ASSERT_OK(writer_helper.Init(schema, IpcOptions::Defaults()));
// Writing a record batch with a different schema
ASSERT_RAISES(Invalid, writer_helper.WriteBatch(batch_ints));
// Writing a record batch with the same schema (except metadata)
@@ -781,9 +825,10 @@ class ReaderWriterMixin {
}
private:
- Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
+ Status RoundTripHelper(const BatchVector& in_batches, const IpcOptions& options,
+ BatchVector* out_batches) {
WriterHelper writer_helper;
- RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema()));
+ RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema(), options));
for (const auto& batch : in_batches) {
RETURN_NOT_OK(writer_helper.WriteBatch(batch));
}
@@ -813,9 +858,21 @@ class TestFileFormat : public ReaderWriterMixin<FileWriterHelper>,
class TestStreamFormat : public ReaderWriterMixin<StreamWriterHelper>,
public ::testing::TestWithParam<MakeRecordBatch*> {};
-TEST_P(TestFileFormat, RoundTrip) { TestRoundTrip(*GetParam()); }
+TEST_P(TestFileFormat, RoundTrip) {
+ TestRoundTrip(*GetParam(), IpcOptions::Defaults());
-TEST_P(TestStreamFormat, RoundTrip) { TestRoundTrip(*GetParam()); }
+ IpcOptions options;
+ options.write_legacy_ipc_format = true;
+ TestRoundTrip(*GetParam(), options);
+}
+
+TEST_P(TestStreamFormat, RoundTrip) {
+ TestRoundTrip(*GetParam(), IpcOptions::Defaults());
+
+ IpcOptions options;
+ options.write_legacy_ipc_format = true;
+ TestRoundTrip(*GetParam(), options);
+}
INSTANTIATE_TEST_CASE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, BATCH_CASES());
INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
@@ -912,13 +969,15 @@ void SpliceMessages(std::shared_ptr<Buffer> stream,
continue;
}
+ IpcOptions options;
internal::IpcPayload payload;
payload.type = msg->type();
payload.metadata = msg->metadata();
payload.body_buffers.push_back(msg->body());
payload.body_length = msg->body()->size();
int32_t unused_metadata_length = -1;
- ASSERT_OK(internal::WriteIpcPayload(payload, out.get(), &unused_metadata_length));
+ ASSERT_OK(
+ internal::WriteIpcPayload(payload, options, out.get(), &unused_metadata_length));
}
ASSERT_OK(out->Finish(spliced_stream));
}
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 81332a6..7127300 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -529,10 +529,9 @@ class DictionaryWriter : public RecordBatchSerializer {
int64_t dictionary_id_;
};
-Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
- int32_t* metadata_length) {
- RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, kArrowIpcAlignment, dst,
- metadata_length));
+Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options,
+ io::OutputStream* dst, int32_t* metadata_length) {
+ RETURN_NOT_OK(WriteMessage(*payload.metadata, options, dst, metadata_length));
#ifndef NDEBUG
RETURN_NOT_OK(CheckAligned(dst));
@@ -604,7 +603,7 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
// The body size is computed in the payload
*body_length = payload.body_length;
- return internal::WriteIpcPayload(payload, dst, metadata_length);
+ return internal::WriteIpcPayload(payload, options, dst, metadata_length);
}
Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
@@ -625,7 +624,9 @@ Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
int32_t* metadata_length) {
std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
- return internal::WriteMessage(*metadata, kTensorAlignment, dst, metadata_length);
+ IpcOptions options;
+ options.alignment = kTensorAlignment;
+ return WriteMessage(*metadata, options, dst, metadata_length);
}
Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
@@ -818,19 +819,7 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds
RETURN_NOT_OK(writer.Assemble(sparse_tensor));
*body_length = payload.body_length;
- return internal::WriteIpcPayload(payload, dst, metadata_length);
-}
-
-Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
- int64_t buffer_start_offset, io::OutputStream* dst,
- int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
- auto options = IpcOptions::Defaults();
- internal::IpcPayload payload;
- RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options, pool, &payload));
-
- // The body size is computed in the payload
- *body_length = payload.body_length;
- return internal::WriteIpcPayload(payload, dst, metadata_length);
+ return internal::WriteIpcPayload(payload, IpcOptions::Defaults(), dst, metadata_length);
}
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
@@ -1022,20 +1011,24 @@ class StreamBookKeeper {
return Status::OK();
}
+ Status WriteEOS() {
+ // End of stream marker
+ constexpr int64_t kEos = 0;
+ return Write(&kEos, sizeof(kEos));
+ }
+
protected:
io::OutputStream* sink_;
int64_t position_;
};
-// End of stream marker
-constexpr int32_t kEos = 0;
-
/// A IpcPayloadWriter implementation that writes to a IPC stream
/// (with an end-of-stream marker)
class PayloadStreamWriter : public internal::IpcPayloadWriter,
protected StreamBookKeeper {
public:
- explicit PayloadStreamWriter(io::OutputStream* sink) : StreamBookKeeper(sink) {}
+ PayloadStreamWriter(const IpcOptions& options, io::OutputStream* sink)
+ : StreamBookKeeper(sink), options_(options) {}
~PayloadStreamWriter() override = default;
@@ -1046,23 +1039,24 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter,
#endif
int32_t metadata_length = 0; // unused
- RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &metadata_length));
+ RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &metadata_length));
RETURN_NOT_OK(UpdatePositionCheckAligned());
return Status::OK();
}
- Status Close() override {
- // Write 0 EOS message
- return Write(&kEos, sizeof(int32_t));
- }
+ Status Close() override { return WriteEOS(); }
+
+ private:
+ IpcOptions options_;
};
/// A IpcPayloadWriter implementation that writes to a IPC file
/// (with a footer as defined in File.fbs)
class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBookKeeper {
public:
- PayloadFileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
- : StreamBookKeeper(sink), schema_(schema) {}
+ PayloadFileWriter(const IpcOptions& options, const std::shared_ptr<Schema>& schema,
+ io::OutputStream* sink)
+ : StreamBookKeeper(sink), options_(options), schema_(schema) {}
~PayloadFileWriter() override = default;
@@ -1074,7 +1068,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
// Metadata length must include padding, it's computed by WriteIpcPayload()
FileBlock block = {position_, 0, payload.body_length};
- RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &block.metadata_length));
+ RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &block.metadata_length));
RETURN_NOT_OK(UpdatePositionCheckAligned());
// Record position and size of some message types, to list them in the footer
@@ -1107,7 +1101,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
Status Close() override {
// Write 0 EOS message for compatibility with sequential readers
- RETURN_NOT_OK(Write(&kEos, sizeof(int32_t)));
+ RETURN_NOT_OK(WriteEOS());
// Write file footer
RETURN_NOT_OK(UpdatePosition());
@@ -1128,6 +1122,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
}
protected:
+ IpcOptions options_;
std::shared_ptr<Schema> schema_;
std::vector<FileBlock> dictionaries_;
std::vector<FileBlock> record_batches_;
@@ -1141,9 +1136,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl
RecordBatchStreamWriterImpl(io::OutputStream* sink,
const std::shared_ptr<Schema>& schema,
const IpcOptions& options)
- : RecordBatchPayloadWriter(
- std::unique_ptr<internal::IpcPayloadWriter>(new PayloadStreamWriter(sink)),
- schema, options) {}
+ : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
+ new PayloadStreamWriter(options, sink)),
+ schema, options) {}
~RecordBatchStreamWriterImpl() = default;
};
@@ -1153,7 +1148,7 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl : public RecordBatchPaylo
RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
const IpcOptions& options)
: RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
- new PayloadFileWriter(sink, schema)),
+ new PayloadFileWriter(options, schema, sink)),
schema, options) {}
~RecordBatchFileWriterImpl() = default;
@@ -1277,7 +1272,7 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
RETURN_NOT_OK(io::BufferOutputStream::Create(1024, pool, &stream));
auto options = IpcOptions::Defaults();
- auto payload_writer = make_unique<PayloadStreamWriter>(stream.get());
+ auto payload_writer = make_unique<PayloadStreamWriter>(options, stream.get());
RecordBatchPayloadWriter writer(std::move(payload_writer), schema, options,
dictionary_memo);
// Write schema and populate fields (but not dictionaries) in dictionary_memo
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 75030c2..c673b0a 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -177,7 +177,9 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
std::unique_ptr<RecordBatchFileWriterImpl> file_impl_;
};
-/// \brief Low-level API for writing a record batch (without schema) to an OutputStream
+/// \brief Low-level API for writing a record batch (without schema)
+/// to an OutputStream as encapsulated IPC message. See Arrow format
+/// documentation for more detail.
///
/// \param[in] batch the record batch to write
/// \param[in] buffer_start_offset the start offset to use in the buffer metadata,
@@ -189,20 +191,6 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
/// \param[in] options options for serialization
/// \param[in] pool the memory pool to allocate memory from
/// \return Status
-///
-/// Write the RecordBatch (collection of equal-length Arrow arrays) to the
-/// output stream in a contiguous block. The record batch metadata is written as
-/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
-/// prefixed by its size, followed by each of the memory buffers in the batch
-/// written end to end (with appropriate alignment and padding):
-///
-/// \code
-/// <int32: metadata size> <uint8*: metadata> <buffers ...>
-/// \endcode
-///
-/// Finally, the absolute offsets (relative to the start of the output stream)
-/// to the end of the body and end of the metadata / data header (suffixed by
-/// the header size) is returned in out-variables
ARROW_EXPORT
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length,
@@ -392,8 +380,8 @@ Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options
MemoryPool* pool, IpcPayload* out);
ARROW_EXPORT
-Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
- int32_t* metadata_length);
+Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options,
+ io::OutputStream* dst, int32_t* metadata_length);
} // namespace internal
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 2c05ec5..87393fb 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -964,6 +964,11 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
MessageType_V4" arrow::ipc::MetadataVersion::V4"
cdef cppclass CIpcOptions" arrow::ipc::IpcOptions":
+ c_bool allow_64bit
+ int max_recursion_depth
+ int32_t alignment
+ c_bool write_legacy_ipc_format
+
@staticmethod
CIpcOptions Defaults()
@@ -989,7 +994,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
MetadataVersion metadata_version()
MessageType type()
- CStatus SerializeTo(OutputStream* stream, int32_t alignment,
+ CStatus SerializeTo(OutputStream* stream, const CIpcOptions& options,
int64_t* output_length)
c_string FormatMessageType(MessageType type)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index b1aca23..6710f63 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -76,13 +76,14 @@ cdef class Message:
"""
cdef:
int64_t output_length = 0
- int32_t c_alignment = alignment
OutputStream* out
+ CIpcOptions options
+ options.alignment = alignment
out = sink.get_output_stream().get()
with nogil:
check_status(self.message.get()
- .SerializeTo(out, c_alignment, &output_length))
+ .SerializeTo(out, options, &output_length))
def serialize(self, alignment=8, memory_pool=None):
"""
[arrow] 02/03: ARROW-6316: [Go] implement new ARROW format with
32b-aligned buffers
Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit fa96d4fcd52d7da135197bc7c4d219172c513a04
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Thu Sep 5 09:53:34 2019 -0500
ARROW-6316: [Go] implement new ARROW format with 32b-aligned buffers
Closes #5251 from sbinet/issue-6316 and squashes the following commits:
7427340b8 <Sebastien Binet> ARROW-6316: implement new ARROW format with 32b-aligned buffers
Authored-by: Sebastien Binet <bi...@cern.ch>
Signed-off-by: Wes McKinney <we...@apache.org>
---
go/arrow/ipc/ipc.go | 5 +++--
go/arrow/ipc/message.go | 27 +++++++++++++++++++++++----
go/arrow/ipc/metadata.go | 31 +++++++++++++++++++++++++------
3 files changed, 51 insertions(+), 12 deletions(-)
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index e688974..e5c7d1d 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -37,8 +37,9 @@ const (
)
var (
- paddingBytes [kArrowAlignment]byte
- kEOS = [4]byte{0, 0, 0, 0} // end of stream message
+ paddingBytes [kArrowAlignment]byte
+ kEOS = [8]byte{0, 0, 0, 0, 0, 0, 0, 0} // end of stream message
+ kIPCContToken uint32 = 0xFFFFFFFF // 32b continuation indicator for FlatBuffers 8b alignment
)
func paddedLength(nbytes int64, alignment int32) int64 {
diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go
index bb12dbb..bfd9494 100644
--- a/go/arrow/ipc/message.go
+++ b/go/arrow/ipc/message.go
@@ -181,12 +181,31 @@ func (r *MessageReader) Message() (*Message, error) {
var buf = make([]byte, 4)
_, err := io.ReadFull(r.r, buf)
if err != nil {
- return nil, errors.Wrap(err, "arrow/ipc: could not read message length")
+ return nil, errors.Wrap(err, "arrow/ipc: could not read continuation indicator")
}
- msgLen := int32(binary.LittleEndian.Uint32(buf))
- if msgLen == 0 {
- // optional 0 EOS control message
+ var (
+ cid = binary.LittleEndian.Uint32(buf)
+ msgLen int32
+ )
+ switch cid {
+ case 0:
+ // EOS message.
return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
+ case kIPCContToken:
+ _, err = io.ReadFull(r.r, buf)
+ if err != nil {
+ return nil, errors.Wrap(err, "arrow/ipc: could not read message length")
+ }
+ msgLen = int32(binary.LittleEndian.Uint32(buf))
+ if msgLen == 0 {
+ // optional 0 EOS control message
+ return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
+ }
+
+ default:
+ // ARROW-6314: backwards compatibility for reading old IPC
+ // messages produced prior to version 0.15.0
+ msgLen = int32(cid)
}
buf = make([]byte, msgLen)
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 7b4e3da..034d3e1 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -88,7 +88,19 @@ func (blk fileBlock) NewMessage() (*Message, error) {
if err != nil {
return nil, errors.Wrap(err, "arrow/ipc: could not read message metadata")
}
- meta := memory.NewBufferBytes(buf[4:]) // drop buf-size already known from blk.Meta
+
+ prefix := 0
+ switch binary.LittleEndian.Uint32(buf) {
+ case 0:
+ case kIPCContToken:
+ prefix = 8
+ default:
+ // ARROW-6314: backwards compatibility for reading old IPC
+ // messages produced prior to version 0.15.0
+ prefix = 4
+ }
+
+ meta := memory.NewBufferBytes(buf[prefix:]) // drop buf-size already known from blk.Meta
buf = make([]byte, blk.Body)
_, err = io.ReadFull(r, buf)
@@ -1002,19 +1014,26 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error)
)
// ARROW-3212: we do not make any assumption on whether the output stream is aligned or not.
- paddedMsgLen := int32(msg.Len()) + 4
+ paddedMsgLen := int32(msg.Len()) + 8
remainder := paddedMsgLen % alignment
if remainder != 0 {
paddedMsgLen += alignment - remainder
}
+ tmp := make([]byte, 4)
+
+ // write continuation indicator, to address 8-byte alignment requirement from FlatBuffers.
+ binary.LittleEndian.PutUint32(tmp, kIPCContToken)
+ _, err = w.Write(tmp)
+ if err != nil {
+ return 0, errors.Wrap(err, "arrow/ipc: could not write continuation bit indicator")
+ }
+
// the returned message size includes the length prefix, the flatbuffer, + padding
n = int(paddedMsgLen)
- tmp := make([]byte, 4)
-
// write the flatbuffer size prefix, including padding
- sizeFB := paddedMsgLen - 4
+ sizeFB := paddedMsgLen - 8
binary.LittleEndian.PutUint32(tmp, uint32(sizeFB))
_, err = w.Write(tmp)
if err != nil {
@@ -1028,7 +1047,7 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error)
}
// write any padding
- padding := paddedMsgLen - int32(msg.Len()) - 4
+ padding := paddedMsgLen - int32(msg.Len()) - 8
if padding > 0 {
_, err = w.Write(paddingBytes[:padding])
if err != nil {