You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/09/11 22:07:33 UTC

[arrow] branch ARROW-6313-flatbuffer-alignment updated (4f9b887 -> 0352456)

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a change to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git.


    omit 4f9b887  ARROW-6315: [Java] Make change to ensure flatbuffer reads are aligned
    omit 8bcef56  ARROW-6316: [Go] implement new ARROW format with 32b-aligned buffers
    omit 397d0f8  ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format
     add 1c8c218  ARROW-6113: [Java] Support vector deduplicate function
     add e0fedf6  ARROW-6304: [Java][Doc] Add a description to each module
     add 6f7cc9b  ARROW-6136: [FlightRPC][Java] don't double-close response stream
     add 4e77035  ARROW-6297: [Java] Compare ArrowBufPointers by unsinged integers
     add b4eeab0  ARROW-6306: [Java] Support stable sort by stable comparators
     add d4d4a12  ARROW-6334: [Java] Improve the dictionary builder API to return the position of the value in the dictionary
     add e29732b  ARROW-6351: [Ruby] Improve Arrow#values performance
     add a1dbba8  ARROW-6354: [C++] Fix failing build when ARROW_PARQUET=OFF
     add 67d46c7  ARROW-4511: [Format][Docs] Revamp Format documentation, consolidate columnar format docs into a more coherent single document. Add Versioning/Stability page
     add e5ccef5  ARROW-5522: [Packaging][Documentation] Comments out of date in python/manylinux1/build_arrow.sh
     add 443ac07  ARROW-4648: [Doc] Add documentation about C++ file naming
     add 63dbc12  ARROW-6376: [Developer] Use target ref of PR when merging instead of hard-coding "master"
     add 05bc63c  ARROW-6263: [Python] Use RecordBatch::Validate in RecordBatch.from_arrays. Normalize API vs. Table.from_arrays. Add record_batch factory function
     add 115de28  ARROW-6373: [C++] Make FixedWidthBinaryBuilder consistent with other fixed width builders in zeroing memory when appending null batches
     add 53384de  ARROW-6372: [Rust][Datafusion] Casting from Un-signed to Signed Integers not supported
     add 7ec1731  ARROW-453: [C++] Filesystem implementation for Amazon S3
     add b9d8cd5  ARROW-6381: [C++] BufferOutputStream::Write does extra work that slows down small writes
     add ab712d2  ARROW-6384: [C++] Bump dependency versions
     add bcf5897  ARROW-6348: [R] arrow::read_csv_arrow namespace error when package not loaded
     add 407973b  ARROW-6371: [Doc] Row to columnar conversion example mentions arrow::Column in comments
     add 99cdb7e  ARROW-6144: [C++][Gandiva] Implement random functions in Gandiva
     add 99b0e30  ARROW-6387: [Archery] Errors with make
     add beea8f9  ARROW-6231: [C++] Allow generating CSV column names
     add 157b179  ARROW-6078: [Java] Implement dictionary-encoded subfields for List type
     add 2f3ea96  ARROW-6094: [FlightRPC] Add Flight RPC method getFlightSchema
     add c2762a6  ARROW-2769: [Python] Deprecate and rename add_metadata methods
     add f104f2d  ARROW-6265: [Java] Avro adapter implement Array/Map/Fixed type
     add 5a8285d  ARROW-4095: [C++] Optimize DictionaryArray::Transpose() for trivial transpositions
     add 0b41e53  ARROW-6397: [C++][CI] Generate minio server connect string
     add 8cacb2f  ARROW-6031: [Java] Support iterating a vector by ArrowBufPointer
     add 7c46c27  ARROW-6247: [Java] Provide a common interface for float4 and float8 vectors
     add 7d7e25e  ARROW-6402: [C++] Suppress sign-compare warning with g++ 9.2.1
     add a1b477f  ARROW-5300: [C++] Remove the ARROW_NO_DEFAULT_MEMORY_POOL macro
     add 2164e3b  ARROW-4398: [C++][Python][Parquet] Improve BYTE_ARRAY PLAIN encoding write performance. Add BYTE_ARRAY write benchmarks
     add 95640a0  ARROW-6347: [GLib] Add garrow_array_diff()
     add 7d63dfe  ARROW-6406: [C++] Fix jemalloc URL for offline build in thirdparty/versions.txt
     add a179933  ARROW-6392: [FlightRPC][Python] check type of list_flights result
     add b74b027  ARROW-6403: [Python] Expose FileReader::ReadRowGroups() to Python
     add a985483  ARROW-4752: [Rust] Add explicit SIMD vectorization for the divide kernel
     add 149d4cb  ARROW-6383: [Java] Report outstanding child allocators on close
     add 32ef12c  ARROW-6063: [FlightRPC] implement half-closed semantics for DoPut
     add 1c42e6f  ARROW-6141: [C++] Enable memory-mapping a file region
     add ab908cc  ARROW-6411: [Python][Parquet] Improve performance of DictEncoder::PutIndices
     add 9517ade  ARROW-6269: [C++] check decimal precision in IPC code
     add c39e350  ARROW-5610: [Python] define extension types in Python
     add 48d9069  ARROW-6422: [Gandiva] Fix double-conversion linker issue
     add 561f86d  ARROW-6412: [C++] Improve TCP port allocation in tests
     add 327057e  ARROW-6424: [C++] Fix IPC fuzzing test name
     add 6b714a1  ARROW-6423: [C++] Fix crash when trying to instantiate Snappy CompressedOutputStream
     add 4a7dd43  ARROW-6415: [R] Remove usage of R CMD config CXXCPP
     add d818299  ARROW-6358: [C++] Add FileSystem::DeleteDirContents
     add 96928d5  ARROW-6296: [Java] Cleanup JDBC interfaces and eliminate one memcopy for binary/varchar fields
     add 26f631f  ARROW-4836: [C++] Support Tell() on compressed streams
     add ea309dd  ARROW-6450: [C++] Use 2x reallocation strategy in BufferBuilder instead of 1.5x
     add 6330b2f  ARROW-6355: [Java] Make range equal visitor reusable
     add c0656af  ARROW-6432: [CI][Crossbow] Remove alpine nightly crossbow jobs
     add a6e0599  ARROW-6451: [Format] Add clarifications to Columnar.rst about the contents of "null" slots in Varbinary or List arrays
     add eebae5f  ARROW-6416: [Python] Improve API & documentation regarding chunksizes
     add 131ae4d  ARROW-6454: [LICENSE] Add LLVM's license due to static linkage
     add ab8ee21  ARROW-6418: [C++][Plasma] Remove cmake project directive for plasma
     add 552820a  ARROW-5876: [C++][Python] add basic auth flight proto message to C++ and Python
     add 243d488  ARROW-6431: [Python] Test suite fails without pandas installed
     add 314e9f0  ARROW-6457: [C++] Always set CMAKE_BUILD_TYPE if it is not defined
     add d0abe12  ARROW-6441: [Packaging][RPM] Follow plasma-store-server name change
     add 1138b9b  ARROW-6440: [Packaging][deb] Follow plasma-store-server name change
     add 934c18e  ARROW-6462: [C++] Fix build error on CentOS 6 x86_64 with bundled double-conversion
     add d2be6a5  ARROW-6453: [C++] More informative error messages with S3
     add 8cfa163  ARROW-6447: [C++] Allow rest of arrow_objlib to build in parallel while memory_pool.cc is waiting on jemalloc_ep
     add 4a5d10e  ARROW-5558: [C++] Support Array::View on arrays with non-zero offset
     add 26d72f3  ARROW-6318: [Integration] Run tests against pregenerated files
     add 45e41ca  ARROW-6417: [C++][Parquet] Miscellaneous optimizations yielding slightly better Parquet binary read performance
     add b829f53  ARROW-6385: [C++] Use xxh3 instead of custom hashing code for non-tiny strings
     add 5931d59  ARROW-6443: [CI][Crossbow] Nightly conda osx builds fail
     add 2620ed1  ARROW-6242: [C++][Dataset] Implement Dataset, Scanner and ScannerBuilder
     add f0efc3b  ARROW-6461: [Java] Prevent EchoServer from closing the client socket after writing
     add 40d08a7  ARROW-6433: [Java][CI] Fix java docker image
     add c0dbf71  ARROW-6369: [C++] Handle Array.to_pandas case for type=list<bool>
     add 200e308  ARROW-6120: [C++] Forbid use of <iostream> in public header files
     add b8ebc9d  ARROW-6475: [C++] Don't try to dictionary encode dictionary arrays
     add 53c5af0  ARROW-6478: [C++] Revert to jemalloc stable-4 until we understand 5.2.x performance issues
     add e29e267  ARROW-6435: [Python] Use pandas null coding consistently on List and Struct types
     add a89300b  ARROW-6476: [Java][CI] Fix java docker build script
     add 1137de9  ARROW-5292: [C++] Work around symbol visibility issues so building static libraries is not necessary when building unit tests on WIN32 platform
     add d7ef11f  ARROW-6171: [R][CI] Fix R library search path
     add fb51ecf  ARROW-6356: [Java] Avro adapter implement Enum type and nested Record
     add 03e6c0b  ARROW-4880: [Python] Rehabilitate ASV benchmark build scripts
     add 1f893a8  ARROW-3933: [C++][Parquet] Handle non-nullable struct children when reading Parquet file, better error messages
     add 9ca682b  ARROW-5125: [Python] Round-trip extreme dates on windows
     add 6ed87b1  ARROW-6477: [Packaging][Crossbow] Use Azure Pipelines to build linux packages
     add 0158ae1  ARROW-6446: [OSX][Python][Wheel] Turn off ORC feature in the wheel building scripts
     add d6bc0b6  ARROW-6326: [C++] Nullable fields when converting std::tuple to Table
     add 72f4cac  ARROW-6100: [Rust] Pin to specific nightly rust for reproducible/stable builds
     add 3145e9b  ARROW-6408: [Rust] use "if cfg!" pattern
     add dadfd48  ARROW-6487: [Rust] [DataFusion] Introduce common test module
     add 7acada9  ARROW-6427: [GLib] Add support for column names autogeneration CSV read option
     add e9f35a8  ARROW-6331: [Java] Incorporate ErrorProne into the java build
     add 7749d88  ARROW-6489: [Developer][Documentation] Fix merge script and readme
     add 1f57ba1  ARROW-6491: [Java][Hotfix] fix master fail caused by ErrorProne
     add 04c8e89  ARROW-6490: [Java][Memory] Log error for leak in allocator close
     add 92f16e3  ARROW-6368: [C++][Dataset] Add interface for "projecting" RecordBatch from one schema to another, inserting null values where needed
     add 9c2694e  ARROW-5374: [Python][C++] Improve ipc.read_record_batch docstring, fix IPC message type error messages generated in C++
     add 62403aa  ARROW-3643: [Rust] optimize BooleanBufferBuilder::append_slice
     add 1bdb4a0  ARROW-5722: [Rust] Implement Debug for List/Struct/BinaryArray
     add ef426a7  ARROW-6365: [R] Should be able to coerce numeric to integer with schema
     add 44e7f1d  ARROW-6492: [Python] Handle pandas_metadata created by fastparquet with missing field_name
     add 19681ff  ARROW-6465: [Python] Improvement to Windows build instructions
     add 74d8296  ARROW-3762: [Python] Add large_memory unit test exercising BYTE_ARRAY overflow edge cases from ARROW-3762
     add 4f7ead8  ARROW-6413: [R] Support autogenerating column names
     add c97c64b  ARROW-3651: [Python] Handle 'datetime' logical type when reconstructing pandas columns from custom metadata
     add dd29b04  ARROW-6300: [C++] Add Abort() method to streams
     add 165b02d  ARROW-5471: [C++][Gandiva] Array offset is ignored in Gandiva projector
     add a32112f  ARROW-6292: [C++] Add option to use the mimalloc allocator
     add c0baff4  ARROW-5743: [C++] Add cmake option and macros for enabling large memory tests
     add 1c746c5  ARROW-6484: [Java] Enable create indexType for DictionaryEncoding according to dictionary value count
     add 1dfa258  ARROW-6502: [GLib][CI] Pin gobject-introspection gem to 3.3.7
     add 3f2a33f  ARROW-6480: [Crossbow] Summary report e-mailer with polling logic
     add b1025c2  ARROW-6481: [C++] Avoid copying large ConvertOptions
     add 0fbaff6  ARROW-5736: [Format][C++] Support small bit-width indices in sparse tensor
     add 9dec79b  ARROW-5505: [R] Normalize file and class names, stop masking base R functions, add vignette, improve documentation
     add 6d25dfd  ARROW-5646: [Crossbow][Documentation] Move the user guide to the Sphinx documentation
     add 3437c97  ARROW-6524: [Developer][Packaging] Nightly build report's subject should contain Arrow
     add 1d27386  ARROW-6506: [C++] Fix validation of ExtensionArray with struct storage type
     add 28bfd2b  ARROW-6101: [Rust] [DataFusion] Parallel execution of physical query plan
     add b23c989  ARROW-6426: [FlightRPC][C++][Java] Expose gRPC configuration knobs
     add 6f72457  ARROW-5450: [Python] Always return datetime.datetime in TimestampValue.as_py for units other than nanoseconds
     add 54dfc00  ARROW-1324: [C++] Add support for bundled Boost with MSVC
     add 2cedd6b  ARROW-6015: [Python] Add note to python/README.md about installing Visual C++ Redistributable on Windows when using pip
     add a6b118b  ARROW-6522: [Python] Fix failing pandas tests on older pandas / older python
     add d2b6d11  ARROW-6243: [C++][Dataset] Filter expressions
     add ddfebf6  ARROW-6360: [R] Update support for compression
     new 9514e2e  ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format
     new fa96d4f  ARROW-6316: [Go] implement new ARROW format with 32b-aligned buffers
     new 0352456  ARROW-6315: [Java] Make change to ensure flatbuffer reads are aligned

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes (`git push --force`) a change, producing
a history like this:

 * -- * -- B -- O -- O -- O   (4f9b887)
            \
             N -- N -- N   refs/heads/ARROW-6313-flatbuffer-alignment (0352456)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/windows-msvc-cpp.yml             |   52 +
 .travis.yml                                        |    1 +
 LICENSE.txt                                        |   74 +
 appveyor.yml                                       |    3 +
 c_glib/Gemfile                                     |    2 +-
 c_glib/arrow-glib/basic-array.cpp                  |   29 +
 c_glib/arrow-glib/basic-array.h                    |    5 +-
 c_glib/arrow-glib/reader.cpp                       |   29 +-
 c_glib/arrow-glib/record-batch.cpp                 |   15 +-
 c_glib/test/test-array.rb                          |   27 +
 c_glib/test/test-csv-reader.rb                     |   17 +
 ci/appveyor-cpp-setup.bat                          |    5 +
 ci/conda_env_cpp.yml                               |    1 +
 c_glib/Gemfile => ci/conda_env_crossbow.txt        |   14 +-
 ci/cpp-msvc-build-main.bat                         |    2 +
 ci/docker_build_cpp.sh                             |    4 +-
 ci/docker_build_java.sh                            |    6 +-
 ci/travis_before_script_c_glib.sh                  |    4 +-
 ci/travis_before_script_cpp.sh                     |    6 +
 ci/travis_install_linux.sh                         |    8 +
 ci/travis_script_integration.sh                    |    3 +-
 cpp/CMakeLists.txt                                 |   28 +-
 cpp/build-support/lint_cpp_cli.py                  |    1 +
 cpp/build-support/run_cpplint.py                   |    1 +
 cpp/cmake_modules/BuildUtils.cmake                 |    3 +
 cpp/cmake_modules/DefineOptions.cmake              |    6 +
 cpp/cmake_modules/ThirdpartyToolchain.cmake        |  202 ++-
 cpp/examples/arrow/row-wise-conversion-example.cc  |   12 +-
 cpp/examples/parquet/CMakeLists.txt                |   12 +-
 cpp/src/arrow/CMakeLists.txt                       |   33 +-
 cpp/src/arrow/allocator.h                          |    3 +
 cpp/src/arrow/array.cc                             |  198 ++-
 cpp/src/arrow/array.h                              |   31 +-
 cpp/src/arrow/array/builder_binary.cc              |   13 +-
 cpp/src/arrow/array/builder_binary.h               |    2 +-
 cpp/src/arrow/array/builder_union.cc               |    8 +-
 cpp/src/arrow/array_dict_test.cc                   |  102 +-
 cpp/src/arrow/array_test.cc                        |   38 +-
 cpp/src/arrow/array_view_test.cc                   |   30 +-
 cpp/src/arrow/buffer_builder.h                     |   13 +-
 cpp/src/arrow/builder_benchmark.cc                 |   48 +
 cpp/src/arrow/compute/kernels/compare.cc           |    5 +-
 cpp/src/arrow/csv/column_builder.cc                |    8 +-
 cpp/src/arrow/csv/column_builder_test.cc           |   92 +-
 cpp/src/arrow/csv/converter.h                      |    4 +-
 cpp/src/arrow/csv/converter_benchmark.cc           |   12 +
 cpp/src/arrow/csv/options.h                        |    7 +-
 cpp/src/arrow/csv/reader.cc                        |   40 +-
 cpp/src/arrow/dataset/CMakeLists.txt               |   55 +-
 cpp/src/arrow/dataset/api.h                        |    1 +
 cpp/src/arrow/dataset/dataset.cc                   |   15 +
 cpp/src/arrow/dataset/dataset.h                    |   21 +-
 cpp/src/arrow/dataset/dataset_internal.h           |  196 +++
 cpp/src/arrow/dataset/dataset_test.cc              |  149 +-
 cpp/src/arrow/dataset/file_parquet_test.cc         |   82 +
 cpp/src/arrow/dataset/filter.cc                    |  942 ++++++++++++
 cpp/src/arrow/dataset/filter.h                     |  372 ++++-
 cpp/src/arrow/dataset/filter_test.cc               |  241 +++
 cpp/src/arrow/dataset/scanner.cc                   |   79 +-
 cpp/src/arrow/dataset/scanner.h                    |   52 +-
 .../dataset/{dataset_test.cc => scanner_test.cc}   |   46 +-
 cpp/src/arrow/dataset/test_util.h                  |  182 +--
 cpp/src/arrow/dataset/type_fwd.h                   |   15 +
 cpp/src/arrow/extension_type_test.cc               |   59 +
 cpp/src/arrow/filesystem/CMakeLists.txt            |   11 +
 cpp/src/arrow/filesystem/filesystem.cc             |   26 +
 cpp/src/arrow/filesystem/filesystem.h              |   10 +
 cpp/src/arrow/filesystem/filesystem_test.cc        |   12 +-
 cpp/src/arrow/filesystem/localfs.cc                |   12 +
 cpp/src/arrow/filesystem/localfs.h                 |    1 +
 cpp/src/arrow/filesystem/mockfs.cc                 |   41 +-
 cpp/src/arrow/filesystem/mockfs.h                  |    1 +
 cpp/src/arrow/filesystem/path_util.cc              |    7 +
 cpp/src/arrow/filesystem/path_util.h               |    9 +
 cpp/src/arrow/filesystem/s3_internal.h             |  186 +++
 cpp/src/arrow/filesystem/s3fs.cc                   | 1264 ++++++++++++++++
 cpp/src/arrow/filesystem/{mockfs.h => s3fs.h}      |   87 +-
 cpp/src/arrow/filesystem/s3fs_narrative_test.cc    |  231 +++
 cpp/src/arrow/filesystem/s3fs_test.cc              |  740 +++++++++
 cpp/src/arrow/filesystem/test_util.cc              |  192 ++-
 cpp/src/arrow/filesystem/test_util.h               |   42 +-
 cpp/src/arrow/flight/client.cc                     |   71 +-
 cpp/src/arrow/flight/client.h                      |   25 +
 cpp/src/arrow/flight/flight_benchmark.cc           |  185 ++-
 cpp/src/arrow/flight/flight_test.cc                |  172 ++-
 cpp/src/arrow/flight/internal.cc                   |   27 +-
 cpp/src/arrow/flight/internal.h                    |    4 +
 cpp/src/arrow/flight/perf_server.cc                |   24 +
 cpp/src/arrow/flight/server.cc                     |   38 +-
 cpp/src/arrow/flight/server.h                      |   23 +
 cpp/src/arrow/flight/test_util.cc                  |   71 +-
 cpp/src/arrow/flight/test_util.h                   |   34 +-
 cpp/src/arrow/flight/types.cc                      |   19 +
 cpp/src/arrow/flight/types.h                       |   28 +
 cpp/src/arrow/io/buffered.cc                       |   29 +-
 cpp/src/arrow/io/buffered.h                        |    2 +
 cpp/src/arrow/io/compressed.cc                     |   56 +-
 cpp/src/arrow/io/compressed.h                      |    2 +
 cpp/src/arrow/io/compressed_test.cc                |   37 +-
 cpp/src/arrow/io/file.cc                           |  108 +-
 cpp/src/arrow/io/file.h                            |    5 +
 cpp/src/arrow/io/file_test.cc                      |   58 +-
 cpp/src/arrow/io/interfaces.cc                     |   14 +
 cpp/src/arrow/io/interfaces.h                      |   22 +
 cpp/src/arrow/io/memory.cc                         |   26 +-
 cpp/src/arrow/io/memory.h                          |    2 +
 cpp/src/arrow/io/memory_benchmark.cc               |   48 +-
 .../{dataset/scanner.cc => io/util_internal.h}     |   15 +-
 cpp/src/arrow/ipc/metadata_internal.cc             |   53 +-
 cpp/src/arrow/ipc/metadata_internal.h              |    9 +
 cpp/src/arrow/ipc/read_write_test.cc               |  184 ++-
 cpp/src/arrow/ipc/reader.cc                        |   55 +-
 cpp/src/arrow/memory_pool.cc                       |  337 +++--
 cpp/src/arrow/memory_pool.h                        |   25 +-
 cpp/src/arrow/memory_pool_test.cc                  |   54 +-
 cpp/src/arrow/python/arrow_to_pandas.cc            |    6 +
 cpp/src/arrow/python/extension_type.cc             |   26 +-
 cpp/src/arrow/python/extension_type.h              |   11 +-
 cpp/src/arrow/python/flight.cc                     |   31 +
 cpp/src/arrow/python/flight.h                      |   19 +
 cpp/src/arrow/python/helpers.cc                    |    7 +
 cpp/src/arrow/python/helpers.h                     |    3 +
 cpp/src/arrow/python/io.cc                         |   63 +-
 cpp/src/arrow/python/io.h                          |    2 +
 cpp/src/arrow/python/numpy_to_arrow.cc             |    2 -
 cpp/src/arrow/python/pyarrow_lib.h                 |    2 -
 cpp/src/arrow/python/python_to_arrow.cc            |  263 ++--
 cpp/src/arrow/python/util/datetime.h               |   15 +-
 cpp/src/arrow/record_batch.cc                      |   54 +-
 cpp/src/arrow/record_batch.h                       |   16 +-
 cpp/src/arrow/scalar.cc                            |   27 +
 cpp/src/arrow/scalar.h                             |   18 +
 cpp/src/arrow/sparse_tensor.cc                     |  189 ++-
 cpp/src/arrow/sparse_tensor.h                      |   30 +-
 cpp/src/arrow/sparse_tensor_test.cc                |  389 +++--
 cpp/src/arrow/stl.h                                |   26 +-
 cpp/src/arrow/stl_test.cc                          |  172 +++
 cpp/src/arrow/table.cc                             |    2 +-
 cpp/src/arrow/testing/generator.cc                 |   94 ++
 cpp/src/arrow/testing/generator.h                  |  234 +++
 cpp/src/arrow/testing/gtest_util.h                 |   71 +-
 cpp/src/arrow/testing/util.cc                      |   79 +-
 cpp/src/arrow/testing/util.h                       |   26 +-
 cpp/src/arrow/type.cc                              |   28 +-
 cpp/src/arrow/type.h                               |   19 +-
 cpp/src/arrow/type_benchmark.cc                    |    6 +-
 cpp/src/arrow/type_test.cc                         |    8 +-
 cpp/src/arrow/type_traits.h                        |   83 +-
 cpp/src/arrow/util/CMakeLists.txt                  |    1 +
 cpp/src/arrow/util/bit_util.cc                     |    2 +
 cpp/src/arrow/util/functional.h                    |    3 +-
 cpp/src/arrow/util/hashing.h                       |   44 +-
 cpp/src/arrow/util/io_util.cc                      |   28 +-
 cpp/src/arrow/util/io_util.h                       |    3 +
 cpp/src/arrow/util/io_util_test.cc                 |   41 +
 cpp/src/arrow/util/iterator.h                      |  145 +-
 cpp/src/arrow/util/iterator_test.cc                |  124 ++
 cpp/src/arrow/util/print.h                         |   51 +
 cpp/src/arrow/vendored/string_view.hpp             |   32 +-
 cpp/src/arrow/{dataset/api.h => vendored/xxhash.h} |   17 +-
 .../src/arrow/vendored/xxhash/README.md            |   14 +-
 cpp/src/arrow/vendored/xxhash/xxh3.h               | 1583 ++++++++++++++++++++
 cpp/src/arrow/vendored/xxhash/xxhash.c             |  821 +++++-----
 cpp/src/arrow/vendored/xxhash/xxhash.h             |  328 +++-
 cpp/src/arrow/visitor_inline.h                     |   72 +-
 cpp/src/gandiva/CMakeLists.txt                     |    4 +-
 cpp/src/gandiva/annotator.cc                       |   10 +-
 cpp/src/gandiva/bitmap_accumulator.cc              |   45 +-
 cpp/src/gandiva/bitmap_accumulator.h               |   14 +-
 cpp/src/gandiva/bitmap_accumulator_test.cc         |   57 +-
 cpp/src/gandiva/compiled_expr.h                    |    2 +-
 cpp/src/gandiva/decimal_scalar.h                   |    1 -
 cpp/src/gandiva/eval_batch.h                       |   15 +-
 cpp/src/gandiva/filter.cc                          |    2 +-
 cpp/src/gandiva/function_holder_registry.h         |    3 +
 cpp/src/gandiva/function_registry_math_ops.cc      |   10 +-
 cpp/src/gandiva/gdv_function_stubs.cc              |   22 +
 cpp/src/gandiva/in_holder.h                        |    6 +-
 cpp/src/gandiva/llvm_generator.cc                  |   57 +-
 cpp/src/gandiva/llvm_generator.h                   |    8 +-
 cpp/src/gandiva/llvm_generator_test.cc             |    4 +-
 cpp/src/gandiva/random_generator_holder.cc         |   45 +
 cpp/src/gandiva/random_generator_holder.h          |   60 +
 cpp/src/gandiva/random_generator_holder_test.cc    |  103 ++
 cpp/src/gandiva/tests/filter_test.cc               |   45 +
 cpp/src/gandiva/tests/huge_table_test.cc           |    8 +-
 cpp/src/gandiva/tests/projector_test.cc            |   38 +
 cpp/src/gandiva/tree_expr_builder.cc               |    1 +
 cpp/src/parquet/CMakeLists.txt                     |   51 +-
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc  |    2 +-
 cpp/src/parquet/arrow/reader.cc                    |   58 +-
 cpp/src/parquet/column_io_benchmark.cc             |    2 +-
 cpp/src/parquet/column_reader.cc                   |   27 +-
 cpp/src/parquet/column_writer.cc                   |    2 +-
 cpp/src/parquet/column_writer_test.cc              |    2 +-
 cpp/src/parquet/encoding.cc                        |  497 +++---
 cpp/src/parquet/encoding.h                         |   17 +-
 cpp/src/parquet/encoding_benchmark.cc              |  223 ++-
 cpp/src/parquet/encoding_test.cc                   |   74 +-
 cpp/src/parquet/file_deserialize_test.cc           |    2 +-
 cpp/src/parquet/metadata.cc                        |    2 +-
 cpp/src/parquet/metadata_test.cc                   |    2 +-
 cpp/src/parquet/schema.cc                          |    2 +-
 cpp/src/parquet/schema_internal.h                  |    6 +-
 cpp/src/parquet/schema_test.cc                     |   10 +-
 cpp/src/parquet/statistics_test.cc                 |    2 +-
 cpp/src/parquet/{thrift.h => thrift_internal.h}    |    0
 cpp/src/plasma/CMakeLists.txt                      |    1 -
 cpp/src/plasma/client.cc                           |    2 +-
 ...17c897976c60b0e6e4f4a365c751027244dada7a.tar.gz |  Bin 0 -> 454719 bytes
 cpp/thirdparty/versions.txt                        |   25 +-
 dev/README.md                                      |   16 +-
 dev/archery/archery/utils/cmake.py                 |   23 +-
 dev/merge_arrow_pr.py                              |   10 +-
 dev/release/rat_exclude_files.txt                  |    1 +
 dev/tasks/README.md                                |  189 +--
 .../conda-recipes/.ci_support/linux_python2.7.yaml |   22 +
 .../conda-recipes/.ci_support/linux_python3.6.yaml |   22 +
 .../conda-recipes/.ci_support/linux_python3.7.yaml |   23 +-
 .../conda-recipes/.ci_support/osx_python2.7.yaml   |   22 +
 .../conda-recipes/.ci_support/osx_python3.6.yaml   |   22 +
 .../conda-recipes/.ci_support/osx_python3.7.yaml   |   22 +
 ..._compilervs2015cxx_compilervs2015python3.6.yaml |   22 -
 ...pilervs2015cxx_compilervs2015python3.6vc14.yaml |   51 +
 ..._compilervs2015cxx_compilervs2015python3.7.yaml |   22 -
 ...pilervs2015cxx_compilervs2015python3.7vc14.yaml |   51 +
 dev/tasks/conda-recipes/arrow-cpp/bld.bat          |    6 +-
 dev/tasks/conda-recipes/arrow-cpp/build.sh         |   12 +-
 dev/tasks/conda-recipes/arrow-cpp/meta.yaml        |   32 +-
 dev/tasks/conda-recipes/azure.linux.yml            |   27 +-
 dev/tasks/conda-recipes/azure.osx.yml              |   30 +-
 dev/tasks/conda-recipes/azure.win.yml              |   26 +-
 dev/tasks/conda-recipes/parquet-cpp/meta.yaml      |    8 +-
 dev/tasks/conda-recipes/pyarrow/bld.bat            |    4 +-
 dev/tasks/conda-recipes/pyarrow/build.sh           |   13 +-
 dev/tasks/conda-recipes/pyarrow/meta.yaml          |   30 +-
 dev/tasks/crossbow.py                              |  845 +++++++----
 dev/tasks/gandiva-jars/build-cpp-linux.sh          |    1 -
 dev/tasks/linux-packages/azure.linux.arm64.yml     |   82 +
 dev/tasks/linux-packages/azure.linux.yml           |   66 +
 .../linux-packages/debian.ubuntu-xenial/control    |    6 +-
 .../plasma-store-server.install                    |    2 +-
 dev/tasks/linux-packages/debian/control            |    6 +-
 .../debian/plasma-store-server.install             |    2 +-
 dev/tasks/linux-packages/package-task.rb           |   17 +-
 dev/tasks/linux-packages/travis.linux.arm64.yml    |   75 -
 dev/tasks/linux-packages/travis.linux.yml          |   59 -
 dev/tasks/linux-packages/yum/arrow.spec.in         |    2 +-
 dev/tasks/python-wheels/osx-build.sh               |    5 +-
 dev/tasks/tasks.yml                                |  500 ++++++-
 dev/tasks/tests.yml                                |  411 -----
 dev/tasks/upload-assets.py                         |   64 -
 docker-compose.yml                                 |    4 +-
 docs/source/conf.py                                |    2 +-
 docs/source/cpp/datatypes.rst                      |    2 +-
 docs/source/developers/cpp.rst                     |   42 +-
 docs/source/developers/crossbow.rst                |  273 ++++
 docs/source/developers/python.rst                  |  139 +-
 docs/source/format/Columnar.rst                    | 1167 +++++++++++++++
 docs/source/format/Guidelines.rst                  |   27 +-
 docs/source/format/IPC.rst                         |  252 +---
 docs/source/format/Integration.rst                 |  179 +++
 docs/source/format/Layout.rst                      |  782 +---------
 docs/source/format/Metadata.rst                    |  328 +---
 docs/source/format/Other.rst                       |   63 +
 r/NEWS.md => docs/source/format/README.md          |   16 +-
 docs/source/format/README.rst                      |   53 -
 docs/source/format/Versioning.rst                  |   70 +
 docs/source/index.rst                              |   18 +-
 docs/source/python/api/arrays.rst                  |    1 +
 docs/source/python/api/datatypes.rst               |   11 +
 docs/source/python/api/tables.rst                  |    3 +-
 docs/source/python/benchmarks.rst                  |    8 +-
 docs/source/python/extending_types.rst             |  180 +++
 format/Flight.proto                                |   16 +
 format/SparseTensor.fbs                            |   65 +-
 integration/hdfs/Dockerfile                        |   11 +-
 integration/integration_test.py                    |  170 ++-
 java/adapter/avro/pom.xml                          |    1 +
 .../main/java/org/apache/arrow/AvroToArrow.java    |   33 +-
 .../java/org/apache/arrow/AvroToArrowConfig.java   |   80 +
 .../java/org/apache/arrow/AvroToArrowUtils.java    |  358 ++++-
 .../apache/arrow/AvroToArrowVectorIterator.java    |  165 ++
 ...leTypeConsumer.java => AvroArraysConsumer.java} |   46 +-
 .../arrow/consumers/AvroBooleanConsumer.java       |   25 +-
 .../apache/arrow/consumers/AvroBytesConsumer.java  |   44 +-
 .../apache/arrow/consumers/AvroDoubleConsumer.java |   25 +-
 ...{AvroIntConsumer.java => AvroEnumConsumer.java} |   32 +-
 ...oDoubleConsumer.java => AvroFixedConsumer.java} |   38 +-
 .../apache/arrow/consumers/AvroFloatConsumer.java  |   25 +-
 .../apache/arrow/consumers/AvroIntConsumer.java    |   25 +-
 .../apache/arrow/consumers/AvroLongConsumer.java   |   25 +-
 ...lableTypeConsumer.java => AvroMapConsumer.java} |   46 +-
 .../apache/arrow/consumers/AvroNullConsumer.java   |    9 +-
 .../apache/arrow/consumers/AvroStringConsumer.java |   43 +-
 ...ooleanConsumer.java => AvroStructConsumer.java} |   49 +-
 .../apache/arrow/consumers/AvroUnionsConsumer.java |   36 +-
 .../arrow/consumers/CompositeAvroConsumer.java     |   39 +-
 .../java/org/apache/arrow/consumers/Consumer.java  |   10 +-
 .../arrow/consumers/NullableTypeConsumer.java      |    6 +
 .../test/java/org/apache/arrow/AvroTestBase.java   |  202 +++
 .../org/apache/arrow/AvroToArrowIteratorTest.java  |  161 ++
 .../java/org/apache/arrow/AvroToArrowTest.java     |  197 ++-
 .../src/test/resources/schema/test_array.avsc}     |   30 +-
 .../src/test/resources/schema/test_fixed.avsc}     |   30 +-
 .../avro/src/test/resources/schema/test_map.avsc}  |   30 +-
 .../test/resources/schema/test_nested_record.avsc} |   38 +-
 .../resources/schema/test_primitive_enum.avsc}     |   30 +-
 java/adapter/jdbc/pom.xml                          |    7 +
 .../arrow/adapter/jdbc/ArrowVectorIterator.java    |   72 +-
 .../arrow/adapter/jdbc/JdbcToArrowUtils.java       |   15 +-
 .../arrow/adapter/jdbc/consumer/ArrayConsumer.java |   18 +-
 .../adapter/jdbc/consumer/BigIntConsumer.java      |   19 +-
 .../adapter/jdbc/consumer/BinaryConsumer.java      |   56 +-
 .../arrow/adapter/jdbc/consumer/BitConsumer.java   |   19 +-
 .../arrow/adapter/jdbc/consumer/BlobConsumer.java  |    2 +-
 .../arrow/adapter/jdbc/consumer/ClobConsumer.java  |   46 +-
 .../jdbc/consumer/CompositeJdbcConsumer.java       |   11 +-
 .../arrow/adapter/jdbc/consumer/DateConsumer.java  |   19 +-
 .../adapter/jdbc/consumer/DecimalConsumer.java     |   19 +-
 .../adapter/jdbc/consumer/DoubleConsumer.java      |   19 +-
 .../arrow/adapter/jdbc/consumer/FloatConsumer.java |   19 +-
 .../arrow/adapter/jdbc/consumer/IntConsumer.java   |   19 +-
 .../arrow/adapter/jdbc/consumer/JdbcConsumer.java  |    2 +-
 .../adapter/jdbc/consumer/SmallIntConsumer.java    |   19 +-
 .../arrow/adapter/jdbc/consumer/TimeConsumer.java  |   19 +-
 .../adapter/jdbc/consumer/TimestampConsumer.java   |   19 +-
 .../adapter/jdbc/consumer/TinyIntConsumer.java     |   19 +-
 .../adapter/jdbc/consumer/VarCharConsumer.java     |   34 +-
 .../arrow/adapter/jdbc/JdbcFieldInfoTest.java      |    1 +
 .../arrow/adapter/jdbc/JdbcToArrowConfigTest.java  |    1 +
 java/adapter/orc/pom.xml                           |    1 +
 java/algorithm/pom.xml                             |    1 +
 .../algorithm/deduplicate/DeduplicationUtils.java  |   97 ++
 .../deduplicate/VectorRunDeduplicator.java         |  109 ++
 .../HashTableBasedDictionaryBuilder.java           |   20 +-
 .../SearchTreeBasedDictionaryBuilder.java          |   26 +-
 .../algorithm/sort/StableVectorComparator.java     |   63 +
 .../deduplicate/TestDeduplicationUtils.java        |  136 ++
 .../deduplicate/TestVectorRunDeduplicator.java     |  132 ++
 .../algorithm/sort/TestStableVectorComparator.java |  132 ++
 java/flight/pom.xml                                |    1 +
 .../org/apache/arrow/flight/AsyncPutListener.java  |    6 +-
 .../java/org/apache/arrow/flight/FlightClient.java |    9 +
 .../org/apache/arrow/flight/FlightDescriptor.java  |    2 +-
 .../org/apache/arrow/flight/FlightProducer.java    |   13 +
 .../java/org/apache/arrow/flight/FlightServer.java |   15 +-
 .../org/apache/arrow/flight/FlightService.java     |   24 +-
 .../java/org/apache/arrow/flight/SchemaResult.java |   85 ++
 .../java/org/apache/arrow/flight/StreamPipe.java   |   13 +-
 .../org/apache/arrow/flight/SyncPutListener.java   |    6 +-
 .../arrow/flight/TestApplicationMetadata.java      |   38 +-
 .../apache/arrow/flight/TestBasicOperation.java    |    8 +
 .../org/apache/arrow/flight/TestServerOptions.java |   21 +
 .../apache/arrow/flight/auth/TestBasicAuth.java    |    2 +-
 java/format/pom.xml                                |    1 +
 java/gandiva/pom.xml                               |    1 +
 .../arrow/gandiva/evaluator/ProjectorTest.java     |   58 +
 java/memory/pom.xml                                |    1 +
 .../java/org/apache/arrow/memory/Accountant.java   |    3 +-
 .../org/apache/arrow/memory/BaseAllocator.java     |   15 +-
 .../apache/arrow/memory/util/ArrowBufPointer.java  |   13 +-
 .../arrow/memory/util/TestArrowBufPointer.java     |    5 +
 java/performance/pom.xml                           |    4 +-
 .../memory/util/ArrowBufPointerBenchmarks.java     |  108 ++
 java/plasma/pom.xml                                |    1 +
 java/pom.xml                                       |   37 +
 java/tools/pom.xml                                 |    1 +
 .../java/org/apache/arrow/tools/EchoServer.java    |    2 +-
 java/vector/pom.xml                                |    6 +
 .../org/apache/arrow/vector/BaseIntVector.java     |    2 +-
 .../java/org/apache/arrow/vector/Float4Vector.java |   16 +-
 .../java/org/apache/arrow/vector/Float8Vector.java |   16 +-
 ...BaseIntVector.java => FloatingPointVector.java} |   27 +-
 .../java/org/apache/arrow/vector/UInt4Vector.java  |    2 +-
 .../arrow/vector/compare/ApproxEqualsVisitor.java  |  196 +--
 .../org/apache/arrow/vector/compare/Range.java     |   85 ++
 .../arrow/vector/compare/RangeEqualsVisitor.java   |  262 ++--
 .../arrow/vector/compare/VectorEqualsVisitor.java  |   33 +-
 .../arrow/vector/complex/BaseListVector.java}      |   42 +-
 .../vector/complex/BaseRepeatedValueVector.java    |    3 +-
 .../arrow/vector/complex/FixedSizeListVector.java  |   12 +-
 .../apache/arrow/vector/complex/ListVector.java    |   12 +-
 .../apache/arrow/vector/dictionary/Dictionary.java |    2 +-
 .../arrow/vector/dictionary/DictionaryEncoder.java |  101 +-
 .../vector/dictionary/DictionaryHashTable.java     |   15 +-
 .../vector/dictionary/ListSubfieldEncoder.java     |  131 ++
 .../apache/arrow/vector/types/pojo/FieldType.java  |    5 +
 .../util/ElementAddressableVectorIterator.java     |   86 ++
 .../apache/arrow/vector/TestDictionaryVector.java  |  141 ++
 .../org/apache/arrow/vector/TestValueVector.java   |   79 +-
 .../vector/compare/TestRangeEqualsVisitor.java     |   66 +-
 .../org/apache/arrow/vector/ipc/BaseFileTest.java  |    2 +-
 .../arrow/vector/ipc/MessageSerializerTest.java    |    2 +-
 .../util/TestElementAddressableVectorIterator.java |  135 ++
 python/README.md                                   |    7 +-
 python/asv-build.sh                                |   19 +-
 python/asv.conf.json                               |   16 +-
 python/benchmarks/parquet.py                       |   76 +
 python/manylinux1/build_arrow.sh                   |   11 +-
 python/manylinux2010/build_arrow.sh                |   11 +-
 python/pyarrow/__init__.py                         |    5 +-
 python/pyarrow/_csv.pyx                            |   31 +-
 python/pyarrow/_flight.pyx                         |  126 ++
 python/pyarrow/_parquet.pxd                        |    6 +
 python/pyarrow/_parquet.pyx                        |   13 +-
 python/pyarrow/flight.py                           |    2 +
 python/pyarrow/includes/libarrow.pxd               |   14 +-
 python/pyarrow/includes/libarrow_flight.pxd        |   33 +-
 python/pyarrow/io.pxi                              |   10 +-
 python/pyarrow/ipc.pxi                             |   38 +-
 python/pyarrow/lib.pxd                             |    4 +
 python/pyarrow/pandas_compat.py                    |    8 +-
 python/pyarrow/parquet.py                          |   30 +
 python/pyarrow/public-api.pxi                      |    9 +-
 python/pyarrow/scalar.pxi                          |   93 +-
 python/pyarrow/table.pxi                           |  154 +-
 python/pyarrow/tests/conftest.py                   |    4 +-
 python/pyarrow/tests/test_array.py                 |   12 +
 python/pyarrow/tests/test_convert_builtin.py       |    6 +-
 python/pyarrow/tests/test_csv.py                   |   68 +
 python/pyarrow/tests/test_cuda.py                  |    2 +-
 python/pyarrow/tests/test_extension_type.py        |  145 +-
 python/pyarrow/tests/test_flight.py                |  131 +-
 python/pyarrow/tests/test_io.py                    |   42 +
 python/pyarrow/tests/test_ipc.py                   |   15 +-
 python/pyarrow/tests/test_pandas.py                |  104 +-
 python/pyarrow/tests/test_parquet.py               |   69 +-
 python/pyarrow/tests/test_scalars.py               |   49 +
 python/pyarrow/tests/test_schema.py                |    8 +-
 python/pyarrow/tests/test_table.py                 |   62 +-
 python/pyarrow/tests/test_types.py                 |    8 +-
 python/pyarrow/types.pxi                           |  143 +-
 r/DESCRIPTION                                      |   32 +-
 r/Dockerfile                                       |    7 +
 r/Makefile                                         |    2 +-
 r/NAMESPACE                                        |  122 +-
 r/NEWS.md                                          |    2 +-
 r/R/RecordBatchReader.R                            |  138 --
 r/R/RecordBatchWriter.R                            |  178 ---
 r/R/{ArrayData.R => array-data.R}                  |   22 +-
 r/R/array.R                                        |  105 +-
 r/R/arrow-package.R                                |    2 +-
 r/R/arrowExports.R                                 |   24 +-
 r/R/buffer.R                                       |   71 +-
 r/R/{ChunkedArray.R => chunked-array.R}            |   54 +-
 r/R/compression.R                                  |  110 +-
 r/R/compute.R                                      |    4 +-
 r/R/csv.R                                          |  299 ++--
 r/R/dictionary.R                                   |   22 +-
 r/R/enums.R                                        |   14 +-
 r/R/feather.R                                      |  228 ++-
 r/R/{Field.R => field.R}                           |   60 +-
 r/R/io.R                                           |  358 ++---
 r/R/json.R                                         |  156 +-
 r/R/{List.R => list.R}                             |   10 +-
 r/R/{memory_pool.R => memory-pool.R}               |   14 +-
 r/R/message.R                                      |   59 +-
 r/R/parquet.R                                      |  203 +--
 r/R/{read_record_batch.R => read-record-batch.R}   |   26 +-
 r/R/read-table.R                                   |   83 +
 r/R/read_table.R                                   |   83 -
 r/R/record-batch-reader.R                          |   96 ++
 r/R/record-batch-writer.R                          |  103 ++
 r/R/{RecordBatch.R => record-batch.R}              |   60 +-
 r/R/{Schema.R => schema.R}                         |   60 +-
 r/R/{Struct.R => struct.R}                         |    8 +-
 r/R/{Table.R => table.R}                           |   46 +-
 r/R/type.R                                         |  241 +--
 r/{tests/testthat/test-arrow.R => R/util.R}        |   22 +-
 r/R/{write_arrow.R => write-arrow.R}               |   34 +-
 r/README.Rmd                                       |    2 +-
 r/README.md                                        |    6 +-
 r/_pkgdown.yml                                     |   86 +-
 r/configure                                        |    2 +-
 r/data-raw/codegen.R                               |    3 +-
 r/man/ArrayData.Rd                                 |   28 +
 r/man/BufferOutputStream.Rd                        |   17 -
 r/man/BufferReader.Rd                              |   14 -
 r/man/ChunkedArray.Rd                              |   44 +
 r/man/Codec.Rd                                     |   21 +
 r/man/CompressedInputStream.Rd                     |   16 -
 r/man/CompressedOutputStream.Rd                    |   19 -
 r/man/{arrow__DataType.Rd => DataType.Rd}          |    5 +-
 ...{arrow__DictionaryType.Rd => DictionaryType.Rd} |    9 +-
 r/man/FeatherTableReader.Rd                        |   39 +-
 r/man/FeatherTableWriter.Rd                        |   35 +-
 r/man/Field.Rd                                     |   39 +
 r/man/FileOutputStream.Rd                          |   17 -
 r/man/FixedSizeBufferWriter.Rd                     |   17 -
 ...{arrow__FixedWidthType.Rd => FixedWidthType.Rd} |    5 +-
 r/man/InputStream.Rd                               |   46 +
 r/man/JsonTableReader.Rd                           |   17 +
 r/man/{arrow___MemoryPool.Rd => MemoryPool.Rd}     |    7 +-
 r/man/{arrow__ipc__Message.Rd => Message.Rd}       |    9 +-
 r/man/MessageReader.Rd                             |   18 +-
 r/man/MockOutputStream.Rd                          |   14 -
 r/man/OutputStream.Rd                              |   47 +
 r/man/ParquetFileReader.Rd                         |   43 +
 r/man/ParquetReaderProperties.Rd                   |   30 +
 r/man/ReadableFile.Rd                              |   17 -
 r/man/{arrow__RecordBatch.Rd => RecordBatch.Rd}    |    7 +-
 r/man/RecordBatchFileReader.Rd                     |   14 -
 r/man/RecordBatchFileWriter.Rd                     |   23 -
 r/man/RecordBatchReader.Rd                         |   39 +
 r/man/RecordBatchStreamReader.Rd                   |   14 -
 r/man/RecordBatchStreamWriter.Rd                   |   23 -
 r/man/RecordBatchWriter.Rd                         |   46 +
 r/man/Schema.Rd                                    |   38 +
 r/man/Table.Rd                                     |   26 +
 r/man/array.Rd                                     |   52 +-
 r/man/arrow__Array.Rd                              |   57 -
 r/man/arrow__ArrayData.Rd                          |   28 -
 r/man/arrow__Buffer.Rd                             |   21 -
 r/man/arrow__ChunkedArray.Rd                       |   17 -
 r/man/arrow__Column.Rd                             |   17 -
 r/man/arrow__Field.Rd                              |   17 -
 r/man/arrow__RecordBatchReader.Rd                  |   17 -
 r/man/arrow__Schema.Rd                             |   29 -
 r/man/arrow__Table.Rd                              |   17 -
 r/man/arrow__io__BufferOutputStream.Rd             |   18 -
 r/man/arrow__io__BufferReader.Rd                   |   17 -
 r/man/arrow__io__FileOutputStream.Rd               |   17 -
 r/man/arrow__io__FixedSizeBufferWriter.Rd          |   17 -
 r/man/arrow__io__InputStream.Rd                    |   17 -
 r/man/arrow__io__MemoryMappedFile.Rd               |   20 -
 r/man/arrow__io__MockOutputStream.Rd               |   17 -
 r/man/arrow__io__OutputStream.Rd                   |   19 -
 r/man/arrow__io__RandomAccessFile.Rd               |   17 -
 r/man/arrow__io__Readable.Rd                       |   17 -
 r/man/arrow__io__ReadableFile.Rd                   |   17 -
 r/man/arrow__ipc__MessageReader.Rd                 |   17 -
 r/man/arrow__ipc__RecordBatchFileReader.Rd         |   17 -
 r/man/arrow__ipc__RecordBatchFileWriter.Rd         |   40 -
 r/man/arrow__ipc__RecordBatchStreamReader.Rd       |   17 -
 r/man/arrow__ipc__RecordBatchStreamWriter.Rd       |   40 -
 r/man/arrow__ipc__RecordBatchWriter.Rd             |   28 -
 r/man/arrow__json__TableReader.Rd                  |   18 -
 r/man/buffer.Rd                                    |   25 +-
 r/man/chunked_array.Rd                             |   16 -
 r/man/compression.Rd                               |   31 +
 r/man/compression_codec.Rd                         |   14 -
 r/man/csv_convert_options.Rd                       |   10 +-
 r/man/csv_read_options.Rd                          |    9 +-
 r/man/csv_table_reader.Rd                          |    2 +-
 r/man/data-type.Rd                                 |    4 +-
 r/man/default_memory_pool.Rd                       |    8 +-
 r/man/dictionary.Rd                                |    2 +-
 r/man/enums.Rd                                     |    2 +-
 r/man/field.Rd                                     |   25 -
 r/man/make_readable_file.Rd                        |   20 +
 r/man/mmap_create.Rd                               |    2 +-
 r/man/parquet_arrow_reader_properties.Rd           |   15 -
 r/man/parquet_file_reader.Rd                       |   18 -
 r/man/read_delim_arrow.Rd                          |   35 +-
 r/man/read_feather.Rd                              |    8 +-
 r/man/read_json_arrow.Rd                           |    8 +-
 r/man/read_parquet.Rd                              |   17 +-
 r/man/read_record_batch.Rd                         |   12 +-
 r/man/read_schema.Rd                               |    2 +-
 r/man/read_table.Rd                                |   24 +-
 r/man/record_batch.Rd                              |   10 +-
 r/man/schema.Rd                                    |   20 -
 r/man/table.Rd                                     |   19 -
 r/man/write_arrow.Rd                               |   14 +-
 r/man/write_feather.Rd                             |    4 +-
 r/man/write_feather_RecordBatch.Rd                 |   17 -
 r/man/write_parquet.Rd                             |   10 +-
 r/src/array_from_vector.cpp                        |   53 +-
 .../{array__to_vector.cpp => array_to_vector.cpp}  |    0
 r/src/arrowExports.cpp                             |  108 +-
 r/src/csv.cpp                                      |   14 +
 r/src/datatype.cpp                                 |    4 +-
 r/src/recordbatch.cpp                              |    2 +-
 r/src/table.cpp                                    |   14 +-
 r/tests/testthat/helper-arrow.R                    |    2 +-
 r/tests/testthat/test-Array.R                      |  152 +-
 r/tests/testthat/test-RecordBatch.R                |   25 +-
 r/tests/testthat/test-Table.R                      |   38 +-
 .../{test-arraydata.R => test-array-data.R}        |    4 +-
 r/tests/testthat/test-arrow.R                      |   17 +
 .../{test-bufferreader.R => test-buffer-reader.R}  |   18 +-
 r/tests/testthat/test-buffer.R                     |   41 +-
 .../{test-chunkedarray.R => test-chunked-array.R}  |   38 +-
 r/tests/testthat/test-compressed.R                 |   23 +-
 r/tests/testthat/{test-arrow-csv.R => test-csv.R}  |   41 +-
 r/tests/testthat/test-data-type.R                  |    2 +-
 r/tests/testthat/test-feather.R                    |    6 +-
 r/tests/testthat/test-field.R                      |    2 +-
 r/tests/testthat/test-json.R                       |   12 +-
 ...{test-messagereader.R => test-message-reader.R} |   46 +-
 r/tests/testthat/test-message.R                    |   22 +-
 ...ead_record_batch.R => test-read-record-batch.R} |   16 +-
 r/tests/testthat/test-read-write.R                 |   14 +-
 ...ordbatchreader.R => test-record-batch-reader.R} |   36 +-
 r/tests/testthat/test-schema.R                     |   24 +-
 ...-cputhreadpoolcapacity.R => test-thread-pool.R} |    0
 r/tests/testthat/test-type.R                       |    6 +-
 r/vignettes/arrow.Rmd                              |   86 ++
 .../red-arrow/benchmark/raw-records/decimal128.yml |    4 +-
 .../red-arrow/benchmark/raw-records/dictionary.yml |   22 +-
 ruby/red-arrow/benchmark/raw-records/int64.yml     |    4 +-
 ruby/red-arrow/benchmark/raw-records/list.yml      |   10 +-
 ruby/red-arrow/benchmark/raw-records/timestamp.yml |   15 +-
 .../red-arrow/benchmark/values/boolean.yml         |   27 +-
 .../red-arrow/benchmark/values/decimal128.yml      |   42 +-
 .../int64.yml => values/dictionary.yml}            |   47 +-
 .../red-arrow/benchmark/values/int64.yml           |   27 +-
 .../{raw-records/int64.yml => values/list.yml}     |   41 +-
 .../red-arrow/benchmark/values/string.yml          |   28 +-
 .../int64.yml => values/timestamp.yml}             |   48 +-
 ruby/red-arrow/ext/arrow/arrow.cpp                 |   13 +
 ruby/red-arrow/ext/arrow/converters.cpp            |   42 +
 ruby/red-arrow/ext/arrow/converters.hpp            |  626 ++++++++
 ruby/red-arrow/ext/arrow/raw-records.cpp           |  635 +-------
 ruby/red-arrow/ext/arrow/red-arrow.hpp             |   11 +
 ruby/red-arrow/ext/arrow/values.cpp                |  154 ++
 ruby/red-arrow/lib/arrow/array.rb                  |    4 +
 ruby/red-arrow/lib/arrow/loader.rb                 |    8 +
 ruby/red-arrow/red-arrow.gemspec                   |    2 +-
 ruby/red-arrow/test/values/test-basic-arrays.rb    |  284 ++++
 .../test/values/test-dense-union-array.rb          |  487 ++++++
 ruby/red-arrow/test/values/test-list-array.rb      |  497 ++++++
 .../test/values/test-sparse-union-array.rb         |  477 ++++++
 ruby/red-arrow/test/values/test-struct-array.rb    |  452 ++++++
 rust/arrow/benches/arithmetic_kernels.rs           |   20 +
 rust/arrow/examples/builders.rs                    |   91 +-
 rust/arrow/src/array/array.rs                      |  143 +-
 rust/arrow/src/array/builder.rs                    |   38 +-
 rust/arrow/src/buffer.rs                           |   41 +-
 rust/arrow/src/compute/kernels/arithmetic.rs       |  135 +-
 rust/arrow/src/compute/kernels/comparison.rs       |  101 +-
 rust/arrow/src/compute/util.rs                     |  119 +-
 rust/arrow/src/datatypes.rs                        |   40 +
 rust/datafusion/Cargo.toml                         |    2 +-
 rust/datafusion/src/datasource/csv.rs              |   19 +-
 rust/datafusion/src/execution/aggregate.rs         |   36 +-
 rust/datafusion/src/execution/context.rs           |  173 ++-
 rust/datafusion/src/execution/physical_plan/csv.rs |   44 +-
 .../src/execution/physical_plan/datasource.rs      |   73 +
 rust/datafusion/src/execution/physical_plan/mod.rs |    1 +
 .../src/execution/physical_plan/projection.rs      |   57 +-
 rust/datafusion/src/execution/projection.rs        |    4 +-
 rust/datafusion/src/execution/table_impl.rs        |   24 +-
 rust/datafusion/src/lib.rs                         |    3 +
 rust/datafusion/src/logicalplan.rs                 |    6 +-
 rust/datafusion/src/optimizer/type_coercion.rs     |   14 +
 rust/datafusion/src/test/mod.rs                    |  101 ++
 rust/rust-toolchain                                |    2 +-
 testing                                            |    2 +-
 650 files changed, 29238 insertions(+), 10784 deletions(-)
 create mode 100644 .github/workflows/windows-msvc-cpp.yml
 copy c_glib/Gemfile => ci/conda_env_crossbow.txt (89%)
 create mode 100644 cpp/src/arrow/dataset/dataset_internal.h
 create mode 100644 cpp/src/arrow/dataset/filter.cc
 create mode 100644 cpp/src/arrow/dataset/filter_test.cc
 copy cpp/src/arrow/dataset/{dataset_test.cc => scanner_test.cc} (55%)
 create mode 100644 cpp/src/arrow/filesystem/s3_internal.h
 create mode 100644 cpp/src/arrow/filesystem/s3fs.cc
 copy cpp/src/arrow/filesystem/{mockfs.h => s3fs.h} (50%)
 create mode 100644 cpp/src/arrow/filesystem/s3fs_narrative_test.cc
 create mode 100644 cpp/src/arrow/filesystem/s3fs_test.cc
 copy cpp/src/arrow/{dataset/scanner.cc => io/util_internal.h} (79%)
 create mode 100644 cpp/src/arrow/testing/generator.cc
 create mode 100644 cpp/src/arrow/testing/generator.h
 create mode 100644 cpp/src/arrow/util/iterator_test.cc
 create mode 100644 cpp/src/arrow/util/print.h
 copy cpp/src/arrow/{dataset/api.h => vendored/xxhash.h} (77%)
 copy r/NEWS.md => cpp/src/arrow/vendored/xxhash/README.md (50%)
 create mode 100644 cpp/src/arrow/vendored/xxhash/xxh3.h
 create mode 100644 cpp/src/gandiva/random_generator_holder.cc
 create mode 100644 cpp/src/gandiva/random_generator_holder.h
 create mode 100644 cpp/src/gandiva/random_generator_holder_test.cc
 rename cpp/src/parquet/{thrift.h => thrift_internal.h} (100%)
 create mode 100644 cpp/thirdparty/jemalloc/17c897976c60b0e6e4f4a365c751027244dada7a.tar.gz
 delete mode 100644 dev/tasks/conda-recipes/.ci_support/win_c_compilervs2015cxx_compilervs2015python3.6.yaml
 create mode 100644 dev/tasks/conda-recipes/.ci_support/win_c_compilervs2015cxx_compilervs2015python3.6vc14.yaml
 delete mode 100644 dev/tasks/conda-recipes/.ci_support/win_c_compilervs2015cxx_compilervs2015python3.7.yaml
 create mode 100644 dev/tasks/conda-recipes/.ci_support/win_c_compilervs2015cxx_compilervs2015python3.7vc14.yaml
 create mode 100644 dev/tasks/linux-packages/azure.linux.arm64.yml
 create mode 100644 dev/tasks/linux-packages/azure.linux.yml
 delete mode 100644 dev/tasks/linux-packages/travis.linux.arm64.yml
 delete mode 100644 dev/tasks/linux-packages/travis.linux.yml
 delete mode 100644 dev/tasks/tests.yml
 delete mode 100755 dev/tasks/upload-assets.py
 create mode 100644 docs/source/developers/crossbow.rst
 create mode 100644 docs/source/format/Columnar.rst
 create mode 100644 docs/source/format/Integration.rst
 create mode 100644 docs/source/format/Other.rst
 copy r/NEWS.md => docs/source/format/README.md (50%)
 delete mode 100644 docs/source/format/README.rst
 create mode 100644 docs/source/format/Versioning.rst
 create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowConfig.java
 create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java
 copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{NullableTypeConsumer.java => AvroArraysConsumer.java} (56%)
 copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{AvroIntConsumer.java => AvroEnumConsumer.java} (69%)
 copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{AvroDoubleConsumer.java => AvroFixedConsumer.java} (62%)
 copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{NullableTypeConsumer.java => AvroMapConsumer.java} (56%)
 copy java/adapter/avro/src/main/java/org/apache/arrow/consumers/{AvroBooleanConsumer.java => AvroStructConsumer.java} (54%)
 create mode 100644 java/adapter/avro/src/test/java/org/apache/arrow/AvroTestBase.java
 create mode 100644 java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowIteratorTest.java
 copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_array.avsc} (52%)
 copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_fixed.avsc} (52%)
 copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_map.avsc} (52%)
 copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_nested_record.avsc} (53%)
 copy java/{vector/src/main/java/org/apache/arrow/vector/compare/VectorEqualsVisitor.java => adapter/avro/src/test/resources/schema/test_primitive_enum.avsc} (52%)
 create mode 100644 java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/DeduplicationUtils.java
 create mode 100644 java/algorithm/src/main/java/org/apache/arrow/algorithm/deduplicate/VectorRunDeduplicator.java
 create mode 100644 java/algorithm/src/main/java/org/apache/arrow/algorithm/sort/StableVectorComparator.java
 create mode 100644 java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestDeduplicationUtils.java
 create mode 100644 java/algorithm/src/test/java/org/apache/arrow/algorithm/deduplicate/TestVectorRunDeduplicator.java
 create mode 100644 java/algorithm/src/test/java/org/apache/arrow/algorithm/sort/TestStableVectorComparator.java
 create mode 100644 java/flight/src/main/java/org/apache/arrow/flight/SchemaResult.java
 create mode 100644 java/performance/src/test/java/org/apache/arrow/memory/util/ArrowBufPointerBenchmarks.java
 copy java/vector/src/main/java/org/apache/arrow/vector/{BaseIntVector.java => FloatingPointVector.java} (54%)
 create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compare/Range.java
 copy java/{adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java => vector/src/main/java/org/apache/arrow/vector/complex/BaseListVector.java} (53%)
 create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/dictionary/ListSubfieldEncoder.java
 create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/util/ElementAddressableVectorIterator.java
 create mode 100644 java/vector/src/test/java/org/apache/arrow/vector/util/TestElementAddressableVectorIterator.java
 delete mode 100644 r/R/RecordBatchReader.R
 delete mode 100644 r/R/RecordBatchWriter.R
 rename r/R/{ArrayData.R => array-data.R} (78%)
 rename r/R/{ChunkedArray.R => chunked-array.R} (50%)
 rename r/R/{Field.R => field.R} (70%)
 rename r/R/{List.R => list.R} (72%)
 rename r/R/{memory_pool.R => memory-pool.R} (79%)
 rename r/R/{read_record_batch.R => read-record-batch.R} (54%)
 create mode 100644 r/R/read-table.R
 delete mode 100644 r/R/read_table.R
 create mode 100644 r/R/record-batch-reader.R
 create mode 100644 r/R/record-batch-writer.R
 rename r/R/{RecordBatch.R => record-batch.R} (61%)
 rename r/R/{Schema.R => schema.R} (65%)
 rename r/R/{Struct.R => struct.R} (81%)
 rename r/R/{Table.R => table.R} (65%)
 copy r/{tests/testthat/test-arrow.R => R/util.R} (62%)
 rename r/R/{write_arrow.R => write-arrow.R} (67%)
 create mode 100644 r/man/ArrayData.Rd
 delete mode 100644 r/man/BufferOutputStream.Rd
 delete mode 100644 r/man/BufferReader.Rd
 create mode 100644 r/man/ChunkedArray.Rd
 create mode 100644 r/man/Codec.Rd
 delete mode 100644 r/man/CompressedInputStream.Rd
 delete mode 100644 r/man/CompressedOutputStream.Rd
 rename r/man/{arrow__DataType.Rd => DataType.Rd} (75%)
 rename r/man/{arrow__DictionaryType.Rd => DictionaryType.Rd} (52%)
 create mode 100644 r/man/Field.Rd
 delete mode 100644 r/man/FileOutputStream.Rd
 delete mode 100644 r/man/FixedSizeBufferWriter.Rd
 rename r/man/{arrow__FixedWidthType.Rd => FixedWidthType.Rd} (72%)
 create mode 100644 r/man/InputStream.Rd
 create mode 100644 r/man/JsonTableReader.Rd
 rename r/man/{arrow___MemoryPool.Rd => MemoryPool.Rd} (59%)
 rename r/man/{arrow__ipc__Message.Rd => Message.Rd} (54%)
 delete mode 100644 r/man/MockOutputStream.Rd
 create mode 100644 r/man/OutputStream.Rd
 create mode 100644 r/man/ParquetFileReader.Rd
 create mode 100644 r/man/ParquetReaderProperties.Rd
 delete mode 100644 r/man/ReadableFile.Rd
 rename r/man/{arrow__RecordBatch.Rd => RecordBatch.Rd} (59%)
 delete mode 100644 r/man/RecordBatchFileReader.Rd
 delete mode 100644 r/man/RecordBatchFileWriter.Rd
 create mode 100644 r/man/RecordBatchReader.Rd
 delete mode 100644 r/man/RecordBatchStreamReader.Rd
 delete mode 100644 r/man/RecordBatchStreamWriter.Rd
 create mode 100644 r/man/RecordBatchWriter.Rd
 create mode 100644 r/man/Schema.Rd
 create mode 100644 r/man/Table.Rd
 delete mode 100644 r/man/arrow__Array.Rd
 delete mode 100644 r/man/arrow__ArrayData.Rd
 delete mode 100644 r/man/arrow__Buffer.Rd
 delete mode 100644 r/man/arrow__ChunkedArray.Rd
 delete mode 100644 r/man/arrow__Column.Rd
 delete mode 100644 r/man/arrow__Field.Rd
 delete mode 100644 r/man/arrow__RecordBatchReader.Rd
 delete mode 100644 r/man/arrow__Schema.Rd
 delete mode 100644 r/man/arrow__Table.Rd
 delete mode 100644 r/man/arrow__io__BufferOutputStream.Rd
 delete mode 100644 r/man/arrow__io__BufferReader.Rd
 delete mode 100644 r/man/arrow__io__FileOutputStream.Rd
 delete mode 100644 r/man/arrow__io__FixedSizeBufferWriter.Rd
 delete mode 100644 r/man/arrow__io__InputStream.Rd
 delete mode 100644 r/man/arrow__io__MemoryMappedFile.Rd
 delete mode 100644 r/man/arrow__io__MockOutputStream.Rd
 delete mode 100644 r/man/arrow__io__OutputStream.Rd
 delete mode 100644 r/man/arrow__io__RandomAccessFile.Rd
 delete mode 100644 r/man/arrow__io__Readable.Rd
 delete mode 100644 r/man/arrow__io__ReadableFile.Rd
 delete mode 100644 r/man/arrow__ipc__MessageReader.Rd
 delete mode 100644 r/man/arrow__ipc__RecordBatchFileReader.Rd
 delete mode 100644 r/man/arrow__ipc__RecordBatchFileWriter.Rd
 delete mode 100644 r/man/arrow__ipc__RecordBatchStreamReader.Rd
 delete mode 100644 r/man/arrow__ipc__RecordBatchStreamWriter.Rd
 delete mode 100644 r/man/arrow__ipc__RecordBatchWriter.Rd
 delete mode 100644 r/man/arrow__json__TableReader.Rd
 delete mode 100644 r/man/chunked_array.Rd
 create mode 100644 r/man/compression.Rd
 delete mode 100644 r/man/compression_codec.Rd
 delete mode 100644 r/man/field.Rd
 create mode 100644 r/man/make_readable_file.Rd
 delete mode 100644 r/man/parquet_arrow_reader_properties.Rd
 delete mode 100644 r/man/parquet_file_reader.Rd
 delete mode 100644 r/man/schema.Rd
 delete mode 100644 r/man/table.Rd
 delete mode 100644 r/man/write_feather_RecordBatch.Rd
 rename r/src/{array__to_vector.cpp => array_to_vector.cpp} (100%)
 copy r/tests/testthat/{test-arraydata.R => test-array-data.R} (95%)
 rename r/tests/testthat/{test-bufferreader.R => test-buffer-reader.R} (75%)
 rename r/tests/testthat/{test-chunkedarray.R => test-chunked-array.R} (91%)
 rename r/tests/testthat/{test-arrow-csv.R => test-csv.R} (77%)
 rename r/tests/testthat/{test-messagereader.R => test-message-reader.R} (64%)
 rename r/tests/testthat/{test-read_record_batch.R => test-read-record-batch.R} (86%)
 rename r/tests/testthat/{test-recordbatchreader.R => test-record-batch-reader.R} (62%)
 rename r/tests/testthat/{test-cputhreadpoolcapacity.R => test-thread-pool.R} (100%)
 create mode 100644 r/vignettes/arrow.Rmd
 copy r/tests/testthat/test-arraydata.R => ruby/red-arrow/benchmark/values/boolean.yml (61%)
 rename r/R/Column.R => ruby/red-arrow/benchmark/values/decimal128.yml (58%)
 copy ruby/red-arrow/benchmark/{raw-records/int64.yml => values/dictionary.yml} (59%)
 copy r/tests/testthat/test-arraydata.R => ruby/red-arrow/benchmark/values/int64.yml (61%)
 copy ruby/red-arrow/benchmark/{raw-records/int64.yml => values/list.yml} (59%)
 rename r/tests/testthat/test-arraydata.R => ruby/red-arrow/benchmark/values/string.yml (62%)
 copy ruby/red-arrow/benchmark/{raw-records/int64.yml => values/timestamp.yml} (59%)
 create mode 100644 ruby/red-arrow/ext/arrow/converters.cpp
 create mode 100644 ruby/red-arrow/ext/arrow/converters.hpp
 create mode 100644 ruby/red-arrow/ext/arrow/values.cpp
 create mode 100644 ruby/red-arrow/test/values/test-basic-arrays.rb
 create mode 100644 ruby/red-arrow/test/values/test-dense-union-array.rb
 create mode 100644 ruby/red-arrow/test/values/test-list-array.rb
 create mode 100644 ruby/red-arrow/test/values/test-sparse-union-array.rb
 create mode 100644 ruby/red-arrow/test/values/test-struct-array.rb
 create mode 100644 rust/datafusion/src/execution/physical_plan/datasource.rs
 create mode 100644 rust/datafusion/src/test/mod.rs


[arrow] 03/03: ARROW-6315: [Java] Make change to ensure flatbuffer reads are aligned

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 0352456387e0d02036029de4fbb6d49324eb779e
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Fri Sep 6 20:36:38 2019 -0700

    ARROW-6315: [Java] Make change to ensure flatbuffer reads are aligned
    
    Implements the IPC message format alignment changes for [ARROW-6315](https://issues.apache.org/jira/browse/ARROW-6315).
    i. MessageReader can read messages with the old alignment
    ii. ArrowWriter can choose to produce messages with either the new alignment or the old format.
    
    Closes #5229 from tianchen92/ARROW-align-java and squashes the following commits:
    
    1eb71d27c <Bryan Cutler> ARROW-6461:  Prevent EchoServer from closing the client socket after writing
    cd4fd050e <tianchen> fix small bugs
    9a690e47d <tianchen> fix comments and styles
    5ee858c56 <tianchen>  Make change to ensure flatbuffer reads are aligned
    
    Lead-authored-by: tianchen <ni...@alibaba-inc.com>
    Co-authored-by: Bryan Cutler <cu...@gmail.com>
    Signed-off-by: Micah Kornfield <em...@gmail.com>
---
 .../org/apache/arrow/tools/EchoServerTest.java     |   2 +-
 .../apache/arrow/vector/ipc/ArrowFileWriter.java   |  13 ++-
 .../apache/arrow/vector/ipc/ArrowStreamWriter.java |  22 ++++-
 .../org/apache/arrow/vector/ipc/ArrowWriter.java   |  17 +++-
 .../org/apache/arrow/vector/ipc/WriteChannel.java  |   9 ++
 .../apache/arrow/vector/ipc/message/IpcOption.java |  28 ++++++
 .../vector/ipc/message/MessageSerializer.java      | 108 ++++++++++++++++++---
 .../arrow/vector/ipc/MessageSerializerTest.java    |  14 +--
 .../arrow/vector/ipc/TestArrowReaderWriter.java    |  52 +++++++++-
 .../apache/arrow/vector/ipc/TestArrowStream.java   |   2 +-
 .../arrow/vector/ipc/TestArrowStreamPipe.java      |   2 +-
 11 files changed, 231 insertions(+), 38 deletions(-)

diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 219926a..bfb136c 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -127,7 +127,7 @@ public class EchoServerTest {
       }
       Assert.assertFalse(reader.loadNextBatch());
       assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
-      assertEquals(reader.bytesRead(), writer.bytesWritten());
+      assertEquals(reader.bytesRead() + 4, writer.bytesWritten());
     }
   }
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
index 395a617..936ab6d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
@@ -29,6 +29,7 @@ import org.apache.arrow.vector.ipc.message.ArrowBlock;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
 import org.apache.arrow.vector.ipc.message.ArrowFooter;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,11 @@ public class ArrowFileWriter extends ArrowWriter {
     super(root, provider, out);
   }
 
+  public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+      IpcOption option) {
+    super(root, provider, out, option);
+  }
+
   @Override
   protected void startInternal(WriteChannel out) throws IOException {
     ArrowMagic.writeMagic(out, true);
@@ -68,7 +74,12 @@ public class ArrowFileWriter extends ArrowWriter {
 
   @Override
   protected void endInternal(WriteChannel out) throws IOException {
-    out.writeIntLittleEndian(0);
+    if (option.write_legacy_ipc_format) {
+      out.writeIntLittleEndian(0);
+    } else {
+      out.writeLongLittleEndian(0);
+    }
+
     long footerStart = out.getCurrentPosition();
     out.write(new ArrowFooter(schema, dictionaryBlocks, recordBlocks), false);
     int footerLength = (int) (out.getCurrentPosition() - footerStart);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index ec0f42e..e74323b 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -24,6 +24,7 @@ import java.nio.channels.WritableByteChannel;
 
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.message.IpcOption;
 
 /**
  * Writer for the Arrow stream format to send ArrowRecordBatches over a WriteChannel.
@@ -44,14 +45,23 @@ public class ArrowStreamWriter extends ArrowWriter {
 
   /**
    * Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
+   */
+  public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+    this(root, provider, out, new IpcOption());
+  }
+
+  /**
+   * Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
    *
    * @param root Existing VectorSchemaRoot with vectors to be written.
    * @param provider DictionaryProvider for any vectors that are dictionary encoded.
    *                 (Optional, can be null)
+   * @param option IPC write options
    * @param out WritableByteChannel for writing.
    */
-  public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
-    super(root, provider, out);
+  public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+      IpcOption option) {
+    super(root, provider, out, option);
   }
 
   /**
@@ -60,8 +70,12 @@ public class ArrowStreamWriter extends ArrowWriter {
    * @param out Open WriteChannel with an active Arrow stream.
    * @throws IOException on error
    */
-  public static void writeEndOfStream(WriteChannel out) throws IOException {
-    out.writeIntLittleEndian(0);
+  public void writeEndOfStream(WriteChannel out) throws IOException {
+    if (option.write_legacy_ipc_format) {
+      out.writeIntLittleEndian(0);
+    } else {
+      out.writeLongLittleEndian(0);
+    }
   }
 
   @Override
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 6366f2f..52ab3de 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -33,6 +33,7 @@ import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.message.ArrowBlock;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -59,16 +60,24 @@ public abstract class ArrowWriter implements AutoCloseable {
 
   private boolean dictWritten = false;
 
+  protected IpcOption option;
+
+  protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+    this (root, provider, out, new IpcOption());
+  }
+
   /**
    * Note: fields are not closed when the writer is closed.
    *
    * @param root     the vectors to write to the output
    * @param provider where to find the dictionaries
    * @param out      the output where to write
+   * @param option   IPC write options
    */
-  protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+  protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
     this.unloader = new VectorUnloader(root);
     this.out = new WriteChannel(out);
+    this.option = option;
 
     List<Field> fields = new ArrayList<>(root.getSchema().getFields().size());
     Set<Long> dictionaryIdsUsed = new HashSet<>();
@@ -112,14 +121,14 @@ public abstract class ArrowWriter implements AutoCloseable {
   }
 
   protected ArrowBlock writeDictionaryBatch(ArrowDictionaryBatch batch) throws IOException {
-    ArrowBlock block = MessageSerializer.serialize(out, batch);
+    ArrowBlock block = MessageSerializer.serialize(out, batch, option);
     LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
         block.getOffset(), block.getMetadataLength(), block.getBodyLength());
     return block;
   }
 
   protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
-    ArrowBlock block = MessageSerializer.serialize(out, batch);
+    ArrowBlock block = MessageSerializer.serialize(out, batch, option);
     LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
         block.getOffset(), block.getMetadataLength(), block.getBodyLength());
     return block;
@@ -140,7 +149,7 @@ public abstract class ArrowWriter implements AutoCloseable {
       startInternal(out);
       // write the schema - for file formats this is duplicated in the footer, but matches
       // the streaming format
-      MessageSerializer.serialize(out, schema);
+      MessageSerializer.serialize(out, schema, option);
     }
   }
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
index eef36d3..2d36c93 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
@@ -102,6 +102,15 @@ public class WriteChannel implements AutoCloseable {
   }
 
   /**
+   * Writes <code>v</code> in little-endian format to the underlying channel.
+   */
+  public long writeLongLittleEndian(long v) throws IOException {
+    byte[] outBuffer = new byte[8];
+    MessageSerializer.longToBytes(v, outBuffer);
+    return write(outBuffer);
+  }
+
+  /**
    * Writes the buffer to the underlying channel.
    */
   public void write(ArrowBuf buffer) throws IOException {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
new file mode 100644
index 0000000..81a0603
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+/**
+ * IPC options, now only use for write.
+ */
+public class IpcOption {
+
+  // Write the pre-0.15.0 encapsulated IPC message format
+  // consisting of a 4-byte prefix instead of 8 byte
+  public boolean write_legacy_ipc_format = false;
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index 4016802..34ea077 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -56,6 +56,9 @@ import io.netty.buffer.ArrowBuf;
  */
 public class MessageSerializer {
 
+  // This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
+  public static final int IPC_CONTINUATION_TOKEN = -1;
+
   /**
    * Convert an array of 4 bytes to a little endian i32 value.
    *
@@ -83,6 +86,28 @@ public class MessageSerializer {
   }
 
   /**
+   * Convert a long to a 8 byte array.
+   *
+   * @param value long value input
+   * @param bytes existing byte array with minimum length of 8 to contain the conversion output
+   */
+  public static void longToBytes(long value, byte[] bytes) {
+    bytes[7] = (byte) (value >>> 56);
+    bytes[6] = (byte) (value >>> 48);
+    bytes[5] = (byte) (value >>> 40);
+    bytes[4] = (byte) (value >>> 32);
+    bytes[3] = (byte) (value >>> 24);
+    bytes[2] = (byte) (value >>> 16);
+    bytes[1] = (byte) (value >>> 8);
+    bytes[0] = (byte) (value);
+  }
+
+  public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
+      throws IOException {
+    return writeMessageBuffer(out, messageLength, messageBuffer, new IpcOption());
+  }
+
+  /**
    * Write the serialized Message metadata, prefixed by the length, to the output Channel. This
    * ensures that it aligns to an 8 byte boundary and will adjust the message length to include
    * any padding used for alignment.
@@ -91,22 +116,36 @@ public class MessageSerializer {
    * @param messageLength Number of bytes in the message buffer, written as little Endian prefix
    * @param messageBuffer Message metadata buffer to be written, this does not include any
    *                      message body data which should be subsequently written to the Channel
+   * @param option IPC write options
    * @return Number of bytes written
    * @throws IOException on error
    */
-  public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
+  public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer, IpcOption option)
       throws IOException {
 
-    // ensure that message aligns to 8 byte padding - 4 bytes for size, then message body
-    if ((messageLength + 4) % 8 != 0) {
-      messageLength += 8 - (messageLength + 4) % 8;
+    // if write the pre-0.15.0 encapsulated IPC message format consisting of a 4-byte prefix instead of 8 byte
+    int prefixSize = option.write_legacy_ipc_format ? 4 : 8;
+
+    // ensure that message aligns to 8 byte padding - prefix_size bytes, then message body
+    if ((messageLength + prefixSize ) % 8 != 0) {
+      messageLength += 8 - (messageLength + prefixSize) % 8;
+    }
+    if (!option.write_legacy_ipc_format) {
+      out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
     }
     out.writeIntLittleEndian(messageLength);
     out.write(messageBuffer);
     out.align();
 
     // any bytes written are already captured by our size modification above
-    return messageLength + 4;
+    return messageLength + prefixSize;
+  }
+
+  /**
+   * Serialize a schema object.
+   */
+  public static long serialize(WriteChannel out, Schema schema) throws IOException {
+    return serialize(out, schema, new IpcOption());
   }
 
   /**
@@ -117,7 +156,7 @@ public class MessageSerializer {
    * @return the number of bytes written
    * @throws IOException if something went wrong
    */
-  public static long serialize(WriteChannel out, Schema schema) throws IOException {
+  public static long serialize(WriteChannel out, Schema schema, IpcOption option) throws IOException {
     long start = out.getCurrentPosition();
     assert start % 8 == 0;
 
@@ -125,7 +164,7 @@ public class MessageSerializer {
 
     int messageLength = serializedMessage.remaining();
 
-    int bytesWritten = writeMessageBuffer(out, messageLength, serializedMessage);
+    int bytesWritten = writeMessageBuffer(out, messageLength, serializedMessage, option);
     assert bytesWritten % 8 == 0;
     return bytesWritten;
   }
@@ -182,13 +221,20 @@ public class MessageSerializer {
 
   /**
    * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
+   */
+  public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
+    return serialize(out, batch, new IpcOption());
+  }
+
+  /**
+   * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
    *
    * @param out   where to write the batch
    * @param batch the object to serialize to out
    * @return the serialized block metadata
    * @throws IOException if something went wrong
    */
-  public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
+  public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch, IpcOption option) throws IOException {
 
     long start = out.getCurrentPosition();
     int bodyLength = batch.computeBodyLength();
@@ -198,8 +244,14 @@ public class MessageSerializer {
 
     int metadataLength = serializedMessage.remaining();
 
+    int prefixSize = 4;
+    if (!option.write_legacy_ipc_format) {
+      out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
+      prefixSize = 8;
+    }
+
     // calculate alignment bytes so that metadata length points to the correct location after alignment
-    int padding = (int) ((start + metadataLength + 4) % 8);
+    int padding = (int) ((start + metadataLength + prefixSize) % 8);
     if (padding != 0) {
       metadataLength += (8 - padding);
     }
@@ -214,7 +266,7 @@ public class MessageSerializer {
     assert bufferLength % 8 == 0;
 
     // Metadata size in the Block account for the size prefix
-    return new ArrowBlock(start, metadataLength + 4, bufferLength);
+    return new ArrowBlock(start, metadataLength + prefixSize, bufferLength);
   }
 
   /**
@@ -305,7 +357,7 @@ public class MessageSerializer {
    */
   public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block, BufferAllocator alloc)
       throws IOException {
-    // Metadata length contains integer prefix plus byte padding
+    // Metadata length contains prefix_size bytes plus byte padding
     long totalLen = block.getMetadataLength() + block.getBodyLength();
 
     if (totalLen > Integer.MAX_VALUE) {
@@ -317,7 +369,9 @@ public class MessageSerializer {
       throw new IOException("Unexpected end of input trying to read batch.");
     }
 
-    ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
+    int prefixSize = buffer.getInt(0) == IPC_CONTINUATION_TOKEN ? 8 : 4;
+
+    ArrowBuf metadataBuffer = buffer.slice(prefixSize, block.getMetadataLength() - prefixSize);
 
     Message messageFB =
         Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
@@ -375,15 +429,21 @@ public class MessageSerializer {
     return deserializeRecordBatch(serializedMessage.getMessage(), underlying);
   }
 
+  public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
+    return serialize(out, batch, new IpcOption());
+  }
+
   /**
    * Serializes a dictionary ArrowRecordBatch. Returns the offset and length of the written batch.
    *
    * @param out   where to serialize
    * @param batch the batch to serialize
+   * @param option options for IPC
    * @return the metadata of the serialized block
    * @throws IOException if something went wrong
    */
-  public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
+  public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch, IpcOption option)
+      throws IOException {
     long start = out.getCurrentPosition();
     int bodyLength = batch.computeBodyLength();
     assert bodyLength % 8 == 0;
@@ -392,8 +452,14 @@ public class MessageSerializer {
 
     int metadataLength = serializedMessage.remaining();
 
+    int prefixSize = 4;
+    if (!option.write_legacy_ipc_format) {
+      out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
+      prefixSize = 8;
+    }
+
     // calculate alignment bytes so that metadata length points to the correct location after alignment
-    int padding = (int) ((start + metadataLength + 4) % 8);
+    int padding = (int) ((start + metadataLength + prefixSize) % 8);
     if (padding != 0) {
       metadataLength += (8 - padding);
     }
@@ -409,7 +475,7 @@ public class MessageSerializer {
     assert bufferLength % 8 == 0;
 
     // Metadata size in the Block account for the size prefix
-    return new ArrowBlock(start, metadataLength + 4, bufferLength);
+    return new ArrowBlock(start, metadataLength + prefixSize, bufferLength);
   }
 
   /**
@@ -491,7 +557,9 @@ public class MessageSerializer {
       throw new IOException("Unexpected end of input trying to read batch.");
     }
 
-    ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
+    int prefixSize = buffer.getInt(0) == IPC_CONTINUATION_TOKEN ? 8 : 4;
+
+    ArrowBuf metadataBuffer = buffer.slice(prefixSize, block.getMetadataLength() - prefixSize);
 
     Message messageFB =
         Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
@@ -584,7 +652,15 @@ public class MessageSerializer {
     // Read the message size. There is an i32 little endian prefix.
     ByteBuffer buffer = ByteBuffer.allocate(4);
     if (in.readFully(buffer) == 4) {
+
       int messageLength = MessageSerializer.bytesToInt(buffer.array());
+      if (messageLength == IPC_CONTINUATION_TOKEN) {
+        buffer.clear();
+        // ARROW-6313, if the first 4 bytes are continuation message, read the next 4 for the length
+        if (in.readFully(buffer) == 4) {
+          messageLength = MessageSerializer.bytesToInt(buffer.array());
+        }
+      }
 
       // Length of 0 indicates end of stream
       if (messageLength != 0) {
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
index 789da1f..1cbd5bb 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
@@ -95,7 +95,7 @@ public class MessageSerializerTest {
     buffer.putInt(3);
     buffer.flip();
     bytesWritten = MessageSerializer.writeMessageBuffer(out, 4, buffer);
-    assertEquals(8, bytesWritten);
+    assertEquals(16, bytesWritten);
 
     ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
     ReadChannel in = new ReadChannel(Channels.newChannel(inputStream));
@@ -103,15 +103,17 @@ public class MessageSerializerTest {
     in.readFully(result);
     result.rewind();
 
-    // First message size, 2 int values, 4 bytes of zero padding
-    assertEquals(12, result.getInt());
+    // First message continuation, size, and 2 int values
+    assertEquals(MessageSerializer.IPC_CONTINUATION_TOKEN, result.getInt());
+    assertEquals(8, result.getInt());
     assertEquals(1, result.getInt());
     assertEquals(2, result.getInt());
-    assertEquals(0, result.getInt());
 
-    // Second message size and 1 int value
-    assertEquals(4, result.getInt());
+    // Second message continuation, size, 1 int value and 4 bytes padding
+    assertEquals(MessageSerializer.IPC_CONTINUATION_TOKEN, result.getInt());
+    assertEquals(8, result.getInt());
     assertEquals(3, result.getInt());
+    assertEquals(0, result.getInt());
   }
 
   @Test
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index 5d1f792..58ad669 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -42,6 +43,7 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.Collections2;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.TestUtils;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorLoader;
@@ -54,6 +56,7 @@ import org.apache.arrow.vector.ipc.message.ArrowBlock;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
 import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -75,7 +78,7 @@ public class TestArrowReaderWriter {
   private Dictionary dictionary2;
 
   private Schema schema;
-  private Schema encodedchema;
+  private Schema encodedSchema;
 
   @Before
   public void init() {
@@ -161,7 +164,8 @@ public class TestArrowReaderWriter {
       // deserialize the buffer.
       ByteBuffer headerBuffer = ByteBuffer.allocate(recordBatches.get(0).getMetadataLength());
       headerBuffer.put(byteArray, (int) recordBatches.get(0).getOffset(), headerBuffer.capacity());
-      headerBuffer.position(4);
+      // new format prefix_size ==8
+      headerBuffer.position(8);
       Message messageFB = Message.getRootAsMessage(headerBuffer);
       RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch());
       assertEquals(2, recordBatchFB.buffersLength());
@@ -335,7 +339,7 @@ public class TestArrowReaderWriter {
     try (ArrowStreamReader reader = new ArrowStreamReader(
         new ByteArrayReadableSeekableByteChannel(outStream.toByteArray()), allocator)) {
       Schema readSchema = reader.getVectorSchemaRoot().getSchema();
-      assertEquals(encodedchema, readSchema);
+      assertEquals(encodedSchema, readSchema);
       assertEquals(2, reader.getDictionaryVectors().size());
       assertTrue(reader.loadNextBatch());
       assertTrue(reader.loadNextBatch());
@@ -401,8 +405,48 @@ public class TestArrowReaderWriter {
     schemaFields.add(DictionaryUtility.toMessageFormat(encodedVectorA2.getField(), provider, new HashSet<>()));
     schema = new Schema(schemaFields);
 
-    encodedchema = new Schema(Arrays.asList(encodedVectorA1.getField(), encodedVectorA2.getField()));
+    encodedSchema = new Schema(Arrays.asList(encodedVectorA1.getField(), encodedVectorA2.getField()));
 
     return batches;
   }
+
+  @Test
+  public void testLegacyIpcBackwardsCompatibility() throws Exception {
+    Schema schema = new Schema(asList(Field.nullable("field", new ArrowType.Int(32, true))));
+    IntVector vector = new IntVector("vector", allocator);
+    final int valueCount = 2;
+    vector.setValueCount(valueCount);
+    vector.setSafe(0, 1);
+    vector.setSafe(1, 2);
+    ArrowRecordBatch batch = new ArrowRecordBatch(valueCount, asList(new ArrowFieldNode(valueCount, 0)),
+        asList(vector.getValidityBuffer(), vector.getDataBuffer()));
+
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    WriteChannel out = new WriteChannel(newChannel(outStream));
+
+    // write legacy ipc format
+    IpcOption option = new IpcOption();
+    option.write_legacy_ipc_format = true;
+    MessageSerializer.serialize(out, schema, option);
+    MessageSerializer.serialize(out, batch);
+
+    ReadChannel in = new ReadChannel(newChannel(new ByteArrayInputStream(outStream.toByteArray())));
+    Schema readSchema = MessageSerializer.deserializeSchema(in);
+    assertEquals(schema, readSchema);
+    ArrowRecordBatch readBatch = MessageSerializer.deserializeRecordBatch(in, allocator);
+    assertEquals(batch.getLength(), readBatch.getLength());
+    assertEquals(batch.computeBodyLength(), readBatch.computeBodyLength());
+
+    // write ipc format with continuation
+    option.write_legacy_ipc_format = false;
+    MessageSerializer.serialize(out, schema, option);
+    MessageSerializer.serialize(out, batch);
+
+    ReadChannel in2 = new ReadChannel(newChannel(new ByteArrayInputStream(outStream.toByteArray())));
+    Schema readSchema2 = MessageSerializer.deserializeSchema(in2);
+    assertEquals(schema, readSchema2);
+    ArrowRecordBatch readBatch2 = MessageSerializer.deserializeRecordBatch(in2, allocator);
+    assertEquals(batch.getLength(), readBatch2.getLength());
+    assertEquals(batch.computeBodyLength(), readBatch2.computeBodyLength());
+  }
 }
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
index 92e5276..5d8f5df 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
@@ -117,7 +117,7 @@ public class TestArrowStream extends BaseFileTest {
           assertTrue(reader.loadNextBatch());
         }
         // TODO figure out why reader isn't getting padding bytes
-        assertEquals(bytesWritten, reader.bytesRead() + 4);
+        assertEquals(bytesWritten, reader.bytesRead() + 8);
         assertFalse(reader.loadNextBatch());
         assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
       }
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
index 422a63f..07f4017 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
@@ -156,6 +156,6 @@ public class TestArrowStreamPipe {
     writer.join();
 
     assertEquals(NUM_BATCHES, reader.getBatchesRead());
-    assertEquals(writer.bytesWritten(), reader.bytesRead());
+    assertEquals(writer.bytesWritten(), reader.bytesRead() + 4);
   }
 }


[arrow] 01/03: ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 9514e2e225cdb84da09476cb5707dbb9443239eb
Author: Wes McKinney <we...@apache.org>
AuthorDate: Wed Aug 28 11:58:52 2019 -0500

    ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format
    
    This also moves the alignment multiple to `IpcOptions` and adds the `IpcOptions` argument to more functions.
    
    Closes #5211 from wesm/ARROW-6314 and squashes the following commits:
    
    df3b910bd <Wes McKinney> Fix MSVC narrowing warning
    62758b614 <Wes McKinney> Code review comments. Copy metadata always in prefix_length==4 legacy case
    857a57155 <Antoine Pitrou> Fix CUDA IPC
    71b2fad4f <Wes McKinney> Add tests exercising backwards compatibility write and read path
    4777d2bc8 <Wes McKinney> Implement backwards compatibility and compatibility mode, pass IpcOptions in more APIs
    1a3843215 <Wes McKinney> Revert changes to submodule
    69883cf63 <Micah Kornfield> verify 8 bytes alignment fixes ubsan for ipc
    
    Lead-authored-by: Wes McKinney <we...@apache.org>
    Co-authored-by: Antoine Pitrou <an...@python.org>
    Co-authored-by: Micah Kornfield <em...@gmail.com>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/gpu/cuda_arrow_ipc.cc    |  26 +-----
 cpp/src/arrow/ipc/message.cc           | 154 ++++++++++++++++++++++++++++-----
 cpp/src/arrow/ipc/message.h            |  47 +++++++---
 cpp/src/arrow/ipc/metadata_internal.cc |  32 -------
 cpp/src/arrow/ipc/metadata_internal.h  |  16 ----
 cpp/src/arrow/ipc/options.h            |   8 ++
 cpp/src/arrow/ipc/read_write_test.cc   | 119 ++++++++++++++++++-------
 cpp/src/arrow/ipc/writer.cc            |  69 +++++++--------
 cpp/src/arrow/ipc/writer.h             |  22 ++---
 python/pyarrow/includes/libarrow.pxd   |   7 +-
 python/pyarrow/ipc.pxi                 |   5 +-
 11 files changed, 309 insertions(+), 196 deletions(-)

diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
index 34488a1..0fb81bc 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -63,31 +63,7 @@ Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
 
 Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
                    std::unique_ptr<ipc::Message>* out) {
-  int32_t message_length = 0;
-  int64_t bytes_read = 0;
-
-  RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read,
-                             reinterpret_cast<uint8_t*>(&message_length)));
-  if (bytes_read != sizeof(int32_t)) {
-    *out = nullptr;
-    return Status::OK();
-  }
-
-  if (message_length == 0) {
-    // Optional 0 EOS control message
-    *out = nullptr;
-    return Status::OK();
-  }
-
-  std::shared_ptr<Buffer> metadata;
-  RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata));
-  RETURN_NOT_OK(reader->Read(message_length, &bytes_read, metadata->mutable_data()));
-  if (bytes_read != message_length) {
-    return Status::IOError("Expected ", message_length, " metadata bytes, but only got ",
-                           bytes_read);
-  }
-
-  return ipc::Message::ReadFrom(metadata, reader, out);
+  return ipc::ReadMessageCopy(reader, pool, out);
 }
 
 Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index dad1f98..a281b0d 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -32,10 +32,14 @@
 #include "arrow/ipc/util.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/ubsan.h"
 
 namespace arrow {
 namespace ipc {
 
+// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
+constexpr int32_t kIpcContinuationToken = -1;
+
 class Message::MessageImpl {
  public:
   explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
@@ -142,12 +146,19 @@ bool Message::Equals(const Message& other) const {
   }
 }
 
-Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
-                         std::unique_ptr<Message>* out) {
+Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_length) {
+  // Check metadata memory alignment in debug builds
+  DCHECK_EQ(0, reinterpret_cast<uintptr_t>(metadata.data()) % 8);
   const flatbuf::Message* fb_message;
-  RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message));
+  RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &fb_message));
+  *body_length = fb_message->bodyLength();
+  return Status::OK();
+}
 
-  int64_t body_length = fb_message->bodyLength();
+Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
+                         std::unique_ptr<Message>* out) {
+  int64_t body_length = -1;
+  RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
 
   std::shared_ptr<Buffer> body;
   RETURN_NOT_OK(stream->Read(body_length, &body));
@@ -161,9 +172,8 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea
 
 Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
                          io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
-  const flatbuf::Message* fb_message;
-  RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message));
-  int64_t body_length = fb_message->bodyLength();
+  int64_t body_length = -1;
+  RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
 
   std::shared_ptr<Buffer> body;
   RETURN_NOT_OK(file->ReadAt(offset, body_length, &body));
@@ -184,10 +194,10 @@ Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
   return Status::OK();
 }
 
-Status Message::SerializeTo(io::OutputStream* stream, int32_t alignment,
+Status Message::SerializeTo(io::OutputStream* stream, const IpcOptions& options,
                             int64_t* output_length) const {
   int32_t metadata_length = 0;
-  RETURN_NOT_OK(internal::WriteMessage(*metadata(), alignment, stream, &metadata_length));
+  RETURN_NOT_OK(WriteMessage(*metadata(), options, stream, &metadata_length));
 
   *output_length = metadata_length;
 
@@ -237,15 +247,47 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
                            " metadata bytes but got ", buffer->size());
   }
 
-  int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
+  const int32_t continuation = util::SafeLoadAs<int32_t>(buffer->data());
+
+  // The size of the Flatbuffer including padding
+  int32_t flatbuffer_length = -1;
+  int32_t prefix_size = -1;
+  if (continuation == kIpcContinuationToken) {
+    if (metadata_length < 8) {
+      return Status::Invalid(
+          "Corrupted IPC message, had continuation token "
+          " but length ",
+          metadata_length);
+    }
 
-  if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
-    return Status::Invalid("flatbuffer size ", metadata_length,
+    // Valid IPC message, parse the message length now
+    flatbuffer_length = util::SafeLoadAs<int32_t>(buffer->data() + 4);
+    prefix_size = 8;
+  } else if (continuation == 0) {
+    // EOS
+    *message = nullptr;
+    return Status::OK();
+  } else {
+    // ARROW-6314: Backwards compatibility for reading old IPC
+    // messages produced prior to version 0.15.0
+    flatbuffer_length = continuation;
+    prefix_size = 4;
+  }
+
+  if (flatbuffer_length + prefix_size != metadata_length) {
+    return Status::Invalid("flatbuffer size ", flatbuffer_length,
                            " invalid. File offset: ", offset,
                            ", metadata length: ", metadata_length);
   }
 
-  auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
+  std::shared_ptr<Buffer> metadata =
+      SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size);
+  if (prefix_size == 4) {
+    // ARROW-6314: For old messages we copy the metadata to fix UBSAN
+    // issues with Flatbuffers. For new messages, they are already
+    // aligned
+    RETURN_NOT_OK(metadata->Copy(0, metadata->size(), &metadata));
+  }
   return Message::ReadFrom(offset + metadata_length, metadata, file, message);
 }
 
@@ -269,39 +311,105 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
   int64_t current_position;
   ARROW_RETURN_NOT_OK(stream->Tell(&current_position));
   if (current_position % alignment != 0) {
-    return Status::Invalid("Stream is not aligned");
+    return Status::Invalid("Stream is not aligned pos: ", current_position,
+                           " alignment: ", alignment);
   } else {
     return Status::OK();
   }
 }
 
-Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
-  int32_t message_length = 0;
+namespace {
+
+Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata,
+                   std::unique_ptr<Message>* message) {
+  int32_t continuation = 0;
   int64_t bytes_read = 0;
   RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
-                           reinterpret_cast<uint8_t*>(&message_length)));
+                           reinterpret_cast<uint8_t*>(&continuation)));
 
   if (bytes_read != sizeof(int32_t)) {
+    // EOS
     *message = nullptr;
     return Status::OK();
   }
 
-  if (message_length == 0) {
-    // Optional 0 EOS control message
+  int32_t flatbuffer_length = -1;
+  bool legacy_format = false;
+  if (continuation == kIpcContinuationToken) {
+    // Valid IPC message, read the message length now
+    RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
+                             reinterpret_cast<uint8_t*>(&flatbuffer_length)));
+  } else if (continuation == 0) {
+    // EOS
     *message = nullptr;
     return Status::OK();
+  } else {
+    // ARROW-6314: Backwards compatibility for reading old IPC
+    // messages produced prior to version 0.15.0
+    flatbuffer_length = continuation;
+    legacy_format = true;
   }
 
   std::shared_ptr<Buffer> metadata;
-  RETURN_NOT_OK(file->Read(message_length, &metadata));
-  if (metadata->size() != message_length) {
-    return Status::Invalid("Expected to read ", message_length, " metadata bytes, but ",
-                           "only read ", metadata->size());
+  if (legacy_format || copy_metadata) {
+    DCHECK_NE(pool, nullptr);
+    RETURN_NOT_OK(AllocateBuffer(pool, flatbuffer_length, &metadata));
+    RETURN_NOT_OK(file->Read(flatbuffer_length, &bytes_read, metadata->mutable_data()));
+  } else {
+    RETURN_NOT_OK(file->Read(flatbuffer_length, &metadata));
+    bytes_read = metadata->size();
+  }
+  if (bytes_read != flatbuffer_length) {
+    return Status::Invalid("Expected to read ", flatbuffer_length,
+                           " metadata bytes, but ", "only read ", bytes_read);
   }
 
   return Message::ReadFrom(metadata, file, message);
 }
 
+}  // namespace
+
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
+  return ReadMessage(file, default_memory_pool(), /*copy_metadata=*/false, out);
+}
+
+Status ReadMessageCopy(io::InputStream* file, MemoryPool* pool,
+                       std::unique_ptr<Message>* out) {
+  return ReadMessage(file, pool, /*copy_metadata=*/true, out);
+}
+
+Status WriteMessage(const Buffer& message, const IpcOptions& options,
+                    io::OutputStream* file, int32_t* message_length) {
+  const int32_t prefix_size = options.write_legacy_ipc_format ? 4 : 8;
+  const int32_t flatbuffer_size = static_cast<int32_t>(message.size());
+
+  int32_t padded_message_length = static_cast<int32_t>(
+      PaddedLength(flatbuffer_size + prefix_size, options.alignment));
+
+  int32_t padding = padded_message_length - flatbuffer_size - prefix_size;
+
+  // The returned message size includes the length prefix, the flatbuffer,
+  // plus padding
+  *message_length = padded_message_length;
+
+  // ARROW-6314: Write continuation / padding token
+  if (!options.write_legacy_ipc_format) {
+    RETURN_NOT_OK(file->Write(&kIpcContinuationToken, sizeof(int32_t)));
+  }
+
+  // Write the flatbuffer size prefix including padding
+  int32_t padded_flatbuffer_size = padded_message_length - prefix_size;
+  RETURN_NOT_OK(file->Write(&padded_flatbuffer_size, sizeof(int32_t)));
+
+  // Write the flatbuffer
+  RETURN_NOT_OK(file->Write(message.data(), flatbuffer_size));
+  if (padding > 0) {
+    RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
+  }
+
+  return Status::OK();
+}
+
 // ----------------------------------------------------------------------
 // Implement InputStream message reader
 
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 9c152d7..89be45e 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -17,8 +17,7 @@
 
 // C++ object model and user API for interprocess schema messaging
 
-#ifndef ARROW_IPC_MESSAGE_H
-#define ARROW_IPC_MESSAGE_H
+#pragma once
 
 #include <cstdint>
 #include <memory>
@@ -32,6 +31,7 @@
 namespace arrow {
 
 class Buffer;
+class MemoryPool;
 
 namespace io {
 
@@ -137,13 +137,10 @@ class ARROW_EXPORT Message {
   /// \brief Write length-prefixed metadata and body to output stream
   ///
   /// \param[in] file output stream to write to
-  /// \param[in] alignment byte alignment for metadata, usually 8 or
-  /// 64. Whether the body is padded depends on the metadata; if the body
-  /// buffer is smaller than the size indicated in the metadata, then extra
-  /// padding bytes will be written
+  /// \param[in] options IPC writing options including alignment
   /// \param[out] output_length the number of bytes written
   /// \return Status
-  Status SerializeTo(io::OutputStream* file, int32_t alignment,
+  Status SerializeTo(io::OutputStream* file, const IpcOptions& options,
                      int64_t* output_length) const;
 
   /// \brief Return true if the Message metadata passes Flatbuffer validation
@@ -223,15 +220,39 @@ Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
 ARROW_EXPORT
 Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
 
-/// \brief Read encapsulated RPC message (metadata and body) from InputStream
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
 ///
-/// Read length-prefixed message with as-yet unknown length. Returns null if
-/// there are not enough bytes available or the message length is 0 (e.g. EOS
-/// in a stream)
+/// Returns null if there are not enough bytes available or the
+/// message length is 0 (e.g. EOS in a stream)
 ARROW_EXPORT
 Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);
 
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
+///
+/// Like ReadMessage, except that the metadata is copied in a new buffer.
+/// This is necessary if the stream returns non-CPU buffers.
+ARROW_EXPORT
+Status ReadMessageCopy(io::InputStream* stream, MemoryPool* pool,
+                       std::unique_ptr<Message>* message);
+
+/// Write encapsulated IPC message Does not make assumptions about
+/// whether the stream is aligned already. Can write legacy (pre
+/// version 0.15.0) IPC message if option set
+///
+/// continuation: 0xFFFFFFFF
+/// message_size: int32
+/// message: const void*
+/// padding
+///
+/// \param[in] message a buffer containing the metadata to write
+/// \param[in] options IPC writing options, including alignment and
+/// legacy message support
+/// \param[in,out] file the OutputStream to write to
+/// \param[out] message_length the total size of the payload written including
+/// padding
+/// \return Status
+Status WriteMessage(const Buffer& message, const IpcOptions& options,
+                    io::OutputStream* file, int32_t* message_length);
+
 }  // namespace ipc
 }  // namespace arrow
-
-#endif  // ARROW_IPC_MESSAGE_H
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 6810351..dff3369 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -1282,38 +1282,6 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
   return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type);
 }
 
-// ----------------------------------------------------------------------
-// Implement message writing
-
-Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
-                    int32_t* message_length) {
-  // ARROW-3212: We do not make assumptions that the output stream is aligned
-  int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
-  const int32_t remainder = padded_message_length % alignment;
-  if (remainder != 0) {
-    padded_message_length += alignment - remainder;
-  }
-
-  // The returned message size includes the length prefix, the flatbuffer,
-  // plus padding
-  *message_length = padded_message_length;
-
-  // Write the flatbuffer size prefix including padding
-  int32_t flatbuffer_size = padded_message_length - 4;
-  RETURN_NOT_OK(file->Write(&flatbuffer_size, sizeof(int32_t)));
-
-  // Write the flatbuffer
-  RETURN_NOT_OK(file->Write(message.data(), message.size()));
-
-  // Write any padding
-  int32_t padding = padded_message_length - static_cast<int32_t>(message.size()) - 4;
-  if (padding > 0) {
-    RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
-  }
-
-  return Status::OK();
-}
-
 }  // namespace internal
 }  // namespace ipc
 }  // namespace arrow
diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h
index 828affd..420cfb1 100644
--- a/cpp/src/arrow/ipc/metadata_internal.h
+++ b/cpp/src/arrow/ipc/metadata_internal.h
@@ -128,22 +128,6 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size,
   return Status::OK();
 }
 
-/// Write a serialized message metadata with a length-prefix and padding to an
-/// 8-byte offset. Does not make assumptions about whether the stream is
-/// aligned already
-///
-/// <message_size: int32><message: const void*><padding>
-///
-/// \param[in] message a buffer containing the metadata to write
-/// \param[in] alignment the size multiple of the total message size including
-/// length prefix, metadata, and padding. Usually 8 or 64
-/// \param[in,out] file the OutputStream to write to
-/// \param[out] message_length the total size of the payload written including
-/// padding
-/// \return Status
-Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
-                    int32_t* message_length);
-
 // Serialize arrow::Schema as a Flatbuffer
 //
 // \param[in] schema a Schema instance
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index d380402..3570c06 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -36,6 +36,14 @@ struct ARROW_EXPORT IpcOptions {
   // The maximum permitted schema nesting depth.
   int max_recursion_depth = kMaxNestingDepth;
 
+  // Write padding after memory buffers to this multiple of
+  // bytes. Generally 8 or 64
+  int32_t alignment = 8;
+
+  /// \brief Write the pre-0.15.0 encapsulated IPC message format
+  /// consisting of a 4-byte prefix instead of 8 byte
+  bool write_legacy_ipc_format = false;
+
   static IpcOptions Defaults();
 };
 
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 9cbeacf..efd0a88 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -106,27 +106,69 @@ TEST(TestMessage, SerializeTo) {
 
   std::shared_ptr<io::BufferOutputStream> stream;
 
-  {
-    const int32_t alignment = 8;
-
+  auto CheckWithAlignment = [&](int32_t alignment) {
+    IpcOptions options;
+    options.alignment = alignment;
+    const int32_t prefix_size = 8;
     ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
-    ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
+    ASSERT_OK(message->SerializeTo(stream.get(), options, &output_length));
     ASSERT_OK(stream->Tell(&position));
-    ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
+    ASSERT_EQ(BitUtil::RoundUp(metadata->size() + prefix_size, alignment) + body_length,
               output_length);
     ASSERT_EQ(output_length, position);
-  }
+  };
 
-  {
-    const int32_t alignment = 64;
+  CheckWithAlignment(8);
+  CheckWithAlignment(64);
+}
 
-    ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
-    ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
-    ASSERT_OK(stream->Tell(&position));
-    ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
-              output_length);
-    ASSERT_EQ(output_length, position);
-  }
+void BuffersOverlapEquals(const Buffer& left, const Buffer& right) {
+  ASSERT_GT(left.size(), 0);
+  ASSERT_GT(right.size(), 0);
+  ASSERT_TRUE(left.Equals(right, std::min(left.size(), right.size())));
+}
+
+TEST(TestMessage, LegacyIpcBackwardsCompatibility) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeIntBatchSized(36, &batch));
+
+  auto RoundtripWithOptions = [&](const IpcOptions& arg_options,
+                                  std::shared_ptr<Buffer>* out_serialized,
+                                  std::unique_ptr<Message>* out) {
+    internal::IpcPayload payload;
+    ASSERT_OK(internal::GetRecordBatchPayload(*batch, arg_options, default_memory_pool(),
+                                              &payload));
+
+    std::shared_ptr<io::BufferOutputStream> stream;
+    ASSERT_OK(io::BufferOutputStream::Create(1 << 20, default_memory_pool(), &stream));
+
+    int32_t metadata_length = -1;
+    ASSERT_OK(
+        internal::WriteIpcPayload(payload, arg_options, stream.get(), &metadata_length));
+
+    ASSERT_OK(stream->Finish(out_serialized));
+    io::BufferReader io_reader(*out_serialized);
+    ASSERT_OK(ReadMessage(&io_reader, out));
+  };
+
+  std::shared_ptr<Buffer> serialized, legacy_serialized;
+  std::unique_ptr<Message> message, legacy_message;
+
+  IpcOptions options;
+  RoundtripWithOptions(options, &serialized, &message);
+
+  // First 4 bytes 0xFFFFFFFF Continuation marker
+  ASSERT_EQ(-1, util::SafeLoadAs<int32_t>(serialized->data()));
+
+  options.write_legacy_ipc_format = true;
+  RoundtripWithOptions(options, &legacy_serialized, &legacy_message);
+
+  // Check that the continuation marker is not written
+  ASSERT_NE(-1, util::SafeLoadAs<int32_t>(legacy_serialized->data()));
+
+  // Have to use the smaller size to exclude padding
+  BuffersOverlapEquals(*legacy_message->metadata(), *message->metadata());
+  ASSERT_TRUE(legacy_message->body()->Equals(*message->body()));
 }
 
 TEST(TestMessage, Verify) {
@@ -635,13 +677,14 @@ TEST_F(RecursionLimits, StressLimit) {
 #endif  // !defined(_WIN32) || defined(NDEBUG)
 
 struct FileWriterHelper {
-  Status Init(const std::shared_ptr<Schema>& schema) {
+  Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& options) {
     num_batches_written_ = 0;
 
     RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_));
     sink_.reset(new io::BufferOutputStream(buffer_));
-
-    return RecordBatchFileWriter::Open(sink_.get(), schema, &writer_);
+    ARROW_ASSIGN_OR_RAISE(writer_,
+                          RecordBatchFileWriter::Open(sink_.get(), schema, options));
+    return Status::OK();
   }
 
   Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
@@ -680,11 +723,12 @@ struct FileWriterHelper {
 };
 
 struct StreamWriterHelper {
-  Status Init(const std::shared_ptr<Schema>& schema) {
+  Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& options) {
     RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_));
     sink_.reset(new io::BufferOutputStream(buffer_));
-
-    return RecordBatchStreamWriter::Open(sink_.get(), schema, &writer_);
+    ARROW_ASSIGN_OR_RAISE(writer_,
+                          RecordBatchStreamWriter::Open(sink_.get(), schema, options));
+    return Status::OK();
   }
 
   Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
@@ -718,7 +762,7 @@ class ReaderWriterMixin {
 
   // Check simple RecordBatch roundtripping
   template <typename Param>
-  void TestRoundTrip(Param&& param) {
+  void TestRoundTrip(Param&& param, const IpcOptions& options) {
     std::shared_ptr<RecordBatch> batch1;
     std::shared_ptr<RecordBatch> batch2;
     ASSERT_OK(param(&batch1));  // NOLINT clang-tidy gtest issue
@@ -727,7 +771,7 @@ class ReaderWriterMixin {
     BatchVector in_batches = {batch1, batch2};
     BatchVector out_batches;
 
-    ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
+    ASSERT_OK(RoundTripHelper(in_batches, options, &out_batches));
     ASSERT_EQ(out_batches.size(), in_batches.size());
 
     // Compare batches
@@ -741,7 +785,7 @@ class ReaderWriterMixin {
     ASSERT_OK(MakeDictionary(&batch));
 
     BatchVector out_batches;
-    ASSERT_OK(RoundTripHelper({batch}, &out_batches));
+    ASSERT_OK(RoundTripHelper({batch}, IpcOptions::Defaults(), &out_batches));
     ASSERT_EQ(out_batches.size(), 1);
 
     // TODO(wesm): This was broken in ARROW-3144. I'm not sure how to
@@ -764,7 +808,7 @@ class ReaderWriterMixin {
     schema = schema->WithMetadata(key_value_metadata({"some_key"}, {"some_value"}));
 
     WriterHelper writer_helper;
-    ASSERT_OK(writer_helper.Init(schema));
+    ASSERT_OK(writer_helper.Init(schema, IpcOptions::Defaults()));
     // Writing a record batch with a different schema
     ASSERT_RAISES(Invalid, writer_helper.WriteBatch(batch_ints));
     // Writing a record batch with the same schema (except metadata)
@@ -781,9 +825,10 @@ class ReaderWriterMixin {
   }
 
  private:
-  Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
+  Status RoundTripHelper(const BatchVector& in_batches, const IpcOptions& options,
+                         BatchVector* out_batches) {
     WriterHelper writer_helper;
-    RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema()));
+    RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema(), options));
     for (const auto& batch : in_batches) {
       RETURN_NOT_OK(writer_helper.WriteBatch(batch));
     }
@@ -813,9 +858,21 @@ class TestFileFormat : public ReaderWriterMixin<FileWriterHelper>,
 class TestStreamFormat : public ReaderWriterMixin<StreamWriterHelper>,
                          public ::testing::TestWithParam<MakeRecordBatch*> {};
 
-TEST_P(TestFileFormat, RoundTrip) { TestRoundTrip(*GetParam()); }
+TEST_P(TestFileFormat, RoundTrip) {
+  TestRoundTrip(*GetParam(), IpcOptions::Defaults());
 
-TEST_P(TestStreamFormat, RoundTrip) { TestRoundTrip(*GetParam()); }
+  IpcOptions options;
+  options.write_legacy_ipc_format = true;
+  TestRoundTrip(*GetParam(), options);
+}
+
+TEST_P(TestStreamFormat, RoundTrip) {
+  TestRoundTrip(*GetParam(), IpcOptions::Defaults());
+
+  IpcOptions options;
+  options.write_legacy_ipc_format = true;
+  TestRoundTrip(*GetParam(), options);
+}
 
 INSTANTIATE_TEST_CASE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, BATCH_CASES());
 INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
@@ -912,13 +969,15 @@ void SpliceMessages(std::shared_ptr<Buffer> stream,
       continue;
     }
 
+    IpcOptions options;
     internal::IpcPayload payload;
     payload.type = msg->type();
     payload.metadata = msg->metadata();
     payload.body_buffers.push_back(msg->body());
     payload.body_length = msg->body()->size();
     int32_t unused_metadata_length = -1;
-    ASSERT_OK(internal::WriteIpcPayload(payload, out.get(), &unused_metadata_length));
+    ASSERT_OK(
+        internal::WriteIpcPayload(payload, options, out.get(), &unused_metadata_length));
   }
   ASSERT_OK(out->Finish(spliced_stream));
 }
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 81332a6..7127300 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -529,10 +529,9 @@ class DictionaryWriter : public RecordBatchSerializer {
   int64_t dictionary_id_;
 };
 
-Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
-                       int32_t* metadata_length) {
-  RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, kArrowIpcAlignment, dst,
-                                       metadata_length));
+Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options,
+                       io::OutputStream* dst, int32_t* metadata_length) {
+  RETURN_NOT_OK(WriteMessage(*payload.metadata, options, dst, metadata_length));
 
 #ifndef NDEBUG
   RETURN_NOT_OK(CheckAligned(dst));
@@ -604,7 +603,7 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
   // The body size is computed in the payload
   *body_length = payload.body_length;
 
-  return internal::WriteIpcPayload(payload, dst, metadata_length);
+  return internal::WriteIpcPayload(payload, options, dst, metadata_length);
 }
 
 Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
@@ -625,7 +624,9 @@ Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
                          int32_t* metadata_length) {
   std::shared_ptr<Buffer> metadata;
   RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
-  return internal::WriteMessage(*metadata, kTensorAlignment, dst, metadata_length);
+  IpcOptions options;
+  options.alignment = kTensorAlignment;
+  return WriteMessage(*metadata, options, dst, metadata_length);
 }
 
 Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
@@ -818,19 +819,7 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds
   RETURN_NOT_OK(writer.Assemble(sparse_tensor));
 
   *body_length = payload.body_length;
-  return internal::WriteIpcPayload(payload, dst, metadata_length);
-}
-
-Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
-                       int64_t buffer_start_offset, io::OutputStream* dst,
-                       int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
-  auto options = IpcOptions::Defaults();
-  internal::IpcPayload payload;
-  RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options, pool, &payload));
-
-  // The body size is computed in the payload
-  *body_length = payload.body_length;
-  return internal::WriteIpcPayload(payload, dst, metadata_length);
+  return internal::WriteIpcPayload(payload, IpcOptions::Defaults(), dst, metadata_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
@@ -1022,20 +1011,24 @@ class StreamBookKeeper {
     return Status::OK();
   }
 
+  Status WriteEOS() {
+    // End of stream marker
+    constexpr int64_t kEos = 0;
+    return Write(&kEos, sizeof(kEos));
+  }
+
  protected:
   io::OutputStream* sink_;
   int64_t position_;
 };
 
-// End of stream marker
-constexpr int32_t kEos = 0;
-
 /// A IpcPayloadWriter implementation that writes to a IPC stream
 /// (with an end-of-stream marker)
 class PayloadStreamWriter : public internal::IpcPayloadWriter,
                             protected StreamBookKeeper {
  public:
-  explicit PayloadStreamWriter(io::OutputStream* sink) : StreamBookKeeper(sink) {}
+  PayloadStreamWriter(const IpcOptions& options, io::OutputStream* sink)
+      : StreamBookKeeper(sink), options_(options) {}
 
   ~PayloadStreamWriter() override = default;
 
@@ -1046,23 +1039,24 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter,
 #endif
 
     int32_t metadata_length = 0;  // unused
-    RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &metadata_length));
+    RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &metadata_length));
     RETURN_NOT_OK(UpdatePositionCheckAligned());
     return Status::OK();
   }
 
-  Status Close() override {
-    // Write 0 EOS message
-    return Write(&kEos, sizeof(int32_t));
-  }
+  Status Close() override { return WriteEOS(); }
+
+ private:
+  IpcOptions options_;
 };
 
 /// A IpcPayloadWriter implementation that writes to a IPC file
 /// (with a footer as defined in File.fbs)
 class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBookKeeper {
  public:
-  PayloadFileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
-      : StreamBookKeeper(sink), schema_(schema) {}
+  PayloadFileWriter(const IpcOptions& options, const std::shared_ptr<Schema>& schema,
+                    io::OutputStream* sink)
+      : StreamBookKeeper(sink), options_(options), schema_(schema) {}
 
   ~PayloadFileWriter() override = default;
 
@@ -1074,7 +1068,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
 
     // Metadata length must include padding, it's computed by WriteIpcPayload()
     FileBlock block = {position_, 0, payload.body_length};
-    RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &block.metadata_length));
+    RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &block.metadata_length));
     RETURN_NOT_OK(UpdatePositionCheckAligned());
 
     // Record position and size of some message types, to list them in the footer
@@ -1107,7 +1101,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
 
   Status Close() override {
     // Write 0 EOS message for compatibility with sequential readers
-    RETURN_NOT_OK(Write(&kEos, sizeof(int32_t)));
+    RETURN_NOT_OK(WriteEOS());
 
     // Write file footer
     RETURN_NOT_OK(UpdatePosition());
@@ -1128,6 +1122,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
   }
 
  protected:
+  IpcOptions options_;
   std::shared_ptr<Schema> schema_;
   std::vector<FileBlock> dictionaries_;
   std::vector<FileBlock> record_batches_;
@@ -1141,9 +1136,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl
   RecordBatchStreamWriterImpl(io::OutputStream* sink,
                               const std::shared_ptr<Schema>& schema,
                               const IpcOptions& options)
-      : RecordBatchPayloadWriter(
-            std::unique_ptr<internal::IpcPayloadWriter>(new PayloadStreamWriter(sink)),
-            schema, options) {}
+      : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
+                                     new PayloadStreamWriter(options, sink)),
+                                 schema, options) {}
 
   ~RecordBatchStreamWriterImpl() = default;
 };
@@ -1153,7 +1148,7 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl : public RecordBatchPaylo
   RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
                             const IpcOptions& options)
       : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
-                                     new PayloadFileWriter(sink, schema)),
+                                     new PayloadFileWriter(options, schema, sink)),
                                  schema, options) {}
 
   ~RecordBatchFileWriterImpl() = default;
@@ -1277,7 +1272,7 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
   RETURN_NOT_OK(io::BufferOutputStream::Create(1024, pool, &stream));
 
   auto options = IpcOptions::Defaults();
-  auto payload_writer = make_unique<PayloadStreamWriter>(stream.get());
+  auto payload_writer = make_unique<PayloadStreamWriter>(options, stream.get());
   RecordBatchPayloadWriter writer(std::move(payload_writer), schema, options,
                                   dictionary_memo);
   // Write schema and populate fields (but not dictionaries) in dictionary_memo
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 75030c2..c673b0a 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -177,7 +177,9 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
   std::unique_ptr<RecordBatchFileWriterImpl> file_impl_;
 };
 
-/// \brief Low-level API for writing a record batch (without schema) to an OutputStream
+/// \brief Low-level API for writing a record batch (without schema)
+/// to an OutputStream as an encapsulated IPC message. See the Arrow format
+/// documentation for more detail.
 ///
 /// \param[in] batch the record batch to write
 /// \param[in] buffer_start_offset the start offset to use in the buffer metadata,
@@ -189,20 +191,6 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
 /// \param[in] options options for serialization
 /// \param[in] pool the memory pool to allocate memory from
 /// \return Status
-///
-/// Write the RecordBatch (collection of equal-length Arrow arrays) to the
-/// output stream in a contiguous block. The record batch metadata is written as
-/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
-/// prefixed by its size, followed by each of the memory buffers in the batch
-/// written end to end (with appropriate alignment and padding):
-///
-/// \code
-/// <int32: metadata size> <uint8*: metadata> <buffers ...>
-/// \endcode
-///
-/// Finally, the absolute offsets (relative to the start of the output stream)
-/// to the end of the body and end of the metadata / data header (suffixed by
-/// the header size) is returned in out-variables
 ARROW_EXPORT
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
                         io::OutputStream* dst, int32_t* metadata_length,
@@ -392,8 +380,8 @@ Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options
                              MemoryPool* pool, IpcPayload* out);
 
 ARROW_EXPORT
-Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
-                       int32_t* metadata_length);
+Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options,
+                       io::OutputStream* dst, int32_t* metadata_length);
 
 }  // namespace internal
 
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 2c05ec5..87393fb 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -964,6 +964,11 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         MessageType_V4" arrow::ipc::MetadataVersion::V4"
 
     cdef cppclass CIpcOptions" arrow::ipc::IpcOptions":
+        c_bool allow_64bit
+        int max_recursion_depth
+        int32_t alignment
+        c_bool write_legacy_ipc_format
+
         @staticmethod
         CIpcOptions Defaults()
 
@@ -989,7 +994,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         MetadataVersion metadata_version()
         MessageType type()
 
-        CStatus SerializeTo(OutputStream* stream, int32_t alignment,
+        CStatus SerializeTo(OutputStream* stream, const CIpcOptions& options,
                             int64_t* output_length)
 
     c_string FormatMessageType(MessageType type)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index b1aca23..6710f63 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -76,13 +76,14 @@ cdef class Message:
         """
         cdef:
             int64_t output_length = 0
-            int32_t c_alignment = alignment
             OutputStream* out
+            CIpcOptions options
 
+        options.alignment = alignment
         out = sink.get_output_stream().get()
         with nogil:
             check_status(self.message.get()
-                         .SerializeTo(out, c_alignment, &output_length))
+                         .SerializeTo(out, options, &output_length))
 
     def serialize(self, alignment=8, memory_pool=None):
         """


[arrow] 02/03: ARROW-6316: [Go] implement new ARROW format with 32b-aligned buffers

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit fa96d4fcd52d7da135197bc7c4d219172c513a04
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Thu Sep 5 09:53:34 2019 -0500

    ARROW-6316: [Go] implement new ARROW format with 32b-aligned buffers
    
    Closes #5251 from sbinet/issue-6316 and squashes the following commits:
    
    7427340b8 <Sebastien Binet> ARROW-6316:  implement new ARROW format with 32b-aligned buffers
    
    Authored-by: Sebastien Binet <bi...@cern.ch>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 go/arrow/ipc/ipc.go      |  5 +++--
 go/arrow/ipc/message.go  | 27 +++++++++++++++++++++++----
 go/arrow/ipc/metadata.go | 31 +++++++++++++++++++++++++------
 3 files changed, 51 insertions(+), 12 deletions(-)

diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index e688974..e5c7d1d 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -37,8 +37,9 @@ const (
 )
 
 var (
-	paddingBytes [kArrowAlignment]byte
-	kEOS         = [4]byte{0, 0, 0, 0} // end of stream message
+	paddingBytes  [kArrowAlignment]byte
+	kEOS                 = [8]byte{0, 0, 0, 0, 0, 0, 0, 0} // end of stream message
+	kIPCContToken uint32 = 0xFFFFFFFF                      // 32b continuation indicator for FlatBuffers 8b alignment
 )
 
 func paddedLength(nbytes int64, alignment int32) int64 {
diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go
index bb12dbb..bfd9494 100644
--- a/go/arrow/ipc/message.go
+++ b/go/arrow/ipc/message.go
@@ -181,12 +181,31 @@ func (r *MessageReader) Message() (*Message, error) {
 	var buf = make([]byte, 4)
 	_, err := io.ReadFull(r.r, buf)
 	if err != nil {
-		return nil, errors.Wrap(err, "arrow/ipc: could not read message length")
+		return nil, errors.Wrap(err, "arrow/ipc: could not read continuation indicator")
 	}
-	msgLen := int32(binary.LittleEndian.Uint32(buf))
-	if msgLen == 0 {
-		// optional 0 EOS control message
+	var (
+		cid    = binary.LittleEndian.Uint32(buf)
+		msgLen int32
+	)
+	switch cid {
+	case 0:
+		// EOS message.
 		return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
+	case kIPCContToken:
+		_, err = io.ReadFull(r.r, buf)
+		if err != nil {
+			return nil, errors.Wrap(err, "arrow/ipc: could not read message length")
+		}
+		msgLen = int32(binary.LittleEndian.Uint32(buf))
+		if msgLen == 0 {
+			// optional 0 EOS control message
+			return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
+		}
+
+	default:
+		// ARROW-6314: backwards compatibility for reading old IPC
+		// messages produced prior to version 0.15.0
+		msgLen = int32(cid)
 	}
 
 	buf = make([]byte, msgLen)
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 7b4e3da..034d3e1 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -88,7 +88,19 @@ func (blk fileBlock) NewMessage() (*Message, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "arrow/ipc: could not read message metadata")
 	}
-	meta := memory.NewBufferBytes(buf[4:]) // drop buf-size already known from blk.Meta
+
+	prefix := 0
+	switch binary.LittleEndian.Uint32(buf) {
+	case 0:
+	case kIPCContToken:
+		prefix = 8
+	default:
+		// ARROW-6314: backwards compatibility for reading old IPC
+		// messages produced prior to version 0.15.0
+		prefix = 4
+	}
+
+	meta := memory.NewBufferBytes(buf[prefix:]) // drop buf-size already known from blk.Meta
 
 	buf = make([]byte, blk.Body)
 	_, err = io.ReadFull(r, buf)
@@ -1002,19 +1014,26 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error)
 	)
 
 	// ARROW-3212: we do not make any assumption on whether the output stream is aligned or not.
-	paddedMsgLen := int32(msg.Len()) + 4
+	paddedMsgLen := int32(msg.Len()) + 8
 	remainder := paddedMsgLen % alignment
 	if remainder != 0 {
 		paddedMsgLen += alignment - remainder
 	}
 
+	tmp := make([]byte, 4)
+
+	// write continuation indicator, to address 8-byte alignment requirement from FlatBuffers.
+	binary.LittleEndian.PutUint32(tmp, kIPCContToken)
+	_, err = w.Write(tmp)
+	if err != nil {
+		return 0, errors.Wrap(err, "arrow/ipc: could not write continuation bit indicator")
+	}
+
 	// the returned message size includes the length prefix, the flatbuffer, + padding
 	n = int(paddedMsgLen)
 
-	tmp := make([]byte, 4)
-
 	// write the flatbuffer size prefix, including padding
-	sizeFB := paddedMsgLen - 4
+	sizeFB := paddedMsgLen - 8
 	binary.LittleEndian.PutUint32(tmp, uint32(sizeFB))
 	_, err = w.Write(tmp)
 	if err != nil {
@@ -1028,7 +1047,7 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error)
 	}
 
 	// write any padding
-	padding := paddedMsgLen - int32(msg.Len()) - 4
+	padding := paddedMsgLen - int32(msg.Len()) - 8
 	if padding > 0 {
 		_, err = w.Write(paddingBytes[:padding])
 		if err != nil {