You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/09/20 20:57:02 UTC
[arrow] branch master updated: ARROW-3146: [C++] Prototype Flight
RPC client and server implementations
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new db0ef22 ARROW-3146: [C++] Prototype Flight RPC client and server implementations
db0ef22 is described below
commit db0ef22dd68ae00e11f09da40b6734c1d9770b57
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Sep 20 16:56:50 2018 -0400
ARROW-3146: [C++] Prototype Flight RPC client and server implementations
This is a partial C++ implementation of the Flight RPC system initially proposed by Jacques in ARROW-249. As in Java, I had to dig into gRPC and Protocol Buffers internals to ensure that
* On write, memory is only copied once into the outgoing gRPC buffer
* On read, no memory is copied
The way that I tricked gRPC into circumventing the built-in protobuf serde paths might look a bit hacky, but after digging around in the library a bunch I've convinced myself that it's the best and perhaps only way to accomplish this. Luckily, the message that's being serialized/deserialized is pretty opaque to the rest of the gRPC system, and it's controlled by the `SerializationTraits<T>` class. So you can take a gRPC stream reader and make it create any kind of type you want, even [...]
Some things that won't be addressed in this patch, as scope is too large:
* gRPC build toolchain issues (this is rather complex, I will create follow-up issues)
* Security / encryption, and authentication issues. I have only implemented an insecure server
* Integration with Travis CI
* Python bindings
The API is preliminary, and I expect it to be iterated on over the next several months to make it general and fast.
Author: Wes McKinney <we...@apache.org>
Closes #2547 from wesm/flight-cpp-prototype and squashes the following commits:
64bcdea43 <Wes McKinney> Initial Arrow Flight C++ implementation
---
ci/travis_before_script_cpp.sh | 4 +
ci/travis_script_cpp.sh | 2 +-
cpp/CMakeLists.txt | 36 +-
cpp/build-support/lint_cpp_cli.py | 6 +-
cpp/cmake_modules/FindProtobuf.cmake | 1 +
cpp/cmake_modules/ThirdpartyToolchain.cmake | 116 +++---
cpp/src/arrow/buffer.cc | 4 +
cpp/src/arrow/buffer.h | 5 +
.../arrow/dbi/hiveserver2/thrift/CMakeLists.txt | 9 +-
cpp/src/arrow/flight/CMakeLists.txt | 139 +++++++
cpp/src/arrow/flight/README.md | 36 ++
.../util/stopwatch.h => arrow/flight/api.h} | 38 +-
cpp/src/arrow/flight/client.cc | 410 +++++++++++++++++++++
cpp/src/arrow/flight/client.h | 110 ++++++
cpp/src/arrow/flight/flight-benchmark.cc | 193 ++++++++++
cpp/src/arrow/flight/flight-test.cc | 267 ++++++++++++++
cpp/src/arrow/flight/internal.cc | 235 ++++++++++++
cpp/src/arrow/flight/internal.h | 77 ++++
cpp/src/arrow/flight/perf-server.cc | 200 ++++++++++
cpp/src/arrow/flight/perf.proto | 44 +++
cpp/src/arrow/flight/server.cc | 385 +++++++++++++++++++
cpp/src/arrow/flight/server.h | 142 +++++++
cpp/src/arrow/flight/test-server.cc | 141 +++++++
cpp/src/arrow/flight/test-util.h | 157 ++++++++
cpp/src/arrow/flight/types.cc | 75 ++++
cpp/src/arrow/flight/types.h | 210 +++++++++++
cpp/src/arrow/gpu/cuda-test.cc | 2 +-
cpp/src/arrow/ipc/message.cc | 4 +
cpp/src/arrow/ipc/message.h | 4 +
cpp/src/arrow/ipc/metadata-internal.cc | 2 +
cpp/src/arrow/ipc/metadata-internal.h | 2 +-
cpp/src/arrow/ipc/reader.cc | 6 +
cpp/src/arrow/ipc/test-common.h | 41 +--
cpp/src/arrow/ipc/writer.cc | 206 ++++++-----
cpp/src/arrow/ipc/writer.h | 30 ++
cpp/src/arrow/test-util.h | 90 ++++-
cpp/src/arrow/util/memory.h | 4 +-
cpp/src/{parquet => arrow}/util/stopwatch.h | 27 +-
cpp/src/parquet/util/CMakeLists.txt | 1 -
cpp/thirdparty/versions.txt | 4 +-
format/Flight.proto | 299 +++++++++++++++
integration/README.md | 2 +-
python/.gitignore | 1 -
python/README.md | 4 +-
44 files changed, 3502 insertions(+), 269 deletions(-)
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index e1c231c..54a00f7 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -94,6 +94,10 @@ if [ $ARROW_TRAVIS_COVERAGE == "1" ]; then
CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_GENERATE_COVERAGE=ON"
fi
+# Quote the variable: if ARROW_TRAVIS_VERBOSE is unset, an unquoted expansion
+# turns the test into `[ == "1" ]`, which is a "unary operator expected" error.
+if [ "$ARROW_TRAVIS_VERBOSE" == "1" ]; then
+  CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_VERBOSE_THIRDPARTY_BUILD=ON"
+fi
+
if [ $TRAVIS_OS_NAME == "linux" ]; then
cmake $CMAKE_COMMON_FLAGS \
$CMAKE_LINUX_FLAGS \
diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh
index 3a6b2f7..b89e5b7 100755
--- a/ci/travis_script_cpp.sh
+++ b/ci/travis_script_cpp.sh
@@ -23,7 +23,7 @@ source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh
pushd $CPP_BUILD_DIR
-ctest -j2 --output-on-failure -L unittest
+PATH=$ARROW_BUILD_TYPE:$PATH ctest -j2 --output-on-failure -L unittest
popd
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index aa68f92..23ef7d0 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -140,6 +140,10 @@ Pass multiple labels by dividing with semicolons")
"Compile with extra error context (line numbers, code)"
OFF)
+ option(ARROW_FLIGHT
+ "Build the Arrow Flight RPC System (requires GRPC, Protocol Buffers)"
+ OFF)
+
option(ARROW_IPC
"Build the Arrow IPC extensions"
ON)
@@ -240,10 +244,6 @@ Pass multiple labels by dividing with semicolons")
"Build with zstd compression"
ON)
- option(ARROW_WITH_GRPC
- "Build with GRPC"
- OFF)
-
option(ARROW_GENERATE_COVERAGE
"Build with C++ code coverage enabled"
OFF)
@@ -634,14 +634,6 @@ if (ARROW_WITH_ZSTD)
SET(ARROW_STATIC_LINK_LIBS zstd_static ${ARROW_STATIC_LINK_LIBS})
endif()
-if (ARROW_WITH_GRPC)
- SET(ARROW_STATIC_LINK_LIBS
- grpc_grp
- grpc_grpc
- grpc_grpcpp
- ${ARROW_STATIC_LINK_LIBS})
-endif()
-
if (ARROW_ORC)
SET(ARROW_STATIC_LINK_LIBS
${ARROW_STATIC_LINK_LIBS}
@@ -657,11 +649,13 @@ if (ARROW_STATIC_LINK_LIBS)
add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS})
endif()
-set(ARROW_BENCHMARK_LINK_LIBS
- arrow_static
- arrow_benchmark_main
- gtest
- ${ARROW_STATIC_LINK_LIBS})
+if (ARROW_BUILD_BENCHMARKS)
+  # Link libraries for benchmark executables: static Arrow, the benchmark
+  # main(), gtest, followed by Arrow's static third-party libraries
+  set(ARROW_BENCHMARK_LINK_LIBS arrow_static arrow_benchmark_main gtest)
+  list(APPEND ARROW_BENCHMARK_LINK_LIBS ${ARROW_STATIC_LINK_LIBS})
+endif()
set(ARROW_SHARED_PRIVATE_LINK_LIBS
${ARROW_STATIC_LINK_LIBS}
@@ -683,8 +677,8 @@ endif()
set(ARROW_MIN_TEST_LIBS
arrow_static
${ARROW_STATIC_LINK_LIBS}
- gtest
- gtest_main)
+ gtest_main
+ gtest)
if(NOT MSVC)
set(ARROW_MIN_TEST_LIBS
@@ -743,6 +737,10 @@ endif()
add_subdirectory(src/arrow)
+if(ARROW_FLIGHT)
+  # Flight RPC library plus its tests/benchmarks (requires gRPC and Protocol Buffers)
+  add_subdirectory(src/arrow/flight)
+endif()
+
if(ARROW_PYTHON)
add_subdirectory(src/arrow/python)
endif()
diff --git a/cpp/build-support/lint_cpp_cli.py b/cpp/build-support/lint_cpp_cli.py
index 0c6bad1..4028105 100644
--- a/cpp/build-support/lint_cpp_cli.py
+++ b/cpp/build-support/lint_cpp_cli.py
@@ -72,10 +72,8 @@ EXCLUSIONS = [
'arrow/util/macros.h',
'arrow/python/iterators.h',
'arrow/util/parallel.h',
- 'arrow/io/hdfs-internal.h',
- 'parquet/arrow/test-util.h',
- 'parquet/encoding-internal.h',
- 'parquet/test-util.h'
+ 'test',
+ 'internal'
]
try:
diff --git a/cpp/cmake_modules/FindProtobuf.cmake b/cpp/cmake_modules/FindProtobuf.cmake
index 9591bd1..cb003e3 100644
--- a/cpp/cmake_modules/FindProtobuf.cmake
+++ b/cpp/cmake_modules/FindProtobuf.cmake
@@ -94,6 +94,7 @@ else()
endif()
mark_as_advanced (
+ PROTOBUF_EXECUTABLE
PROTOBUF_INCLUDE_DIR
PROTOBUF_LIBS
PROTOBUF_STATIC_LIB
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index 4bd6470..e25f954 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -31,6 +31,7 @@ if (NOT "$ENV{ARROW_BUILD_TOOLCHAIN}" STREQUAL "")
set(GTEST_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
endif()
set(JEMALLOC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
+ set(GRPC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
set(LZ4_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
# orc disabled as it's not in conda-forge (but in Anaconda with an incompatible ABI)
# set(ORC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
@@ -210,7 +211,7 @@ if (DEFINED ENV{ARROW_PROTOBUF_URL})
set(PROTOBUF_SOURCE_URL "$ENV{ARROW_PROTOBUF_URL}")
else()
string(SUBSTRING ${PROTOBUF_VERSION} 1 -1 STRIPPED_PROTOBUF_VERSION) # strip the leading `v`
- set(PROTOBUF_SOURCE_URL "https://github.com/google/protobuf/releases/download/${PROTOBUF_VERSION}/protobuf-${STRIPPED_PROTOBUF_VERSION}.tar.gz")
+ set(PROTOBUF_SOURCE_URL "https://github.com/protocolbuffers/protobuf/releases/download/${PROTOBUF_VERSION}/protobuf-all-${STRIPPED_PROTOBUF_VERSION}.tar.gz")
endif()
set(RAPIDJSON_SOURCE_MD5 "badd12c511e081fec6c89c43a7027bce")
@@ -1009,54 +1010,11 @@ if (ARROW_WITH_ZSTD)
endif()
endif()
-if (ARROW_WITH_GRPC)
-# ----------------------------------------------------------------------
-# GRPC
- if ("${GRPC_HOME}" STREQUAL "")
- set(GRPC_VENDORED 1)
- set(GRPC_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep-prefix/src/grpc_ep-build")
- set(GRPC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep/src/grpc_ep-install")
- set(GRPC_HOME "${GRPC_PREFIX}")
- set(GRPC_INCLUDE_DIR "${GRPC_PREFIX}/include")
- set(GRPC_STATIC_LIBRARY_GPR "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}gpr${CMAKE_STATIC_LIBRARY_SUFFIX}")
- set(GRPC_STATIC_LIBRARY_GRPC "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc${CMAKE_STATIC_LIBRARY_SUFFIX}")
- set(GRPC_STATIC_LIBRARY_GRPCPP "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc++${CMAKE_STATIC_LIBRARY_SUFFIX}")
- set(GRPC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
- "-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}"
- "-DCMAKE_C_FLAGS=${EP_C_FLAGS}"
- -DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX}
- -DBUILD_SHARED_LIBS=OFF)
- ExternalProject_Add(grpc_ep
- GIT_REPOSITORY "https://github.com/grpc/grpc"
- GIT_TAG ${GRPC_VERSION}
- BUILD_BYPRODUCTS "${GRPC_STATIC_LIBRARY_GPR}" "${GRPC_STATIC_LIBRARY_GRPC}" "${GRPC_STATIC_LIBRARY_GRPCPP}"
- ${GRPC_BUILD_BYPRODUCTS}
- ${EP_LOG_OPTIONS}
- CMAKE_ARGS ${GRPC_CMAKE_ARGS}
- ${EP_LOG_OPTIONS})
- else()
- find_package(gRPC CONFIG REQUIRED)
- set(GRPC_VENDORED 0)
- endif()
-
- include_directories(SYSTEM ${GRPC_INCLUDE_DIR})
- ADD_THIRDPARTY_LIB(grpc_grp
- STATIC_LIB ${GRPC_STATIC_LIBRARY_GPR})
- ADD_THIRDPARTY_LIB(grpc_grpc
- STATIC_LIB ${GRPC_STATIC_LIBRARY_GRPC})
- ADD_THIRDPARTY_LIB(grpc_grpcpp
- STATIC_LIB ${GRPC_STATIC_LIBRARY_GRPCPP})
-
- if (GRPC_VENDORED)
- add_dependencies(grpc_grp grpc_ep)
- add_dependencies(grpc_grpc grpc_ep)
- add_dependencies(grpc_grpcpp grpc_ep)
- endif()
-
-endif()
+# ----------------------------------------------------------------------
+# Protocol Buffers (required for ORC and Flight libraries)
-if (ARROW_ORC)
+if (ARROW_ORC OR ARROW_FLIGHT)
# protobuf
if ("${PROTOBUF_HOME}" STREQUAL "")
set (PROTOBUF_PREFIX "${THIRDPARTY_DIR}/protobuf_ep-install")
@@ -1089,9 +1047,69 @@ if (ARROW_ORC)
if (PROTOBUF_VENDORED)
add_dependencies (protobuf protobuf_ep)
endif ()
+endif()
- # orc
+# ----------------------------------------------------------------------
+# Dependencies for Arrow Flight RPC
+
+if (ARROW_FLIGHT)
+  if ("${GRPC_HOME}" STREQUAL "")
+    set(GRPC_VENDORED 1)
+    set(GRPC_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep-prefix/src/grpc_ep-build")
+    set(GRPC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep/src/grpc_ep-install")
+    set(GRPC_HOME "${GRPC_PREFIX}")
+    set(GRPC_INCLUDE_DIR "${GRPC_PREFIX}/include")
+    set(GRPC_STATIC_LIBRARY_GPR "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}gpr${CMAKE_STATIC_LIBRARY_SUFFIX}")
+    set(GRPC_STATIC_LIBRARY_GRPC "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc${CMAKE_STATIC_LIBRARY_SUFFIX}")
+    # The gRPC C++ library is named grpc++ (grpcpp is only its include directory)
+    set(GRPC_STATIC_LIBRARY_GRPCPP "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc++${CMAKE_STATIC_LIBRARY_SUFFIX}")
+    set(GRPC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
+      "-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}"
+      "-DCMAKE_C_FLAGS=${EP_C_FLAGS}"
+      -DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX}
+      -DBUILD_SHARED_LIBS=OFF)
+    ExternalProject_Add(grpc_ep
+      GIT_REPOSITORY "https://github.com/grpc/grpc"
+      GIT_TAG ${GRPC_VERSION}
+      BUILD_BYPRODUCTS "${GRPC_STATIC_LIBRARY_GPR}" "${GRPC_STATIC_LIBRARY_GRPC}" "${GRPC_STATIC_LIBRARY_GRPCPP}"
+      ${GRPC_BUILD_BYPRODUCTS}
+      ${EP_LOG_OPTIONS}
+      CMAKE_ARGS ${GRPC_CMAKE_ARGS})
+    include_directories(SYSTEM ${GRPC_INCLUDE_DIR})
+  else()
+    find_package(gRPC CONFIG REQUIRED)
+    set(GRPC_VENDORED 0)
+  endif()
+
+  # The static libraries below are located through imported targets created by
+  # find_package(gRPC CONFIG). The vendored ExternalProject build above does not
+  # define them, so fail with an actionable message instead of the bare
+  # "get_property could not find TARGET" error.
+  foreach(ARROW_GRPC_REQUIRED_TARGET
+          gRPC::gpr
+          gRPC::grpc_unsecure
+          gRPC::grpc++_unsecure
+          gRPC::address_sorting
+          c-ares::cares_static)
+    if (NOT TARGET ${ARROW_GRPC_REQUIRED_TARGET})
+      message(FATAL_ERROR
+        "ARROW_FLIGHT requires the CMake target ${ARROW_GRPC_REQUIRED_TARGET}, "
+        "which is provided by find_package(gRPC CONFIG). Point GRPC_HOME (or "
+        "ARROW_BUILD_TOOLCHAIN) at a gRPC installation; the vendored gRPC build "
+        "does not define these targets yet.")
+    endif()
+  endforeach()
+
+  get_property(GPR_STATIC_LIB TARGET gRPC::gpr PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(grpc_gpr
+    STATIC_LIB ${GPR_STATIC_LIB})
+
+  get_property(GRPC_STATIC_LIB TARGET gRPC::grpc_unsecure PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(grpc_grpc
+    STATIC_LIB ${GRPC_STATIC_LIB})
+
+  get_property(GRPCPP_STATIC_LIB TARGET gRPC::grpc++_unsecure PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(grpc_grpcpp
+    STATIC_LIB ${GRPCPP_STATIC_LIB})
+
+  get_property(GRPC_ADDRESS_SORTING_STATIC_LIB
+    TARGET gRPC::address_sorting PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(grpc_address_sorting
+    STATIC_LIB ${GRPC_ADDRESS_SORTING_STATIC_LIB})
+
+  # XXX(wesm): relying on vendored c-ares provided by gRPC for the time being
+  get_property(CARES_STATIC_LIB TARGET c-ares::cares_static PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(cares
+    STATIC_LIB ${CARES_STATIC_LIB})
+endif()
+
+# ----------------------------------------------------------------------
+# Apache ORC
+
+if (ARROW_ORC)
+ # orc
if ("${ORC_HOME}" STREQUAL "")
set(ORC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/orc_ep-install")
set(ORC_HOME "${ORC_PREFIX}")
@@ -1274,10 +1292,10 @@ endif()
endif() # ARROW_HIVESERVER2
-if (ARROW_USE_GLOG)
# ----------------------------------------------------------------------
# GLOG
+if (ARROW_USE_GLOG)
if("${GLOG_HOME}" STREQUAL "")
set(GLOG_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/glog_ep-prefix/src/glog_ep")
set(GLOG_INCLUDE_DIR "${GLOG_BUILD_DIR}/include")
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index 2c01041..006fe0f 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -71,6 +71,10 @@ Status Buffer::FromString(const std::string& data, std::shared_ptr<Buffer>* out)
return FromString(data, default_memory_pool(), out);
}
+std::string Buffer::ToString() const {
+  // Copy all size_ bytes (embedded NULs included) into a fresh string
+  const char* first = reinterpret_cast<const char*>(data_);
+  return std::string(first, first + size_);
+}
+
void Buffer::CheckMutable() const { DCHECK(is_mutable()) << "buffer not mutable"; }
/// A Buffer whose lifetime is tied to a particular MemoryPool
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index f8c2c83..42b99bf 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -146,6 +146,11 @@ class ARROW_EXPORT Buffer {
static_cast<int64_t>(sizeof(T) * data.size()));
} // namespace arrow
+ /// \brief Copy buffer contents into a new std::string
+ /// \return std::string
+ /// \note Can throw std::bad_alloc if buffer is large
+ std::string ToString() const;
+
int64_t capacity() const { return capacity_; }
const uint8_t* data() const { return data_; }
diff --git a/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt b/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt
index c59fd5a..be689f9 100644
--- a/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt
+++ b/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt
@@ -44,10 +44,11 @@ function(HS2_THRIFT_GEN VAR)
# All the output files we can determine based on filename.
# - Does not include .skeleton.cpp files
# - Does not include java output files
- set(OUTPUT_BE_FILE "${GEN_DIR}/${FIL_WE}_types.cpp")
- set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_types.h")
- set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_constants.cpp")
- set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_constants.h")
+ set(OUTPUT_BE_FILE
+ "${GEN_DIR}/${FIL_WE}_types.cpp"
+ "${GEN_DIR}/${FIL_WE}_types.h"
+ "${GEN_DIR}/${FIL_WE}_constants.cpp"
+ "${GEN_DIR}/${FIL_WE}_constants.h")
list(APPEND ${VAR} ${OUTPUT_BE_FILE})
# BeeswaxService thrift generation
diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt
new file mode 100644
index 0000000..e830be3
--- /dev/null
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_custom_target(arrow_flight)
+
+# Header files
+install(FILES
+  api.h
+  client.h
+  server.h
+  types.h
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/flight")
+
+# gRPC third-party libraries (registered in ThirdpartyToolchain.cmake) that
+# every Flight artifact links against
+set(ARROW_FLIGHT_STATIC_LINK_LIBS
+  grpc_grpcpp
+  grpc_grpc
+  grpc_gpr
+  grpc_address_sorting
+  cares)
+
+# TODO(wesm): Protobuf shared vs static linking
+
+set(FLIGHT_PROTO_PATH "${CMAKE_SOURCE_DIR}/../format")
+set(FLIGHT_PROTO "${FLIGHT_PROTO_PATH}/Flight.proto")
+
+set(FLIGHT_GENERATED_PROTO_FILES
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.pb.cc"
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.pb.h"
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.grpc.pb.cc"
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.grpc.pb.h")
+
+# With a vendored protobuf, also depend on its target so the external build
+# runs before protoc is invoked
+if(PROTOBUF_VENDORED)
+  set(PROTO_DEPENDS ${FLIGHT_PROTO} protobuf)
+else()
+  set(PROTO_DEPENDS ${FLIGHT_PROTO})
+endif()
+
+# Location of grpc_cpp_plugin so we can pass it to protoc. The imported target
+# only exists when gRPC was located with find_package(gRPC CONFIG); report that
+# clearly instead of failing inside get_property().
+if(NOT TARGET gRPC::grpc_cpp_plugin)
+  message(FATAL_ERROR
+    "ARROW_FLIGHT requires the gRPC::grpc_cpp_plugin CMake target. "
+    "Build against a gRPC installation found by find_package(gRPC CONFIG), "
+    "e.g. by setting GRPC_HOME or ARROW_BUILD_TOOLCHAIN.")
+endif()
+get_property(GRPC_CPP_PLUGIN TARGET gRPC::grpc_cpp_plugin PROPERTY LOCATION)
+
+# Generate the protobuf message classes and the gRPC service stubs for
+# Flight.proto into the build directory
+add_custom_command(
+  OUTPUT ${FLIGHT_GENERATED_PROTO_FILES}
+  COMMAND ${PROTOBUF_EXECUTABLE}
+          "-I${FLIGHT_PROTO_PATH}"
+          "--cpp_out=${CMAKE_CURRENT_BINARY_DIR}"
+          "${FLIGHT_PROTO}"
+  COMMAND ${PROTOBUF_EXECUTABLE}
+          "-I${FLIGHT_PROTO_PATH}"
+          "--grpc_out=${CMAKE_CURRENT_BINARY_DIR}"
+          "--plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}"
+          "${FLIGHT_PROTO}"
+  DEPENDS ${PROTO_DEPENDS}
+  VERBATIM)
+
+set_source_files_properties(${FLIGHT_GENERATED_PROTO_FILES}
+  PROPERTIES GENERATED TRUE)
+
+set(ARROW_FLIGHT_SRCS
+  client.cc
+  # Outputs of the protoc custom command in this file (build directory)
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.pb.cc"
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.grpc.pb.cc"
+  internal.cc
+  server.cc
+  types.cc
+)
+
+ADD_ARROW_LIB(arrow_flight
+  SOURCES ${ARROW_FLIGHT_SRCS}
+  DEPENDENCIES arrow_dependencies
+  SHARED_LINK_LIBS arrow_shared ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+  STATIC_LINK_LIBS arrow_static ${ARROW_FLIGHT_STATIC_LINK_LIBS})
+
+ADD_ARROW_TEST(flight-test
+  EXTRA_LINK_LIBS arrow_flight_static ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+  LABELS "arrow_flight")
+
+# Build test server for unit tests or benchmarks
+if (ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
+  add_executable(flight-test-server test-server.cc)
+  target_link_libraries(flight-test-server PRIVATE
+    arrow_flight_static
+    ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+    gflags
+    gtest)
+
+  # This is needed for the unit tests
+  if (ARROW_BUILD_TESTS)
+    add_dependencies(flight-test flight-test-server)
+  endif()
+endif()
+
+if (ARROW_BUILD_BENCHMARKS)
+  # Perf server for benchmarks
+  set(PERF_PROTO "${CMAKE_CURRENT_SOURCE_DIR}/perf.proto")
+  set(PERF_PROTO_GENERATED_FILES
+    "${CMAKE_CURRENT_BINARY_DIR}/perf.pb.cc"
+    "${CMAKE_CURRENT_BINARY_DIR}/perf.pb.h")
+
+  # perf.proto is passed by absolute path (custom commands run in the binary
+  # directory by default) and listed in DEPENDS so that editing it regenerates
+  # the sources. PROTO_DEPENDS adds the vendored protobuf dependency, if any.
+  add_custom_command(
+    OUTPUT ${PERF_PROTO_GENERATED_FILES}
+    COMMAND ${PROTOBUF_EXECUTABLE}
+            "-I${CMAKE_CURRENT_SOURCE_DIR}"
+            "--cpp_out=${CMAKE_CURRENT_BINARY_DIR}"
+            "${PERF_PROTO}"
+    DEPENDS ${PERF_PROTO} ${PROTO_DEPENDS}
+    VERBATIM)
+
+  add_executable(flight-perf-server
+    perf-server.cc
+    "${CMAKE_CURRENT_BINARY_DIR}/perf.pb.cc")
+  target_link_libraries(flight-perf-server PRIVATE
+    arrow_flight_static
+    ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+    gflags
+    gtest)
+
+  add_executable(flight-benchmark
+    flight-benchmark.cc
+    "${CMAKE_CURRENT_BINARY_DIR}/perf.pb.cc")
+  target_link_libraries(flight-benchmark PRIVATE
+    arrow_flight_static
+    ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+    gflags
+    gtest)
+
+  # Build the perf server whenever the benchmark is built
+  add_dependencies(flight-benchmark flight-perf-server)
+endif()
diff --git a/cpp/src/arrow/flight/README.md b/cpp/src/arrow/flight/README.md
new file mode 100644
index 0000000..5156973
--- /dev/null
+++ b/cpp/src/arrow/flight/README.md
@@ -0,0 +1,36 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Arrow Flight RPC System for C++
+
+## Development notes
+
+The gRPC protoc plugin (`grpc_cpp_plugin`) needs to find `libprotoc` at
+runtime, so `libprotoc` must be on your `LD_LIBRARY_PATH`. Until we figure out
+a general solution, you may need to do:
+
+```
+export LD_LIBRARY_PATH=$PROTOBUF_HOME/lib:$LD_LIBRARY_PATH
+```
+
+Currently, the unit tests need to find other built executables (such as
+`flight-test-server`), so the directory containing them must either be your
+current working directory or be on your `PATH`, e.g.
+
+```
+PATH=debug:$PATH debug/flight-test
+```
\ No newline at end of file
diff --git a/cpp/src/parquet/util/stopwatch.h b/cpp/src/arrow/flight/api.h
similarity index 57%
copy from cpp/src/parquet/util/stopwatch.h
copy to cpp/src/arrow/flight/api.h
index 68cf792..98b0840 100644
--- a/cpp/src/parquet/util/stopwatch.h
+++ b/cpp/src/arrow/flight/api.h
@@ -15,38 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef PARQUET_UTIL_STOPWATCH_H
-#define PARQUET_UTIL_STOPWATCH_H
+#pragma once
-#include <stdio.h>
-#ifndef _MSC_VER
-#include <sys/time.h>
-#endif
-
-#include <ctime>
-#include <iostream>
-
-namespace parquet {
-
-class StopWatch {
- public:
- StopWatch() {}
-
- void Start() { gettimeofday(&start_time, 0); }
-
- // Returns time in nanoseconds.
- uint64_t Stop() {
- struct timeval t_time;
- gettimeofday(&t_time, 0);
-
- return (1000L * 1000L * 1000L * (t_time.tv_sec - start_time.tv_sec) +
- (t_time.tv_usec - start_time.tv_usec));
- }
-
- private:
- struct timeval start_time;
-};
-
-} // namespace parquet
-
-#endif
+#include "arrow/flight/client.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/types.h"
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
new file mode 100644
index 0000000..94c4928
--- /dev/null
+++ b/cpp/src/arrow/flight/client.cc
@@ -0,0 +1,410 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/client.h"
+
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "google/protobuf/io/coded_stream.h"
+#include "google/protobuf/wire_format_lite.h"
+#include "grpc/byte_buffer_reader.h"
+#include "grpcpp/grpcpp.h"
+
+#include "arrow/ipc/reader.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/flight/Flight.grpc.pb.h"
+#include "arrow/flight/Flight.pb.h"
+#include "arrow/flight/internal.h"
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+
+/// Internal, not user-visible type used for memory-efficient reads from gRPC
+/// stream
+///
+/// Filled in by the SerializationTraits<FlightData> specialization in this
+/// file, which slices `metadata` and `body` out of the received gRPC buffer
+/// rather than copying them.
+struct FlightData {
+  /// Used only for puts, may be null. NOTE(review): the deserializer in this
+  /// file does not currently populate it.
+  std::unique_ptr<FlightDescriptor> descriptor;
+
+  /// Non-length-prefixed Message header as described in format/Message.fbs
+  std::shared_ptr<Buffer> metadata;
+
+  /// Message body
+  std::shared_ptr<Buffer> body;
+};
+
+} // namespace flight
+} // namespace arrow
+
+namespace grpc {
+
+// Customizations to gRPC for more efficient deserialization of FlightData
+
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::CodedInputStream;
+
+using arrow::flight::FlightData;
+
+// Read a varint-length-prefixed bytes (or embedded message) field from `input`
+// and expose it as a slice of `source_data` without copying. `source_data`
+// must be the buffer `input` was constructed over, so that stream positions
+// are byte offsets into it.
+// Returns false if the length prefix cannot be read or the field extends past
+// the end of the stream; `*out` is left untouched in that case.
+bool ReadBytesZeroCopy(const std::shared_ptr<arrow::Buffer>& source_data,
+                       CodedInputStream* input, std::shared_ptr<arrow::Buffer>* out) {
+  uint32_t length;
+  if (!input->ReadVarint32(&length)) {
+    return false;
+  }
+  // Verify that all `length` bytes are really present before slicing, so a
+  // truncated or corrupt length never yields a Buffer that points past the end
+  // of `source_data`. (Skip() rejects negative counts from oversized lengths.)
+  const int64_t start = static_cast<int64_t>(input->CurrentPosition());
+  if (!input->Skip(static_cast<int>(length))) {
+    return false;
+  }
+  *out = arrow::SliceBuffer(source_data, start, static_cast<int64_t>(length));
+  return true;
+}
+
+// Internal wrapper for gRPC ByteBuffer so its memory can be exposed to Arrow
+// consumers with zero-copy
+class GrpcBuffer : public arrow::MutableBuffer {
+ public:
+  /// Wrap a reference-counted slice without copying its bytes.
+  ///
+  /// \param[in] slice a slice with heap-allocated bytes (slice.refcount !=
+  ///   nullptr). Inlined slices keep their bytes inside the grpc_slice struct
+  ///   itself, so the data pointer taken from this by-value parameter would
+  ///   dangle once the constructor returns; Wrap() copies those instead.
+  /// \param[in] incref if true take a new reference, otherwise adopt the
+  ///   caller's reference
+  GrpcBuffer(grpc_slice slice, bool incref)
+      : MutableBuffer(GRPC_SLICE_START_PTR(slice),
+                      static_cast<int64_t>(GRPC_SLICE_LENGTH(slice))),
+        slice_(incref ? grpc_slice_ref(slice) : slice) {}
+
+  ~GrpcBuffer() override {
+    // Decref slice
+    grpc_slice_unref(slice_);
+  }
+
+  /// Expose the contents of a received gRPC ByteBuffer as an arrow::Buffer.
+  ///
+  /// A single uncompressed reference-counted slice is wrapped zero-copy;
+  /// otherwise gRPC first flattens the buffer into one slice. Slices small
+  /// enough to be stored inline are copied into Arrow-owned memory.
+  static arrow::Status Wrap(ByteBuffer* cpp_buf, std::shared_ptr<arrow::Buffer>* out) {
+    // These types are guaranteed by static assertions in gRPC to have the same
+    // in-memory representation
+
+    auto buffer = *reinterpret_cast<grpc_byte_buffer**>(cpp_buf);
+
+    // This part below is based on the Flatbuffers gRPC SerializationTraits in
+    // flatbuffers/grpc.h
+
+    // Check if this is a single uncompressed slice.
+    if ((buffer->type == GRPC_BB_RAW) &&
+        (buffer->data.raw.compression == GRPC_COMPRESS_NONE) &&
+        (buffer->data.raw.slice_buffer.count == 1)) {
+      // If it is, then we can reference the `grpc_slice` directly.
+      grpc_slice slice = buffer->data.raw.slice_buffer.slices[0];
+
+      if (IsInlined(slice)) {
+        return CopyInlinedSlice(slice, out);
+      }
+      // Increment reference count so this memory remains valid
+      *out = std::make_shared<GrpcBuffer>(slice, true);
+    } else {
+      // Otherwise, we need to use `grpc_byte_buffer_reader_readall` to read
+      // `buffer` into a single contiguous `grpc_slice`. The gRPC reader gives
+      // us back a new slice with the refcount already incremented.
+      grpc_byte_buffer_reader reader;
+      if (!grpc_byte_buffer_reader_init(&reader, buffer)) {
+        return arrow::Status::IOError("Internal gRPC error reading from ByteBuffer");
+      }
+      grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
+      grpc_byte_buffer_reader_destroy(&reader);
+
+      if (IsInlined(slice)) {
+        // An inlined slice has no refcount (refcount == nullptr), so there is
+        // no reference to release after copying it out
+        return CopyInlinedSlice(slice, out);
+      }
+      // Steal the slice reference
+      *out = std::make_shared<GrpcBuffer>(slice, false);
+    }
+
+    return arrow::Status::OK();
+  }
+
+ private:
+  // True when the slice's bytes are stored inside the grpc_slice struct itself
+  static bool IsInlined(const grpc_slice& slice) { return slice.refcount == nullptr; }
+
+  // Copy the bytes of an inlined slice into a newly allocated arrow::Buffer
+  static arrow::Status CopyInlinedSlice(grpc_slice slice,
+                                        std::shared_ptr<arrow::Buffer>* out) {
+    const std::string bytes(reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(slice)),
+                            GRPC_SLICE_LENGTH(slice));
+    return arrow::Buffer::FromString(bytes, out);
+  }
+
+  grpc_slice slice_;
+};
+
+// Read internal::FlightData from grpc::ByteBuffer containing FlightData
+// protobuf without copying
+template <>
+class SerializationTraits<FlightData> {
+ public:
+  /// Only the read path is customized; serializing this type is unsupported.
+  static Status Serialize(const FlightData& msg, ByteBuffer** buffer, bool* own_buffer) {
+    return Status(StatusCode::UNIMPLEMENTED,
+                  "internal::FlightData serialization not implemented");
+  }
+
+  /// Parse a serialized pb::FlightData by hand so that the data_header and
+  /// data_body fields become slices of the received gRPC buffer, not copies.
+  ///
+  /// Returns INTERNAL if the payload is missing or malformed. Fields this
+  /// reader does not recognize are skipped.
+  static Status Deserialize(ByteBuffer* buffer, FlightData* out) {
+    if (!buffer) {
+      return Status(StatusCode::INTERNAL, "No payload");
+    }
+
+    std::shared_ptr<arrow::Buffer> wrapped_buffer;
+    GRPC_RETURN_NOT_OK(GrpcBuffer::Wrap(buffer, &wrapped_buffer));
+
+    auto buffer_length = static_cast<int>(wrapped_buffer->size());
+    CodedInputStream pb_stream(wrapped_buffer->data(), buffer_length);
+
+    // TODO(wesm): The 2-parameter version of this function is deprecated
+    pb_stream.SetTotalBytesLimit(buffer_length, -1 /* no threshold */);
+
+    // This is the bytes remaining when using CodedInputStream like this
+    while (pb_stream.BytesUntilTotalBytesLimit()) {
+      const uint32_t tag = pb_stream.ReadTag();
+      if (tag == 0) {
+        // Bytes remain but no valid tag could be read: the payload is corrupt.
+        // Bail out rather than retrying at the same position forever.
+        return Status(StatusCode::INTERNAL, "Unable to read FlightData field tag");
+      }
+      const int field_number = WireFormatLite::GetTagFieldNumber(tag);
+      switch (field_number) {
+        case pb::FlightData::kFlightDescriptorFieldNumber: {
+          // The descriptor is an embedded, length-delimited message. Parse only
+          // its own bytes; an unbounded ParseFromCodedStream would consume the
+          // header and body fields that follow it.
+          std::shared_ptr<arrow::Buffer> descriptor_bytes;
+          if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &descriptor_bytes)) {
+            return Status(StatusCode::INTERNAL, "Unable to parse FlightDescriptor");
+          }
+          pb::FlightDescriptor pb_descriptor;
+          if (!pb_descriptor.ParseFromArray(descriptor_bytes->data(),
+                                            static_cast<int>(descriptor_bytes->size()))) {
+            return Status(StatusCode::INTERNAL, "Unable to parse FlightDescriptor");
+          }
+          // TODO: convert pb_descriptor into out->descriptor (needed for DoPut)
+        } break;
+        case pb::FlightData::kDataHeaderFieldNumber: {
+          if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->metadata)) {
+            return Status(StatusCode::INTERNAL, "Unable to read FlightData metadata");
+          }
+        } break;
+        case pb::FlightData::kDataBodyFieldNumber: {
+          if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->body)) {
+            return Status(StatusCode::INTERNAL, "Unable to read FlightData body");
+          }
+        } break;
+        default:
+          // Unknown field: skip over its value so parsing can continue
+          if (!WireFormatLite::SkipField(&pb_stream, tag)) {
+            return Status(StatusCode::INTERNAL, "Unable to skip unknown FlightData field");
+          }
+          break;
+      }
+    }
+    buffer->Clear();
+
+    // TODO(wesm): Where and when should we verify that the FlightData is not
+    // malformed or missing components?
+
+    return Status::OK;
+  }
+};
+
+} // namespace grpc
+
+namespace arrow {
+namespace flight {
+
+/// Per-call gRPC state: each RPC issued by the client owns one ClientContext,
+/// which has to stay alive as long as any reader streaming from that call.
+struct ClientRpc {
+  grpc::ClientContext context;
+
+  // XXX workaround until we have a handshake in Connect: ask gRPC to wait for
+  // the channel to become ready instead of failing fast
+  ClientRpc() { context.set_wait_for_ready(true); }
+};
+
+/// RecordBatchReader over the FlightData messages of a DoGet stream. Owns the
+/// ClientRpc so the call context outlives the underlying gRPC reader.
+class FlightStreamReader : public RecordBatchReader {
+ public:
+  FlightStreamReader(std::unique_ptr<ClientRpc> rpc,
+                     const std::shared_ptr<Schema>& schema,
+                     std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream)
+      : rpc_(std::move(rpc)),
+        stream_finished_(false),
+        schema_(schema),
+        stream_(std::move(stream)) {}
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  /// Yield the next record batch, or nullptr once the stream is exhausted.
+  /// The first call after exhaustion returns the call's final gRPC status.
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    if (stream_finished_) {
+      *out = nullptr;
+      return Status::OK();
+    }
+
+    // Route reads through the SerializationTraits<FlightData> specialization
+    // so message buffers are sliced from the gRPC payload instead of copied
+    auto* zero_copy_reader =
+        reinterpret_cast<grpc::ClientReader<FlightData>*>(stream_.get());
+
+    FlightData data;
+    if (!zero_copy_reader->Read(&data)) {
+      // No more messages: mark completion and surface the final RPC status
+      stream_finished_ = true;
+      *out = nullptr;
+      return internal::FromGrpcStatus(stream_->Finish());
+    }
+
+    // Validate IPC message, then decode it against the known schema
+    std::unique_ptr<ipc::Message> message;
+    RETURN_NOT_OK(ipc::Message::Open(data.metadata, data.body, &message));
+    return ipc::ReadRecordBatch(*message, schema_, out);
+  }
+
+ private:
+  // The RPC context lifetime must be coupled to the ClientReader
+  std::unique_ptr<ClientRpc> rpc_;
+
+  bool stream_finished_;
+  std::shared_ptr<Schema> schema_;
+  std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream_;
+};
+
+/// gRPC-backed implementation behind FlightClient. Each method issues one RPC
+/// through the stub created in Connect().
+class FlightClient::FlightClientImpl {
+ public:
+  /// Create the channel and stub for host:port. An OK status does not imply
+  /// the server is reachable.
+  Status Connect(const std::string& host, int port) {
+    // TODO(wesm): Support other kinds of GRPC ChannelCredentials
+    std::stringstream ss;
+    ss << host << ":" << port;
+    const std::string uri = ss.str();
+
+    stub_ = pb::FlightService::NewStub(
+        grpc::CreateChannel(uri, grpc::InsecureChannelCredentials()));
+    return Status::OK();
+  }
+
+  /// Materialize every FlightInfo streamed back by ListFlights.
+  Status ListFlights(const Criteria& criteria, std::unique_ptr<FlightListing>* listing) {
+    // TODO(wesm): populate criteria
+    pb::Criteria pb_criteria;
+
+    ClientRpc rpc;
+    std::unique_ptr<grpc::ClientReader<pb::FlightGetInfo>> stream(
+        stub_->ListFlights(&rpc.context, pb_criteria));
+
+    std::vector<FlightInfo> flights;
+
+    pb::FlightGetInfo pb_info;
+    while (stream->Read(&pb_info)) {
+      // Fresh Data per message so no moved-from state is fed back to FromProto
+      FlightInfo::Data info_data;
+      RETURN_NOT_OK(internal::FromProto(pb_info, &info_data));
+      flights.emplace_back(FlightInfo(std::move(info_data)));
+    }
+
+    listing->reset(new SimpleFlightListing(std::move(flights)));
+    return internal::FromGrpcStatus(stream->Finish());
+  }
+
+  /// Run an action and buffer all of its results before returning.
+  Status DoAction(const Action& action, std::unique_ptr<ResultStream>* results) {
+    pb::Action pb_action;
+    RETURN_NOT_OK(internal::ToProto(action, &pb_action));
+
+    ClientRpc rpc;
+    std::unique_ptr<grpc::ClientReader<pb::Result>> stream(
+        stub_->DoAction(&rpc.context, pb_action));
+
+    pb::Result pb_result;
+
+    std::vector<Result> materialized_results;
+    while (stream->Read(&pb_result)) {
+      Result result;
+      RETURN_NOT_OK(internal::FromProto(pb_result, &result));
+      materialized_results.emplace_back(std::move(result));
+    }
+
+    *results = std::unique_ptr<ResultStream>(
+        new SimpleResultStream(std::move(materialized_results)));
+    return internal::FromGrpcStatus(stream->Finish());
+  }
+
+  /// Append every action type advertised by the server to `types`.
+  Status ListActions(std::vector<ActionType>* types) {
+    pb::Empty empty;
+
+    ClientRpc rpc;
+    std::unique_ptr<grpc::ClientReader<pb::ActionType>> stream(
+        stub_->ListActions(&rpc.context, empty));
+
+    pb::ActionType pb_type;
+    while (stream->Read(&pb_type)) {
+      // Fresh object per message; the previous one was moved into `types`
+      ActionType type;
+      RETURN_NOT_OK(internal::FromProto(pb_type, &type));
+      types->emplace_back(std::move(type));
+    }
+    return internal::FromGrpcStatus(stream->Finish());
+  }
+
+  Status GetFlightInfo(const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* info) {
+    pb::FlightDescriptor pb_descriptor;
+    pb::FlightGetInfo pb_response;
+
+    RETURN_NOT_OK(internal::ToProto(descriptor, &pb_descriptor));
+
+    ClientRpc rpc;
+    Status s = internal::FromGrpcStatus(
+        stub_->GetFlightInfo(&rpc.context, pb_descriptor, &pb_response));
+    RETURN_NOT_OK(s);
+
+    FlightInfo::Data info_data;
+    RETURN_NOT_OK(internal::FromProto(pb_response, &info_data));
+    info->reset(new FlightInfo(std::move(info_data)));
+    return Status::OK();
+  }
+
+  /// Start a DoGet call. Errors from the call surface through the returned
+  /// reader's ReadNext().
+  Status DoGet(const Ticket& ticket, const std::shared_ptr<Schema>& schema,
+               std::unique_ptr<RecordBatchReader>* out) {
+    pb::Ticket pb_ticket;
+    internal::ToProto(ticket, &pb_ticket);
+
+    // Heap-allocated: the context must live as long as the returned reader
+    std::unique_ptr<ClientRpc> rpc(new ClientRpc);
+    std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream(
+        stub_->DoGet(&rpc->context, pb_ticket));
+
+    *out = std::unique_ptr<RecordBatchReader>(
+        new FlightStreamReader(std::move(rpc), schema, std::move(stream)));
+    return Status::OK();
+  }
+
+  Status DoPut(std::unique_ptr<FlightPutWriter>* stream) {
+    return Status::NotImplemented("DoPut");
+  }
+
+ private:
+  std::unique_ptr<pb::FlightService::Stub> stub_;
+};
+
+FlightClient::FlightClient() : impl_(new FlightClientImpl) {}
+
+FlightClient::~FlightClient() = default;
+
+Status FlightClient::Connect(const std::string& host, int port,
+                             std::unique_ptr<FlightClient>* client) {
+  // The client is handed to the caller before connecting, so *client is set
+  // even when the connection attempt reports an error
+  *client = std::unique_ptr<FlightClient>(new FlightClient);
+  FlightClientImpl& impl = *(*client)->impl_;
+  return impl.Connect(host, port);
+}
+
+// The remaining RPC entry points forward straight to the implementation.
+
+Status FlightClient::DoAction(const Action& action,
+                              std::unique_ptr<ResultStream>* results) {
+  return impl_->DoAction(action, results);
+}
+
+Status FlightClient::ListActions(std::vector<ActionType>* actions) {
+  return impl_->ListActions(actions);
+}
+
+Status FlightClient::GetFlightInfo(const FlightDescriptor& descriptor,
+                                   std::unique_ptr<FlightInfo>* info) {
+  return impl_->GetFlightInfo(descriptor, info);
+}
+
+Status FlightClient::ListFlights(std::unique_ptr<FlightListing>* listing) {
+  // Delegate with default-constructed criteria
+  return ListFlights(Criteria(), listing);
+}
+
+Status FlightClient::ListFlights(const Criteria& criteria,
+                                 std::unique_ptr<FlightListing>* listing) {
+  return impl_->ListFlights(criteria, listing);
+}
+
+Status FlightClient::DoGet(const Ticket& ticket, const std::shared_ptr<Schema>& schema,
+                           std::unique_ptr<RecordBatchReader>* stream) {
+  return impl_->DoGet(ticket, schema, stream);
+}
+
+Status FlightClient::DoPut(const Schema& schema,
+                           std::unique_ptr<FlightPutWriter>* stream) {
+  return Status::NotImplemented("DoPut");
+}
+
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h
new file mode 100644
index 0000000..be3d86a
--- /dev/null
+++ b/cpp/src/arrow/flight/client.h
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// \brief Implementation of Flight RPC client using gRPC. API should be
+/// considered experimental for now
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/flight/types.h"
+
+namespace arrow {
+
+class RecordBatch;
+class RecordBatchReader;
+class Schema;
+
+namespace flight {
+
+/// \brief Client class for Arrow Flight RPC services (gRPC-based).
+/// API experimental for now
+///
+/// Instances are obtained through FlightClient::Connect (the constructor is
+/// private); the gRPC-specific state lives behind FlightClientImpl so that
+/// no gRPC headers are needed by users of this header.
+class ARROW_EXPORT FlightClient {
+ public:
+  ~FlightClient();
+
+  /// \brief Connect to an unauthenticated flight service
+  /// \param[in] host the hostname or IP address
+  /// \param[in] port the port on the host
+  /// \param[out] client the created FlightClient
+  /// \return Status; an OK status does not necessarily mean that the
+  /// connection was successful
+  static Status Connect(const std::string& host, int port,
+                        std::unique_ptr<FlightClient>* client);
+
+  /// \brief Perform the indicated action, returning an iterator to the stream
+  /// of results, if any
+  /// \param[in] action the action to be performed
+  /// \param[out] results an iterator object for reading the returned results
+  /// \return Status
+  Status DoAction(const Action& action, std::unique_ptr<ResultStream>* results);
+
+  /// \brief Retrieve a list of available Action types
+  /// \param[out] actions the available actions
+  /// \return Status
+  Status ListActions(std::vector<ActionType>* actions);
+
+  /// \brief Request access plan for a single flight, which may be an existing
+  /// dataset or a command to be executed
+  /// \param[in] descriptor the dataset request, whether a named dataset or
+  /// command
+  /// \param[out] info the FlightInfo describing where to access the dataset
+  /// \return Status
+  Status GetFlightInfo(const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* info);
+
+  /// \brief List all available flights known to the server
+  /// \param[out] listing an iterator that returns a FlightInfo for each flight
+  /// \return Status
+  Status ListFlights(std::unique_ptr<FlightListing>* listing);
+
+  /// \brief List available flights given indicated filter criteria
+  /// \param[in] criteria the filter criteria (opaque)
+  /// \param[out] listing an iterator that returns a FlightInfo for each flight
+  /// \return Status
+  Status ListFlights(const Criteria& criteria, std::unique_ptr<FlightListing>* listing);
+
+  /// \brief Given a flight ticket and schema, request to be sent the
+  /// stream. Returns record batch stream reader
+  /// \param[in] ticket the ticket identifying the stream, as found in one of
+  /// the FlightEndpoints returned by GetFlightInfo
+  /// \param[in] schema the arrow::Schema for the stream as computed by
+  /// GetFlightInfo
+  /// \param[out] stream the returned RecordBatchReader
+  /// \return Status
+  Status DoGet(const Ticket& ticket, const std::shared_ptr<Schema>& schema,
+               std::unique_ptr<RecordBatchReader>* stream);
+
+  /// \brief Initiate DoPut RPC, returns FlightPutWriter interface to
+  /// write. Not yet implemented
+  /// \param[in] schema the schema of the record batches to be written
+  /// \param[out] stream the created stream to write record batches to
+  /// \return Status (currently always NotImplemented)
+  Status DoPut(const Schema& schema, std::unique_ptr<FlightPutWriter>* stream);
+
+ private:
+  // Use Connect() to create instances
+  FlightClient();
+  class FlightClientImpl;
+  std::unique_ptr<FlightClientImpl> impl_;
+};
+
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/flight-benchmark.cc b/cpp/src/arrow/flight/flight-benchmark.cc
new file mode 100644
index 0000000..ac50ab0
--- /dev/null
+++ b/cpp/src/arrow/flight/flight-benchmark.cc
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+
+#include "arrow/api.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/api.h"
+#include "arrow/record_batch.h"
+#include "arrow/test-util.h"
+#include "arrow/util/stopwatch.h"
+#include "arrow/util/thread-pool.h"
+
+#include "arrow/flight/api.h"
+#include "arrow/flight/perf.pb.h"
+#include "arrow/flight/test-util.h"
+
+// Benchmark flags. num_streams, records_per_stream and records_per_batch are
+// sent to the server inside the perf::Perf command; num_threads sizes the
+// client-side thread pool that reads the streams; num_servers is not used yet
+// (see the TODO in RunPerformanceTest).
+DEFINE_int32(num_servers, 1, "Number of performance servers to run");
+DEFINE_int32(num_streams, 4, "Number of streams for each server");
+DEFINE_int32(num_threads, 4, "Number of concurrent gets");
+DEFINE_int32(records_per_stream, 10000000, "Total records per stream");
+DEFINE_int32(records_per_batch, 4096, "Total records per batch within stream");
+
+namespace perf = arrow::flight::perf;
+
+using ThreadPool = ::arrow::internal::ThreadPool;
+
+namespace arrow {
+namespace flight {
+
+/// Totals accumulated across all benchmark streams. Update() is called from
+/// the worker tasks; the mutex serializes concurrent updates.
+struct PerformanceStats {
+  PerformanceStats() : total_records(0), total_bytes(0) {}
+
+  std::mutex mutex;
+  int64_t total_records;
+  int64_t total_bytes;
+
+  /// Add one stream's record and byte counts to the running totals.
+  void Update(const int64_t num_records, const int64_t num_bytes) {
+    std::unique_lock<std::mutex> guard(mutex);
+    total_records += num_records;
+    total_bytes += num_bytes;
+  }
+};
+
+/// Plan a perf flight against the server on localhost:port, read every
+/// endpoint's stream concurrently and print throughput. Fails if any stream
+/// fails or if the number of records read differs from the planned total.
+Status RunPerformanceTest(const int port) {
+  // TODO(wesm): Multiple servers
+  // std::vector<std::unique_ptr<TestServer>> servers;
+
+  // schema not needed
+  perf::Perf perf;
+  perf.set_stream_count(FLAGS_num_streams);
+  perf.set_records_per_stream(FLAGS_records_per_stream);
+  perf.set_records_per_batch(FLAGS_records_per_batch);
+
+  // Construct client and plan the query
+  std::unique_ptr<FlightClient> client;
+  RETURN_NOT_OK(FlightClient::Connect("localhost", port, &client));
+
+  FlightDescriptor descriptor;
+  descriptor.type = FlightDescriptor::CMD;
+  if (!perf.SerializeToString(&descriptor.cmd)) {
+    return Status::Invalid("cannot serialize perf command");
+  }
+
+  std::unique_ptr<FlightInfo> plan;
+  RETURN_NOT_OK(client->GetFlightInfo(descriptor, &plan));
+
+  // Read the streams in parallel
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(plan->GetSchema(&schema));
+
+  PerformanceStats stats;
+
+  // Reads one endpoint's stream to the end over its own connection and adds
+  // the stream's record/byte counts to `stats`
+  auto ConsumeStream = [&stats, &schema, port](const FlightEndpoint& endpoint) {
+    // TODO(wesm): Use location from endpoint, same host/port for now
+    std::unique_ptr<FlightClient> client;
+    RETURN_NOT_OK(FlightClient::Connect("localhost", port, &client));
+
+    perf::Token token;
+    if (!token.ParseFromString(endpoint.ticket.ticket)) {
+      return Status::Invalid("cannot parse perf ticket");
+    }
+
+    std::unique_ptr<RecordBatchReader> reader;
+    RETURN_NOT_OK(client->DoGet(endpoint.ticket, schema, &reader));
+
+    std::shared_ptr<RecordBatch> batch;
+
+    // This is hard-coded for right now, 4 columns each with int64
+    const int bytes_per_record = 32;
+
+    // Verification only works if the server also writes sequential values
+    // into column "a": see the use_verifier argument that perf-server.cc
+    // passes to GetPerfBatches
+    const bool verify = false;
+
+    int64_t num_bytes = 0;
+    int64_t num_records = 0;
+    while (true) {
+      RETURN_NOT_OK(reader->ReadNext(&batch));
+      if (!batch) {
+        break;
+      }
+
+      if (verify) {
+        auto values =
+            reinterpret_cast<const int64_t*>(batch->column_data(0)->buffers[1]->data());
+        const int64_t start = token.start() + num_records;
+        for (int64_t i = 0; i < batch->num_rows(); ++i) {
+          if (values[i] != start + i) {
+            return Status::Invalid("verification failure");
+          }
+        }
+      }
+
+      num_records += batch->num_rows();
+
+      // Hard-coded
+      num_bytes += batch->num_rows() * bytes_per_record;
+    }
+    stats.Update(num_records, num_bytes);
+    return Status::OK();
+  };
+
+  StopWatch timer;
+  timer.Start();
+
+  // XXX(wesm): Serial version for debugging
+  // for (const auto& endpoint : plan->endpoints()) {
+  //   RETURN_NOT_OK(ConsumeStream(endpoint));
+  // }
+
+  std::shared_ptr<ThreadPool> pool;
+  RETURN_NOT_OK(ThreadPool::Make(FLAGS_num_threads, &pool));
+  std::vector<std::future<Status>> tasks;
+  for (const auto& endpoint : plan->endpoints()) {
+    tasks.emplace_back(pool->Submit(ConsumeStream, endpoint));
+  }
+
+  // Wait for every task before returning, even after a failure. The tasks
+  // capture `stats` and `schema` by reference, so none of them may still be
+  // running once this function returns; this does not rely on the thread
+  // pool's shutdown behaviour.
+  Status tasks_status;
+  for (auto&& task : tasks) {
+    Status task_status = task.get();
+    if (tasks_status.ok() && !task_status.ok()) {
+      tasks_status = task_status;
+    }
+  }
+  RETURN_NOT_OK(tasks_status);
+
+  // Elapsed time in seconds
+  uint64_t elapsed_nanos = timer.Stop();
+  double time_elapsed = elapsed_nanos / static_cast<double>(1000000000LL);
+
+  constexpr double kMegabyte = static_cast<double>(1 << 20);
+
+  // Check that number of rows read is as expected
+  if (stats.total_records != static_cast<int64_t>(plan->total_records())) {
+    return Status::Invalid("Did not consume expected number of records");
+  }
+
+  std::cout << "Bytes read: " << stats.total_bytes << std::endl;
+  std::cout << "Nanos: " << elapsed_nanos << std::endl;
+  std::cout << "Speed: " << (stats.total_bytes / time_elapsed / kMegabyte) << " MB/s"
+            << std::endl;
+  return Status::OK();
+}
+
+} // namespace flight
+} // namespace arrow
+
+// Start a local perf server, run the benchmark against it and report the
+// outcome through the process exit code (non-zero on failure).
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  const int port = 31337;
+  arrow::flight::TestServer server("flight-perf-server", port);
+  server.Start();
+
+  arrow::Status s = arrow::flight::RunPerformanceTest(port);
+  server.Stop();
+
+  if (!s.ok()) {
+    std::cerr << "Failed with error: " << s.ToString() << std::endl;
+    // Let scripts and CI notice the failure
+    return 1;
+  }
+
+  return 0;
+}
diff --git a/cpp/src/arrow/flight/flight-test.cc b/cpp/src/arrow/flight/flight-test.cc
new file mode 100644
index 0000000..2d1b2f8
--- /dev/null
+++ b/cpp/src/arrow/flight/flight-test.cc
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef _WIN32
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#endif
+
+#include <chrono>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <boost/process.hpp>
+
+#include "arrow/ipc/test-common.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+
+#include "arrow/flight/api.h"
+
+#ifdef GRPCPP_GRPCPP_H
+#error "gRPC headers should not be in public API"
+#endif
+
+#include "arrow/flight/Flight.pb.h"
+#include "arrow/flight/internal.h"
+#include "arrow/flight/test-util.h"
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+
+// The test server process must start, stay up, and exit cleanly on Stop()
+TEST(TestFlight, StartStopTestServer) {
+  // TCP port numbers are 16-bit, so the port must be at most 65535
+  TestServer server("flight-test-server", 30000);
+  server.Start();
+  ASSERT_TRUE(server.IsRunning());
+
+  sleep_for(0.2);
+
+  ASSERT_TRUE(server.IsRunning());
+  int exit_code = server.Stop();
+  ASSERT_EQ(0, exit_code);
+}
+
+// ----------------------------------------------------------------------
+// Client tests
+
+// Fixture that launches a test server process for each test and connects a
+// FlightClient to it.
+class TestFlightClient : public ::testing::Test {
+ public:
+  // Uncomment these when you want to run the server separately for
+  // debugging/valgrind/gdb
+
+  // void SetUp() {
+  //   port_ = 30001;
+  //   ASSERT_OK(ConnectClient());
+  // }
+  // void TearDown() {}
+
+  void SetUp() {
+    // TCP port numbers are 16-bit, so the port must be at most 65535
+    port_ = 30001;
+    server_.reset(new TestServer("flight-test-server", port_));
+    server_->Start();
+    ASSERT_OK(ConnectClient());
+  }
+
+  void TearDown() { server_->Stop(); }
+
+  Status ConnectClient() { return FlightClient::Connect("localhost", port_, &client_); }
+
+ protected:
+  int port_;
+  std::unique_ptr<FlightClient> client_;
+  std::unique_ptr<TestServer> server_;
+};
+
+// The server implementation is in test-server.cc; to make changes to the
+// expected results, make edits there
+
+// Descriptors must have the same type; only the field selected by that type
+// (path for PATH, cmd for CMD) is compared, since it is the only one that is
+// serialized for that type.
+void AssertEqual(const FlightDescriptor& expected, const FlightDescriptor& actual) {
+  ASSERT_EQ(expected.type, actual.type);
+  if (expected.type == FlightDescriptor::PATH) {
+    ASSERT_EQ(expected.path, actual.path);
+  } else {
+    ASSERT_EQ(expected.cmd, actual.cmd);
+  }
+}
+
+// Tickets are equal when their ticket payloads match
+void AssertEqual(const Ticket& expected, const Ticket& actual) {
+  ASSERT_EQ(expected.ticket, actual.ticket);
+}
+
+// Locations are compared by host and port
+void AssertEqual(const Location& expected, const Location& actual) {
+  ASSERT_EQ(expected.host, actual.host);
+  ASSERT_EQ(expected.port, actual.port);
+}
+
+// Endpoint lists must match in length and order; each endpoint must have the
+// same ticket and the same locations in the same order
+void AssertEqual(const std::vector<FlightEndpoint>& expected,
+                 const std::vector<FlightEndpoint>& actual) {
+  ASSERT_EQ(expected.size(), actual.size());
+  for (size_t i = 0; i < expected.size(); ++i) {
+    AssertEqual(expected[i].ticket, actual[i].ticket);
+
+    ASSERT_EQ(expected[i].locations.size(), actual[i].locations.size());
+    for (size_t j = 0; j < expected[i].locations.size(); ++j) {
+      AssertEqual(expected[i].locations[j], actual[i].locations[j]);
+    }
+  }
+}
+
+// Compares two FlightInfo objects: the schemas (deserialized with
+// GetSchema), the record and byte totals, the descriptors and the endpoints
+void AssertEqual(const FlightInfo& expected, const FlightInfo& actual) {
+  std::shared_ptr<Schema> ex_schema, actual_schema;
+  ASSERT_OK(expected.GetSchema(&ex_schema));
+  ASSERT_OK(actual.GetSchema(&actual_schema));
+
+  AssertSchemaEqual(*ex_schema, *actual_schema);
+  ASSERT_EQ(expected.total_records(), actual.total_records());
+  ASSERT_EQ(expected.total_bytes(), actual.total_bytes());
+
+  AssertEqual(expected.descriptor(), actual.descriptor());
+  AssertEqual(expected.endpoints(), actual.endpoints());
+}
+
+// Action types are compared by type name and description
+void AssertEqual(const ActionType& expected, const ActionType& actual) {
+  ASSERT_EQ(expected.type, actual.type);
+  ASSERT_EQ(expected.description, actual.description);
+}
+
+// Generic element-wise comparison: the sizes must match, then each pair of
+// elements is compared with the AssertEqual overload for T
+template <typename T>
+void AssertEqual(const std::vector<T>& expected, const std::vector<T>& actual) {
+  ASSERT_EQ(expected.size(), actual.size());
+  for (size_t i = 0; i < expected.size(); ++i) {
+    AssertEqual(expected[i], actual[i]);
+  }
+}
+
+// The listing must yield exactly the example flights, in order, then end
+TEST_F(TestFlightClient, ListFlights) {
+  std::unique_ptr<FlightListing> listing;
+  ASSERT_OK(client_->ListFlights(&listing));
+  ASSERT_TRUE(listing != nullptr);
+
+  std::vector<FlightInfo> flights = ExampleFlightInfo();
+  std::unique_ptr<FlightInfo> info;
+
+  for (const FlightInfo& flight : flights) {
+    ASSERT_OK(listing->Next(&info));
+    // Fail cleanly instead of dereferencing null if the listing ends early
+    ASSERT_TRUE(info != nullptr);
+    AssertEqual(flight, *info);
+  }
+  ASSERT_OK(listing->Next(&info));
+  ASSERT_TRUE(info == nullptr);
+
+  // Calling Next() again after exhaustion must still succeed
+  ASSERT_OK(listing->Next(&info));
+}
+
+// Requesting the {"foo", "bar"} path must return the same FlightInfo as
+// ExampleFlightInfo()[0]
+TEST_F(TestFlightClient, GetFlightInfo) {
+  FlightDescriptor descr{FlightDescriptor::PATH, "", {"foo", "bar"}};
+  std::unique_ptr<FlightInfo> info;
+  ASSERT_OK(client_->GetFlightInfo(descr, &info));
+
+  ASSERT_TRUE(info != nullptr);
+
+  std::vector<FlightInfo> flights = ExampleFlightInfo();
+  AssertEqual(flights[0], *info);
+}
+
+// FlightDescriptor must survive a ToProto/FromProto round trip for both the
+// PATH and the CMD descriptor types
+TEST(TestFlightProtocol, FlightDescriptor) {
+  // Each round trip uses its own protobuf message and output descriptor, so
+  // fields populated by one case cannot leak into the next comparison
+  {
+    FlightDescriptor descr1{FlightDescriptor::PATH, "", {"foo", "bar"}};
+    pb::FlightDescriptor pb_descr;
+    FlightDescriptor descr_test;
+    ASSERT_OK(internal::ToProto(descr1, &pb_descr));
+    ASSERT_OK(internal::FromProto(pb_descr, &descr_test));
+    AssertEqual(descr1, descr_test);
+  }
+  {
+    FlightDescriptor descr2{FlightDescriptor::CMD, "command", {}};
+    pb::FlightDescriptor pb_descr;
+    FlightDescriptor descr_test;
+    ASSERT_OK(internal::ToProto(descr2, &pb_descr));
+    ASSERT_OK(internal::FromProto(pb_descr, &descr_test));
+    AssertEqual(descr2, descr_test);
+  }
+}
+
+// Reading the first example endpoint must produce the expected integer
+// batches, followed by end-of-stream
+TEST_F(TestFlightClient, DoGet) {
+  FlightDescriptor descr{FlightDescriptor::PATH, "", {"foo", "bar"}};
+  std::unique_ptr<FlightInfo> info;
+  ASSERT_OK(client_->GetFlightInfo(descr, &info));
+  ASSERT_TRUE(info != nullptr);
+
+  // Two endpoints in the example FlightInfo (unsigned literal avoids a
+  // signed/unsigned comparison against size())
+  ASSERT_EQ(2U, info->endpoints().size());
+
+  Ticket ticket = info->endpoints()[0].ticket;
+  AssertEqual(Ticket{"ticket-id-1"}, ticket);
+
+  std::shared_ptr<Schema> schema;
+  ASSERT_OK(info->GetSchema(&schema));
+
+  auto expected_schema = ExampleSchema1();
+  AssertSchemaEqual(*expected_schema, *schema);
+
+  std::unique_ptr<RecordBatchReader> stream;
+  ASSERT_OK(client_->DoGet(ticket, schema, &stream));
+
+  BatchVector expected_batches;
+  const int num_batches = 5;
+  ASSERT_OK(SimpleIntegerBatches(num_batches, &expected_batches));
+  std::shared_ptr<RecordBatch> chunk;
+  for (int i = 0; i < num_batches; ++i) {
+    ASSERT_OK(stream->ReadNext(&chunk));
+    // Fail cleanly instead of dereferencing null if the stream ends early
+    ASSERT_TRUE(chunk != nullptr);
+    ASSERT_BATCHES_EQUAL(*expected_batches[i], *chunk);
+  }
+
+  // Stream exhausted
+  ASSERT_OK(stream->ReadNext(&chunk));
+  ASSERT_EQ(nullptr, chunk);
+}
+
+// The server must advertise exactly the action types in ExampleActionTypes()
+TEST_F(TestFlightClient, ListActions) {
+  std::vector<ActionType> actions;
+  ASSERT_OK(client_->ListActions(&actions));
+
+  std::vector<ActionType> expected = ExampleActionTypes();
+  AssertEqual(expected, actions);
+}
+
+// "action1" must echo its body back as three "-partN" results; "action2"
+// must produce no results
+TEST_F(TestFlightClient, DoAction) {
+  Action action;
+  std::unique_ptr<ResultStream> stream;
+  std::unique_ptr<Result> result;
+
+  // Run action1
+  action.type = "action1";
+
+  const std::string action1_value = "action1-content";
+  ASSERT_OK(Buffer::FromString(action1_value, &action.body));
+  ASSERT_OK(client_->DoAction(action, &stream));
+
+  for (int i = 0; i < 3; ++i) {
+    ASSERT_OK(stream->Next(&result));
+    // Fail cleanly instead of dereferencing null if the stream ends early
+    ASSERT_TRUE(result != nullptr);
+    std::string expected = action1_value + "-part" + std::to_string(i);
+    ASSERT_EQ(expected, result->body->ToString());
+  }
+
+  // stream consumed
+  ASSERT_OK(stream->Next(&result));
+  ASSERT_EQ(nullptr, result);
+
+  // Run action2, no results
+  action.type = "action2";
+  ASSERT_OK(client_->DoAction(action, &stream));
+
+  ASSERT_OK(stream->Next(&result));
+  ASSERT_EQ(nullptr, result);
+}
+
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc
new file mode 100644
index 0000000..796e609
--- /dev/null
+++ b/cpp/src/arrow/flight/internal.cc
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/internal.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <grpcpp/grpcpp.h>
+
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace flight {
+namespace internal {
+
+/// Convert a gRPC status into an Arrow Status. UNIMPLEMENTED becomes
+/// NotImplemented; every other error code becomes IOError, with the numeric
+/// gRPC code and the gRPC message included in the text.
+Status FromGrpcStatus(const grpc::Status& grpc_status) {
+  if (grpc_status.ok()) {
+    return Status::OK();
+  }
+
+  // Messages are built with std::string: this file does not include
+  // <sstream>, so it should not depend on std::stringstream
+  if (grpc_status.error_code() == grpc::StatusCode::UNIMPLEMENTED) {
+    return Status::NotImplemented("gRPC returned unimplemented error, with message: " +
+                                  grpc_status.error_message());
+  }
+  return Status::IOError("gRPC failed with error code " +
+                         std::to_string(static_cast<int>(grpc_status.error_code())) +
+                         " and message: " + grpc_status.error_message());
+}
+
+/// Convert an Arrow Status into a gRPC status, carrying over the message:
+/// NotImplemented -> UNIMPLEMENTED, Invalid -> INVALID_ARGUMENT, and any
+/// other error -> UNKNOWN.
+grpc::Status ToGrpcStatus(const Status& arrow_status) {
+  if (arrow_status.ok()) {
+    return grpc::Status::OK;
+  }
+  const grpc::StatusCode code =
+      arrow_status.IsNotImplemented()
+          ? grpc::StatusCode::UNIMPLEMENTED
+          : (arrow_status.IsInvalid() ? grpc::StatusCode::INVALID_ARGUMENT
+                                      : grpc::StatusCode::UNKNOWN);
+  return grpc::Status(code, arrow_status.message());
+}
+
+// ActionType
+
+/// Copy the type name and description out of the protobuf ActionType
+Status FromProto(const pb::ActionType& pb_type, ActionType* type) {
+  ActionType& out = *type;
+  out.type = pb_type.type();
+  out.description = pb_type.description();
+  return Status::OK();
+}
+
+/// Copy the type name and description into the protobuf ActionType
+Status ToProto(const ActionType& type, pb::ActionType* pb_type) {
+  pb::ActionType& out = *pb_type;
+  out.set_type(type.type);
+  out.set_description(type.description);
+  return Status::OK();
+}
+
+// Action
+
+/// Copy the action type and body (into a new Buffer) from the protobuf Action
+Status FromProto(const pb::Action& pb_action, Action* action) {
+  action->type = pb_action.type();
+  return Buffer::FromString(pb_action.body(), &action->body);
+}
+
+/// Copy the action type and body into the protobuf Action. An action whose
+/// body buffer is null is sent with an empty body instead of dereferencing
+/// the null pointer.
+Status ToProto(const Action& action, pb::Action* pb_action) {
+  pb_action->set_type(action.type);
+  if (action.body) {
+    pb_action->set_body(action.body->ToString());
+  } else {
+    pb_action->clear_body();
+  }
+  return Status::OK();
+}
+
+// Result (of an Action)
+
+/// Copy the result body (into a new Buffer) from the protobuf Result
+Status FromProto(const pb::Result& pb_result, Result* result) {
+  // ARROW-3250; can avoid copy. Can also write custom deserializer if it
+  // becomes an issue
+  return Buffer::FromString(pb_result.body(), &result->body);
+}
+
+/// Copy the result body into the protobuf Result. A result whose body buffer
+/// is null is sent with an empty body instead of dereferencing the null
+/// pointer.
+Status ToProto(const Result& result, pb::Result* pb_result) {
+  if (result.body) {
+    pb_result->set_body(result.body->ToString());
+  } else {
+    pb_result->clear_body();
+  }
+  return Status::OK();
+}
+
+// Criteria
+
+/// Criteria conversion is currently a no-op: nothing is copied from the
+/// protobuf message and the output is left as-is.
+Status FromProto(const pb::Criteria& /*pb_criteria*/, Criteria* /*criteria*/) {
+  return Status::OK();
+}
+
+// Location
+
+/// Copy host and port from the protobuf Location
+Status FromProto(const pb::Location& pb_location, Location* location) {
+  location->port = pb_location.port();
+  location->host = pb_location.host();
+  return Status::OK();
+}
+
+/// Copy host and port into the protobuf Location
+void ToProto(const Location& location, pb::Location* pb_location) {
+  pb_location->set_port(location.port);
+  pb_location->set_host(location.host);
+}
+
+// Ticket
+
+/// Copy the ticket payload out of the protobuf Ticket
+Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket) {
+  Ticket& out = *ticket;
+  out.ticket = pb_ticket.ticket();
+  return Status::OK();
+}
+
+/// Copy the ticket payload into the protobuf Ticket
+void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket) {
+  const auto& payload = ticket.ticket;
+  pb_ticket->set_ticket(payload);
+}
+
+// FlightEndpoint
+
+/// Convert the ticket and then each location; stops at the first failure
+Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint) {
+  RETURN_NOT_OK(FromProto(pb_endpoint.ticket(), &endpoint->ticket));
+  const int num_locations = pb_endpoint.location_size();
+  auto& locations = endpoint->locations;
+  locations.resize(num_locations);
+  for (int i = 0; i < num_locations; ++i) {
+    RETURN_NOT_OK(FromProto(pb_endpoint.location(i), &locations[i]));
+  }
+  return Status::OK();
+}
+
+/// Write the ticket, then replace the repeated location field with the
+/// endpoint's locations
+void ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint) {
+  ToProto(endpoint.ticket, pb_endpoint->mutable_ticket());
+  pb_endpoint->clear_location();
+  for (size_t i = 0; i < endpoint.locations.size(); ++i) {
+    ToProto(endpoint.locations[i], pb_endpoint->add_location());
+  }
+}
+
+// FlightDescriptor
+
+/// Convert a protobuf FlightDescriptor. For PATH descriptors the path
+/// components replace any existing ones (exactly path_size() entries); for CMD
+/// descriptors the command bytes are copied. The field unused by the type is
+/// cleared so the result depends only on the input message.
+/// Returns Invalid for any other descriptor type.
+Status FromProto(const pb::FlightDescriptor& pb_descriptor,
+                 FlightDescriptor* descriptor) {
+  if (pb_descriptor.type() == pb::FlightDescriptor::PATH) {
+    descriptor->type = FlightDescriptor::PATH;
+    // Build the path from scratch: resizing first and then appending would
+    // leave path_size() empty entries in front of the real components
+    descriptor->path.clear();
+    descriptor->path.reserve(pb_descriptor.path_size());
+    for (int i = 0; i < pb_descriptor.path_size(); ++i) {
+      descriptor->path.emplace_back(pb_descriptor.path(i));
+    }
+    descriptor->cmd.clear();
+  } else if (pb_descriptor.type() == pb::FlightDescriptor::CMD) {
+    descriptor->type = FlightDescriptor::CMD;
+    descriptor->cmd = pb_descriptor.cmd();
+    descriptor->path.clear();
+  } else {
+    return Status::Invalid("Client sent UNKNOWN descriptor type");
+  }
+  return Status::OK();
+}
+
+/// Convert a FlightDescriptor into its protobuf form: PATH descriptors write
+/// the path components, anything else is written as a CMD descriptor.
+Status ToProto(const FlightDescriptor& descriptor, pb::FlightDescriptor* pb_descriptor) {
+  // Start from an empty message (as the other ToProto functions clear their
+  // repeated fields) so path entries or a cmd already present in
+  // *pb_descriptor are not carried into the result
+  pb_descriptor->Clear();
+  if (descriptor.type == FlightDescriptor::PATH) {
+    pb_descriptor->set_type(pb::FlightDescriptor::PATH);
+    for (const std::string& path : descriptor.path) {
+      pb_descriptor->add_path(path);
+    }
+  } else {
+    pb_descriptor->set_type(pb::FlightDescriptor::CMD);
+    pb_descriptor->set_cmd(descriptor.cmd);
+  }
+  return Status::OK();
+}
+
+// FlightGetInfo
+
+/// Fill FlightInfo::Data from the protobuf message. The schema is kept in its
+/// serialized form; endpoints are converted one by one and the first failure
+/// is returned.
+Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info) {
+  RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &info->descriptor));
+
+  info->schema = pb_info.schema();
+
+  const int num_endpoints = pb_info.endpoint_size();
+  auto& endpoints = info->endpoints;
+  endpoints.resize(num_endpoints);
+  for (int i = 0; i < num_endpoints; ++i) {
+    RETURN_NOT_OK(FromProto(pb_info.endpoint(i), &endpoints[i]));
+  }
+
+  info->total_records = pb_info.total_records();
+  info->total_bytes = pb_info.total_bytes();
+  return Status::OK();
+}
+
+/// Serialize `schema` with ipc::SerializeSchema and copy the bytes into *out
+Status SchemaToString(const Schema& schema, std::string* out) {
+  // TODO(wesm): Do we care about better memory efficiency here?
+  std::shared_ptr<Buffer> buffer;
+  RETURN_NOT_OK(ipc::SerializeSchema(schema, default_memory_pool(), &buffer));
+  out->assign(reinterpret_cast<const char*>(buffer->data()),
+              static_cast<size_t>(buffer->size()));
+  return Status::OK();
+}
+
+/// Populate the protobuf FlightGetInfo from a FlightInfo, replacing any
+/// endpoints already present in the message
+Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info) {
+  // clear any repeated fields
+  pb_info->clear_endpoint();
+
+  pb_info->set_schema(info.serialized_schema());
+
+  // descriptor
+  RETURN_NOT_OK(ToProto(info.descriptor(), pb_info->mutable_flight_descriptor()));
+
+  // endpoints
+  const auto& endpoints = info.endpoints();
+  for (size_t i = 0; i < endpoints.size(); ++i) {
+    ToProto(endpoints[i], pb_info->add_endpoint());
+  }
+
+  pb_info->set_total_records(info.total_records());
+  pb_info->set_total_bytes(info.total_bytes());
+  return Status::OK();
+}
+
+} // namespace internal
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/internal.h b/cpp/src/arrow/flight/internal.h
new file mode 100644
index 0000000..bae1eed
--- /dev/null
+++ b/cpp/src/arrow/flight/internal.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+
+#include "arrow/buffer.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/util/macros.h"
+
+#include "arrow/flight/Flight.grpc.pb.h"
+#include "arrow/flight/Flight.pb.h"
+#include "arrow/flight/types.h"
+
+namespace arrow {
+
+class Schema;
+class Status;
+
+namespace pb = arrow::flight::protocol;
+
+namespace flight {
+
+// Evaluate `s`, which must yield an arrow::Status. If it is not OK, convert
+// it with internal::ToGrpcStatus and return the resulting grpc::Status from
+// the enclosing function (which therefore has to return grpc::Status).
+#define GRPC_RETURN_NOT_OK(s)                             \
+  do {                                                    \
+    ::arrow::Status _s = (s);                             \
+    if (ARROW_PREDICT_FALSE(!_s.ok())) {                  \
+      return ::arrow::flight::internal::ToGrpcStatus(_s); \
+    }                                                     \
+  } while (0)
+
+namespace internal {
+
+/// Serialize a Schema with ipc::SerializeSchema into a std::string
+Status SchemaToString(const Schema& schema, std::string* out);
+
+// Conversions from protobuf messages to Flight types
+Status FromProto(const pb::ActionType& pb_type, ActionType* type);
+Status FromProto(const pb::Action& pb_action, Action* action);
+Status FromProto(const pb::Result& pb_result, Result* result);
+Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria);
+Status FromProto(const pb::Location& pb_location, Location* location);
+Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket);
+Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr);
+Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint);
+Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info);
+
+// Conversions from Flight types to protobuf messages
+Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr);
+Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info);
+Status ToProto(const ActionType& type, pb::ActionType* pb_type);
+Status ToProto(const Action& action, pb::Action* pb_action);
+Status ToProto(const Result& result, pb::Result* pb_result);
+void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket);
+// These two are defined with external linkage in internal.cc; declare them
+// here so the definitions have matching prototypes
+void ToProto(const Location& location, pb::Location* pb_location);
+void ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint);
+
+// Status conversions between gRPC and Arrow
+Status FromGrpcStatus(const grpc::Status& grpc_status);
+
+grpc::Status ToGrpcStatus(const Status& arrow_status);
+
+}  // namespace internal
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/perf-server.cc b/cpp/src/arrow/flight/perf-server.cc
new file mode 100644
index 0000000..ce2ec7b
--- /dev/null
+++ b/cpp/src/arrow/flight/perf-server.cc
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Performance server for benchmarking purposes
+
+#include <signal.h>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <gflags/gflags.h>
+
+#include "arrow/array.h"
+#include "arrow/io/test-common.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
+
+#include "arrow/flight/perf.pb.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/test-util.h"
+
+// Port the perf server listens on; the same port is advertised in the
+// location of every endpoint returned by GetFlightInfo
+DEFINE_int32(port, 31337, "Server port to listen on");
+
+namespace perf = arrow::flight::perf;
+namespace proto = arrow::flight::protocol;
+
+using IpcPayload = arrow::ipc::internal::IpcPayload;
+
+namespace arrow {
+namespace flight {
+
+// Return Status::Invalid from the enclosing function when EXPR (typically a
+// protobuf ParseFromString call) evaluates to false. The argument is
+// parenthesized so that a compound expression is negated as a whole.
+#define CHECK_PARSE(EXPR)                              \
+  do {                                                 \
+    if (!(EXPR)) {                                     \
+      return Status::Invalid("cannot parse protobuf"); \
+    }                                                  \
+  } while (0)
+
+using ArrayVector = std::vector<std::shared_ptr<Array>>;
+
+// Create record batches with a unique "a" column so we can verify on the
+// client side that the results are correct
+//
+// Emits `total_records` rows as repeated batches of arrays[0]->length() rows;
+// the final batch is sliced when the total is not a multiple of that length.
+// When `verify` is set, the first column is rewritten before each batch with
+// consecutive values continuing from `start`.
+class PerfDataStream : public FlightDataStream {
+ public:
+  PerfDataStream(bool verify, const int64_t start, const int64_t total_records,
+                 const std::shared_ptr<Schema>& schema, const ArrayVector& arrays)
+      : start_(start),
+        verify_(verify),
+        batch_length_(arrays[0]->length()),
+        total_records_(total_records),
+        records_sent_(0),
+        schema_(schema),
+        batch_(RecordBatch::Make(schema, batch_length_, arrays)),
+        arrays_(arrays) {}
+
+  Status Next(IpcPayload* payload) override {
+    const int64_t remaining = total_records_ - records_sent_;
+    if (remaining <= 0) {
+      // Signal that iteration is over
+      payload->metadata = nullptr;
+      return Status::OK();
+    }
+
+    if (verify_) {
+      // Overwrite the first array in place; batch_ shares its buffer
+      int64_t* values =
+          reinterpret_cast<int64_t*>(arrays_[0]->data()->buffers[1]->mutable_data());
+      const int64_t first_value = start_ + records_sent_;
+      for (int64_t i = 0; i < batch_length_; ++i) {
+        values[i] = first_value + i;
+      }
+    }
+
+    std::shared_ptr<RecordBatch> to_send = batch_;
+    if (remaining < batch_length_) {
+      // Last partial batch
+      to_send = batch_->Slice(0, remaining);
+      records_sent_ += remaining;
+    } else {
+      records_sent_ += batch_length_;
+    }
+    return ipc::internal::GetRecordBatchPayload(*to_send, default_memory_pool(), payload);
+  }
+
+ private:
+  const int64_t start_;
+  bool verify_;
+  const int64_t batch_length_;
+  const int64_t total_records_;
+  int64_t records_sent_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<RecordBatch> batch_;
+  ArrayVector arrays_;
+};
+
+// Build the data stream for one ticket: four int64 columns of
+// records_per_batch rows each (filled with MakeRandomBuffer), streamed by
+// PerfDataStream until the token's records_per_stream rows have been sent.
+// Returns Invalid when records_per_batch is not positive.
+Status GetPerfBatches(const perf::Token& token, const std::shared_ptr<Schema>& schema,
+                      bool use_verifier, std::unique_ptr<FlightDataStream>* data_stream) {
+  const int32_t length = token.definition().records_per_batch();
+  if (length <= 0) {
+    // PerfDataStream advances by one batch length per Next() call, so a
+    // non-positive length would never reach the end of the stream
+    return Status::Invalid("records_per_batch must be positive");
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer;
+  ArrayVector arrays;
+
+  const int32_t ncolumns = 4;
+  for (int i = 0; i < ncolumns; ++i) {
+    RETURN_NOT_OK(MakeRandomBuffer<int64_t>(length, default_memory_pool(), &buffer));
+    arrays.push_back(std::make_shared<Int64Array>(length, buffer));
+  }
+
+  *data_stream = std::unique_ptr<FlightDataStream>(
+      new PerfDataStream(use_verifier, token.start(),
+                         token.definition().records_per_stream(), schema, arrays));
+  return Status::OK();
+}
+
+// Benchmark server: GetFlightInfo splits the requested workload into
+// stream_count endpoints (one Token-encoded ticket each, all pointing at this
+// server), and DoGet streams the batches for one ticket.
+class FlightPerfServer : public FlightServerBase {
+ public:
+  FlightPerfServer()
+      : location_(Location{"localhost", FLAGS_port}),
+        perf_schema_(schema({field("a", int64()), field("b", int64()),
+                             field("c", int64()), field("d", int64())})) {}
+
+  Status GetFlightInfo(const FlightDescriptor& request,
+                       std::unique_ptr<FlightInfo>* info) override {
+    perf::Perf perf_request;
+    CHECK_PARSE(perf_request.ParseFromString(request.cmd));
+
+    // Every ticket carries the whole workload definition plus its row range
+    perf::Token token;
+    token.mutable_definition()->CopyFrom(perf_request);
+
+    const int64_t per_stream = perf_request.records_per_stream();
+    std::vector<FlightEndpoint> endpoints;
+    for (int64_t stream_index = 0; stream_index < perf_request.stream_count();
+         ++stream_index) {
+      token.set_start(stream_index * per_stream);
+      token.set_end((stream_index + 1) * per_stream);
+
+      Ticket ticket;
+      (void)token.SerializeToString(&ticket.ticket);
+
+      // All endpoints same location for now
+      endpoints.push_back(FlightEndpoint{ticket, {location_}});
+    }
+
+    const uint64_t total_records = perf_request.stream_count() * per_stream;
+
+    FlightInfo::Data data;
+    RETURN_NOT_OK(
+        MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, &data));
+    info->reset(new FlightInfo(data));
+    return Status::OK();
+  }
+
+  Status DoGet(const Ticket& request,
+               std::unique_ptr<FlightDataStream>* data_stream) override {
+    perf::Token token;
+    CHECK_PARSE(token.ParseFromString(request.ticket));
+    const bool use_verifier = false;
+    return GetPerfBatches(token, perf_schema_, use_verifier, data_stream);
+  }
+
+ private:
+  Location location_;
+  std::shared_ptr<Schema> perf_schema_;
+};
+
+} // namespace flight
+} // namespace arrow
+
+// Global server instance so that the signal handler can reach it
+std::unique_ptr<arrow::flight::FlightPerfServer> g_server;
+
+// SIGTERM handler: ask the server to shut down if it has been created.
+// NOTE(review): calling the server's Shutdown() from a signal handler is
+// presumably not async-signal-safe; consider signalling a separate thread.
+void Shutdown(int /*signum*/) {
+  if (!g_server) {
+    return;
+  }
+  g_server->Shutdown();
+}
+
+// Parse flags, install the SIGTERM handler, then run the perf server on
+// FLAGS_port.
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  // SIGTERM shuts down the server
+  signal(SIGTERM, Shutdown);
+
+  g_server = std::unique_ptr<arrow::flight::FlightPerfServer>(
+      new arrow::flight::FlightPerfServer);
+
+  // TODO(wesm): How can we tell if the server failed to start for some reason?
+  g_server->Run(FLAGS_port);
+  return 0;
+}
diff --git a/cpp/src/arrow/flight/perf.proto b/cpp/src/arrow/flight/perf.proto
new file mode 100644
index 0000000..9123baf
--- /dev/null
+++ b/cpp/src/arrow/flight/perf.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package arrow.flight.perf;
+
+// Parameters of a perf benchmark flight. The perf server parses this message
+// from the command bytes of the FlightDescriptor passed to GetFlightInfo.
+message Perf {
+ // NOTE(review): not read by the perf server code visible here; presumably a
+ // serialized schema - confirm
+ bytes schema = 1;
+ // Number of streams (one endpoint per stream) the flight is split into
+ int32 stream_count = 2;
+ // Number of records in each stream
+ int64 records_per_stream = 3;
+ // NOTE(review): presumably the size of each generated record batch; its
+ // consumer is not visible here - confirm
+ int32 records_per_batch = 4;
+}
+
+/*
+ * Payload of ticket: the perf server emits one Token per stream, covering
+ * records [start, end) of the flight.
+ */
+message Token {
+
+ // definition of entire flight.
+ Perf definition = 1;
+
+ // inclusive start
+ int64 start = 2;
+
+ // exclusive end
+ int64 end = 3;
+
+}
\ No newline at end of file
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
new file mode 100644
index 0000000..967a254
--- /dev/null
+++ b/cpp/src/arrow/flight/server.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/server.h"
+
+#include <cstdint>
+#include <iostream>
+#include <limits>
+#include <memory>
+#include <string>
+
+#include "google/protobuf/io/coded_stream.h"
+#include "google/protobuf/io/zero_copy_stream.h"
+#include "google/protobuf/wire_format_lite.h"
+#include "grpcpp/grpcpp.h"
+
+#include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/flight/Flight.grpc.pb.h"
+#include "arrow/flight/Flight.pb.h"
+#include "arrow/flight/internal.h"
+#include "arrow/flight/types.h"
+
+// Shorthand aliases for the generated gRPC service and gRPC / IPC types
+using FlightService = arrow::flight::protocol::FlightService;
+using ServerContext = grpc::ServerContext;
+
+using arrow::ipc::internal::IpcPayload;
+
+template <typename T>
+using ServerWriter = grpc::ServerWriter<T>;
+
+namespace pb = arrow::flight::protocol;
+
+// Largest int32_t value; sizes are cast to int / int32_t for the protobuf
+// writing APIs below, so larger messages are rejected
+constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
+
+namespace grpc {
+
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::CodedOutputStream;
+
+// More efficient writing of FlightData to gRPC output buffer.
+//
+// Implementation of ZeroCopyOutputStream that exposes one fixed-size memory
+// region (the bytes of a grpc_slice) as a single output buffer. Once the
+// region has been handed out in full, Next() returns false: the
+// ZeroCopyOutputStream contract does not allow returning empty buffers
+// forever, and doing so can make CodedOutputStream loop if more bytes are
+// written than were reserved.
+class FixedSizeProtoWriter : public ::google::protobuf::io::ZeroCopyOutputStream {
+ public:
+  // The slice is taken by pointer and its data pointer captured up front.
+  // Copying a grpc_slice by value is not safe here: small slices store their
+  // bytes inline in the grpc_slice struct, so writes through a copy would
+  // never reach the real slice.
+  explicit FixedSizeProtoWriter(grpc_slice* slice)
+      : data_(GRPC_SLICE_START_PTR(*slice)),
+        bytes_written_(0),
+        total_size_(static_cast<int>(GRPC_SLICE_LENGTH(*slice))) {}
+
+  bool Next(void** data, int* size) override {
+    if (bytes_written_ >= total_size_) {
+      // Region exhausted: report end of stream
+      return false;
+    }
+    // Hand out everything that remains
+    *data = data_ + bytes_written_;
+    *size = total_size_ - bytes_written_;
+    bytes_written_ = total_size_;
+    return true;
+  }
+
+  // Give back the unused tail of the last buffer returned by Next()
+  void BackUp(int count) override { bytes_written_ -= count; }
+
+  int64_t ByteCount() const override { return bytes_written_; }
+
+ private:
+  uint8_t* data_;
+  int bytes_written_;
+  int total_size_;
+};
+
+// Write FlightData to a grpc::ByteBuffer without extra copying
+template <>
+class SerializationTraits<IpcPayload> {
+ public:
+  // Reading an IpcPayload back from the wire is not supported
+  static grpc::Status Deserialize(ByteBuffer* buffer, IpcPayload* out) {
+    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
+                        "IpcPayload deserialization not implemented");
+  }
+
+  // Serialize msg into one newly allocated slice in the wire format of a
+  // FlightData message: the IPC metadata as the data_header field, then all
+  // non-null body buffers concatenated (each padded to a multiple of 8
+  // bytes) as the data_body field. Sets *own_buffer to true. Returns
+  // INVALID_ARGUMENT if the encoded message would exceed 2GB.
+  static grpc::Status Serialize(const IpcPayload& msg, ByteBuffer* out,
+                                bool* own_buffer) {
+    size_t total_size = 0;
+
+    DCHECK_LT(msg.metadata->size(), kInt32Max);
+    const int32_t metadata_size = static_cast<int32_t>(msg.metadata->size());
+
+    // 1 byte for metadata tag
+    total_size += 1 + WireFormatLite::LengthDelimitedSize(metadata_size);
+
+    int64_t body_size = 0;
+    for (const auto& buffer : msg.body_buffers) {
+      // Null buffers are skipped defensively; they contribute no bytes
+      if (!buffer) {
+        continue;
+      }
+      body_size += buffer->size();
+
+      const int64_t remainder = buffer->size() % 8;
+      if (remainder) {
+        body_size += 8 - remainder;
+      }
+    }
+
+    // 2 bytes for body tag
+    total_size += 2 + WireFormatLite::LengthDelimitedSize(static_cast<size_t>(body_size));
+
+    // TODO(wesm): messages over 2GB unlikely to be yet supported
+    if (total_size > kInt32Max) {
+      return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                          "Cannot send record batches exceeding 2GB yet");
+    }
+
+    // Allocate slice, assign to output buffer
+    grpc::Slice slice(total_size);
+
+    // NOTE: relies on grpc::Slice holding a single grpc_slice data member
+    FixedSizeProtoWriter writer(reinterpret_cast<grpc_slice*>(&slice));
+    CodedOutputStream pb_stream(&writer);
+
+    // Write header
+    WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
+                             WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream);
+    pb_stream.WriteVarint32(metadata_size);
+    pb_stream.WriteRawMaybeAliased(msg.metadata->data(),
+                                   static_cast<int>(msg.metadata->size()));
+
+    // Write body
+    WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
+                             WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream);
+    pb_stream.WriteVarint32(static_cast<uint32_t>(body_size));
+
+    static constexpr uint8_t kPaddingBytes[8] = {0};
+
+    for (const auto& buffer : msg.body_buffers) {
+      // Must skip exactly the buffers skipped when computing body_size
+      if (!buffer) {
+        continue;
+      }
+      pb_stream.WriteRawMaybeAliased(buffer->data(), static_cast<int>(buffer->size()));
+
+      // Write padding if not multiple of 8
+      const int remainder = static_cast<int>(buffer->size() % 8);
+      if (remainder) {
+        pb_stream.WriteRawMaybeAliased(kPaddingBytes, 8 - remainder);
+      }
+    }
+
+    DCHECK_EQ(static_cast<int>(total_size), pb_stream.ByteCount());
+
+    // Hand off the slice to the returned ByteBuffer
+    grpc::ByteBuffer tmp(&slice, 1);
+    out->Swap(&tmp);
+    *own_buffer = true;
+    return grpc::Status::OK;
+  }
+};
+
+}  // namespace grpc
+
+namespace arrow {
+namespace flight {
+
+#define CHECK_ARG_NOT_NULL(VAL, MESSAGE) \
+ if (VAL == nullptr) { \
+ return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, MESSAGE); \
+ }
+
+// This class glues an implementation of FlightServerBase together with the
+// gRPC service definition, so the latter is not exposed in the public API.
+// Each handler converts the protobuf request into the corresponding Flight
+// type, delegates to the matching FlightServerBase virtual method, and
+// converts the result back to protobuf. A FlightServerBase method that
+// returns OK without filling in its output is reported to the client as
+// INTERNAL instead of crashing the server.
+class FlightServiceImpl : public FlightService::Service {
+ public:
+  // server is not owned and must outlive this object
+  explicit FlightServiceImpl(FlightServerBase* server) : server_(server) {}
+
+  // Drain an iterator whose Next() yields null at the end, writing each
+  // converted value to the stream. Stops early if the client closes the
+  // stream.
+  template <typename UserType, typename Iterator, typename ProtoType>
+  grpc::Status WriteStream(Iterator* iterator, ServerWriter<ProtoType>* writer) {
+    std::unique_ptr<UserType> value;
+    while (true) {
+      GRPC_RETURN_NOT_OK(iterator->Next(&value));
+      if (!value) {
+        break;
+      }
+      // Fresh message per value so nothing (e.g. repeated fields) can carry
+      // over from the previous one
+      ProtoType pb_value;
+      GRPC_RETURN_NOT_OK(internal::ToProto(*value, &pb_value));
+
+      // Blocking write; returns false if the stream is closed
+      if (!writer->Write(pb_value)) {
+        break;
+      }
+    }
+    return grpc::Status::OK;
+  }
+
+  // Write every element of values to the stream, stopping early if the
+  // client closes the stream.
+  template <typename UserType, typename ProtoType>
+  grpc::Status WriteStream(const std::vector<UserType>& values,
+                           ServerWriter<ProtoType>* writer) {
+    for (const UserType& value : values) {
+      // Fresh message per value, as above
+      ProtoType pb_value;
+      GRPC_RETURN_NOT_OK(internal::ToProto(value, &pb_value));
+      // Blocking write; returns false if the stream is closed
+      if (!writer->Write(pb_value)) {
+        break;
+      }
+    }
+    return grpc::Status::OK;
+  }
+
+  grpc::Status ListFlights(ServerContext* context, const pb::Criteria* request,
+                           ServerWriter<pb::FlightGetInfo>* writer) override {
+    // A missing request is treated as empty criteria
+    Criteria criteria;
+    if (request) {
+      GRPC_RETURN_NOT_OK(internal::FromProto(*request, &criteria));
+    }
+
+    // Retrieve the listing from the implementation
+    std::unique_ptr<FlightListing> listing;
+    GRPC_RETURN_NOT_OK(server_->ListFlights(&criteria, &listing));
+    if (!listing) {
+      return grpc::Status(grpc::StatusCode::INTERNAL,
+                          "ListFlights returned OK but no listing");
+    }
+    return WriteStream<FlightInfo>(listing.get(), writer);
+  }
+
+  grpc::Status GetFlightInfo(ServerContext* context, const pb::FlightDescriptor* request,
+                             pb::FlightGetInfo* response) override {
+    CHECK_ARG_NOT_NULL(request, "FlightDescriptor cannot be null");
+
+    FlightDescriptor descr;
+    GRPC_RETURN_NOT_OK(internal::FromProto(*request, &descr));
+
+    std::unique_ptr<FlightInfo> info;
+    GRPC_RETURN_NOT_OK(server_->GetFlightInfo(descr, &info));
+    if (!info) {
+      return grpc::Status(grpc::StatusCode::INTERNAL,
+                          "GetFlightInfo returned OK but no FlightInfo");
+    }
+
+    GRPC_RETURN_NOT_OK(internal::ToProto(*info, response));
+    return grpc::Status::OK;
+  }
+
+  grpc::Status DoGet(ServerContext* context, const pb::Ticket* request,
+                     ServerWriter<pb::FlightData>* writer) override {
+    CHECK_ARG_NOT_NULL(request, "ticket cannot be null");
+
+    Ticket ticket;
+    GRPC_RETURN_NOT_OK(internal::FromProto(*request, &ticket));
+
+    std::unique_ptr<FlightDataStream> data_stream;
+    GRPC_RETURN_NOT_OK(server_->DoGet(ticket, &data_stream));
+    if (!data_stream) {
+      return grpc::Status(grpc::StatusCode::INTERNAL,
+                          "DoGet returned OK but no data stream");
+    }
+
+    // Requires ServerWriter customization in grpc_customizations.h: the writer
+    // is reinterpreted so payloads are serialized through
+    // SerializationTraits<IpcPayload> rather than as pb::FlightData
+    auto custom_writer = reinterpret_cast<ServerWriter<IpcPayload>*>(writer);
+
+    while (true) {
+      IpcPayload payload;
+      GRPC_RETURN_NOT_OK(data_stream->Next(&payload));
+      // Null metadata marks the end of the stream; Write returns false if the
+      // connection was terminated for some other reason
+      if (payload.metadata == nullptr ||
+          !custom_writer->Write(payload, grpc::WriteOptions())) {
+        break;
+      }
+    }
+    return grpc::Status::OK;
+  }
+
+  // Not yet supported
+  grpc::Status DoPut(ServerContext* context, grpc::ServerReader<pb::FlightData>* reader,
+                     pb::PutResult* response) override {
+    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "");
+  }
+
+  grpc::Status ListActions(ServerContext* context, const pb::Empty* request,
+                           ServerWriter<pb::ActionType>* writer) override {
+    // Retrieve the listing from the implementation
+    std::vector<ActionType> types;
+    GRPC_RETURN_NOT_OK(server_->ListActions(&types));
+    return WriteStream<ActionType>(types, writer);
+  }
+
+  grpc::Status DoAction(ServerContext* context, const pb::Action* request,
+                        ServerWriter<pb::Result>* writer) override {
+    CHECK_ARG_NOT_NULL(request, "Action cannot be null");
+    Action action;
+    GRPC_RETURN_NOT_OK(internal::FromProto(*request, &action));
+
+    std::unique_ptr<ResultStream> results;
+    GRPC_RETURN_NOT_OK(server_->DoAction(action, &results));
+    if (!results) {
+      return grpc::Status(grpc::StatusCode::INTERNAL,
+                          "DoAction returned OK but no result stream");
+    }
+
+    std::unique_ptr<Result> result;
+    while (true) {
+      GRPC_RETURN_NOT_OK(results->Next(&result));
+      if (!result) {
+        // No more results
+        break;
+      }
+      pb::Result pb_result;
+      GRPC_RETURN_NOT_OK(internal::ToProto(*result, &pb_result));
+      if (!writer->Write(pb_result)) {
+        // Stream may be closed
+        break;
+      }
+    }
+    return grpc::Status::OK;
+  }
+
+ private:
+  FlightServerBase* server_;
+};
+
+// Private state of FlightServerBase: the gRPC server created by Run()
+struct FlightServerBase::FlightServerBaseImpl {
+  std::unique_ptr<grpc::Server> server;
+};
+
+FlightServerBase::FlightServerBase() : impl_(new FlightServerBaseImpl) {}
+
+FlightServerBase::~FlightServerBase() = default;
+
+// Build and start an insecure gRPC server on localhost:<port> serving this
+// FlightServerBase, then block until it is shut down. If the server cannot
+// be started (BuildAndStart returns null, e.g. the port cannot be bound), an
+// error is printed to stderr and Run returns instead of dereferencing null.
+void FlightServerBase::Run(int port) {
+  std::string address = "localhost:" + std::to_string(port);
+
+  FlightServiceImpl service(this);
+  grpc::ServerBuilder builder;
+  builder.AddListeningPort(address, grpc::InsecureServerCredentials());
+  builder.RegisterService(&service);
+
+  impl_->server = builder.BuildAndStart();
+  if (impl_->server == nullptr) {
+    std::cerr << "Failed to start server on " << address << std::endl;
+    return;
+  }
+  std::cout << "Server listening on " << address << std::endl;
+  impl_->server->Wait();
+}
+
+// Shut down the running server. A no-op when Run() has not created a server
+// (not yet called, or it failed to start); previously this dereferenced a
+// null pointer in release builds.
+void FlightServerBase::Shutdown() {
+  if (impl_->server) {
+    impl_->server->Shutdown();
+  }
+}
+
+// Default implementations of the overridable handlers. Each returns
+// NotImplemented so that a server which does not override a method reports a
+// not-implemented result to the client.
+
+Status FlightServerBase::ListFlights(const Criteria* criteria,
+                                     std::unique_ptr<FlightListing>* listings) {
+  return Status::NotImplemented("NYI");
+}
+
+Status FlightServerBase::GetFlightInfo(const FlightDescriptor& request,
+                                       std::unique_ptr<FlightInfo>* info) {
+  return Status::NotImplemented("NYI");
+}
+
+Status FlightServerBase::DoGet(const Ticket& request,
+                               std::unique_ptr<FlightDataStream>* data_stream) {
+  return Status::NotImplemented("NYI");
+}
+
+Status FlightServerBase::DoAction(const Action& action,
+                                  std::unique_ptr<ResultStream>* result) {
+  return Status::NotImplemented("NYI");
+}
+
+Status FlightServerBase::ListActions(std::vector<ActionType>* actions) {
+  return Status::NotImplemented("NYI");
+}
+
+// ----------------------------------------------------------------------
+// Implement RecordBatchStream
+
+RecordBatchStream::RecordBatchStream(const std::shared_ptr<RecordBatchReader>& reader)
+    : pool_(default_memory_pool()), reader_(reader) {}
+
+// Read the next record batch and turn it into an IPC payload; once the reader
+// is exhausted, the payload's metadata is set to null to end the stream.
+Status RecordBatchStream::Next(IpcPayload* payload) {
+  std::shared_ptr<RecordBatch> next_batch;
+  RETURN_NOT_OK(reader_->ReadNext(&next_batch));
+
+  if (next_batch) {
+    return ipc::internal::GetRecordBatchPayload(*next_batch, pool_, payload);
+  }
+
+  // Signal that iteration is over
+  payload->metadata = nullptr;
+  return Status::OK();
+}
+
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/server.h b/cpp/src/arrow/flight/server.h
new file mode 100644
index 0000000..89154ac
--- /dev/null
+++ b/cpp/src/arrow/flight/server.h
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Interfaces to use for defining Flight RPC servers. API should be considered
+// experimental for now
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/util/visibility.h"
+
+#include "arrow/flight/types.h"
+
+namespace arrow {
+
+class MemoryPool;
+class RecordBatchReader;
+class Status;
+
+namespace ipc {
+namespace internal {
+
+struct IpcPayload;
+
+} // namespace internal
+} // namespace ipc
+
+namespace io {
+
+class OutputStream;
+
+} // namespace io
+
+namespace flight {
+
+/// \brief Interface that produces a sequence of IPC payloads to be sent in
+/// FlightData protobuf messages
+class ARROW_EXPORT FlightDataStream {
+ public:
+ virtual ~FlightDataStream() = default;
+
+ /// \brief Produce the next payload of the stream
+ ///
+ /// When the stream is exhausted, the payload is returned with null
+ /// metadata, which tells the caller to stop.
+ /// \param[out] payload receives the next IPC payload
+ /// \return Status
+ virtual Status Next(ipc::internal::IpcPayload* payload) = 0;
+};
+
+/// \brief A basic implementation of FlightDataStream that will provide
+/// a sequence of FlightData messages to be written to a gRPC stream,
+/// one per record batch read from a RecordBatchReader
+class ARROW_EXPORT RecordBatchStream : public FlightDataStream {
+ public:
+ /// \param[in] reader produces a sequence of record batches
+ explicit RecordBatchStream(const std::shared_ptr<RecordBatchReader>& reader);
+
+ /// \brief Read the next batch and convert it to an IPC payload; sets null
+ /// metadata once the reader is exhausted
+ Status Next(ipc::internal::IpcPayload* payload) override;
+
+ private:
+ // Memory pool used when building payloads (the default memory pool)
+ MemoryPool* pool_;
+ std::shared_ptr<RecordBatchReader> reader_;
+};
+
+/// \brief Skeleton RPC server implementation which can be used to create
+/// custom servers by overriding its virtual methods
+class ARROW_EXPORT FlightServerBase {
+ public:
+ FlightServerBase();
+ virtual ~FlightServerBase();
+
+ /// \brief Run an insecure server on localhost at the indicated port. Block
+ /// until server is shut down or otherwise terminates
+ /// \param[in] port the TCP port to listen on
+ void Run(int port);
+
+ /// \brief Shut down the server. Intended to be called from another thread
+ /// (or a signal handler) while Run blocks
+ ///
+ /// NOTE(review): grpc::Server::Shutdown is not documented as
+ /// async-signal-safe; calling this from a signal handler may be unsafe.
+ ///
+ /// TODO(wesm): Shutdown with deadline
+ void Shutdown();
+
+ // Implement these methods to create your own server. The default
+ // implementations will return a not-implemented result to the client
+
+ /// \brief Retrieve a list of available flights given an optional opaque
+ /// criteria
+ /// \param[in] criteria may be null
+ /// \param[out] listings the returned listings iterator
+ /// \return Status
+ virtual Status ListFlights(const Criteria* criteria,
+ std::unique_ptr<FlightListing>* listings);
+
+ /// \brief Retrieve the schema and an access plan for the indicated
+ /// descriptor
+ /// \param[in] request the descriptor of the requested flight
+ /// \param[out] info the returned flight info provider
+ /// \return Status
+ virtual Status GetFlightInfo(const FlightDescriptor& request,
+ std::unique_ptr<FlightInfo>* info);
+
+ /// \brief Get a stream of IPC payloads to put on the wire
+ /// \param[in] request an opaque ticket
+ /// \param[out] stream the returned stream provider
+ /// \return Status
+ virtual Status DoGet(const Ticket& request, std::unique_ptr<FlightDataStream>* stream);
+
+ // virtual Status DoPut(std::unique_ptr<FlightMessageReader>* reader) = 0;
+
+ /// \brief Execute an action, return stream of zero or more results
+ /// \param[in] action the action to execute, with type and body
+ /// \param[out] result the result iterator
+ /// \return Status
+ virtual Status DoAction(const Action& action, std::unique_ptr<ResultStream>* result);
+
+ /// \brief Retrieve the list of available actions
+ /// \param[out] actions a vector of available action types
+ /// \return Status
+ virtual Status ListActions(std::vector<ActionType>* actions);
+
+ private:
+ struct FlightServerBaseImpl;
+ std::unique_ptr<FlightServerBaseImpl> impl_;
+};
+
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/test-server.cc b/cpp/src/arrow/flight/test-server.cc
new file mode 100644
index 0000000..14b03d9
--- /dev/null
+++ b/cpp/src/arrow/flight/test-server.cc
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Example server implementation to use for unit testing and benchmarking
+// purposes
+
+#include <signal.h>
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <gflags/gflags.h>
+
+#include "arrow/io/test-common.h"
+#include "arrow/record_batch.h"
+
+#include "arrow/flight/server.h"
+#include "arrow/flight/test-util.h"
+
+DEFINE_int32(port, 31337, "Server port to listen on");
+
+namespace arrow {
+namespace flight {
+
+// Resolve a ticket to a reader over example record batches. Only
+// "ticket-id-1" has data behind it in this test server.
+Status GetBatchForFlight(const Ticket& ticket, std::shared_ptr<RecordBatchReader>* out) {
+  if (ticket.ticket != "ticket-id-1") {
+    return Status::NotImplemented("no stream implemented for this ticket");
+  }
+  BatchVector example_batches;
+  RETURN_NOT_OK(SimpleIntegerBatches(5, &example_batches));
+  auto first_schema = example_batches[0]->schema();
+  *out = std::make_shared<BatchIterator>(first_schema, example_batches);
+  return Status::OK();
+}
+
+// Example server used by the unit tests: serves the flights from
+// ExampleFlightInfo(), record batches for "ticket-id-1", and the actions
+// "action1" and "action2".
+class FlightTestServer : public FlightServerBase {
+ public:
+  Status ListFlights(const Criteria* criteria,
+                     std::unique_ptr<FlightListing>* listings) override {
+    std::vector<FlightInfo> flights = ExampleFlightInfo();
+    *listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(std::move(flights)));
+    return Status::OK();
+  }
+
+  // PATH descriptors map to the first example flight, all others to the second
+  Status GetFlightInfo(const FlightDescriptor& request,
+                       std::unique_ptr<FlightInfo>* info) override {
+    std::vector<FlightInfo> flights = ExampleFlightInfo();
+
+    const FlightInfo* value;
+
+    // We only have one kind of flight for each descriptor type
+    if (request.type == FlightDescriptor::PATH) {
+      value = &flights[0];
+    } else {
+      value = &flights[1];
+    }
+
+    *info = std::unique_ptr<FlightInfo>(new FlightInfo(*value));
+    return Status::OK();
+  }
+
+  Status DoGet(const Ticket& request,
+               std::unique_ptr<FlightDataStream>* data_stream) override {
+    std::shared_ptr<RecordBatchReader> batch_reader;
+    RETURN_NOT_OK(GetBatchForFlight(request, &batch_reader));
+
+    *data_stream = std::unique_ptr<FlightDataStream>(new RecordBatchStream(batch_reader));
+    return Status::OK();
+  }
+
+  Status DoAction(const Action& action, std::unique_ptr<ResultStream>* out) override {
+    if (action.type == "action1") {
+      return RunAction1(action, out);
+    } else if (action.type == "action2") {
+      return RunAction2(out);
+    } else {
+      return Status::NotImplemented(action.type);
+    }
+  }
+
+  Status ListActions(std::vector<ActionType>* out) override {
+    *out = ExampleActionTypes();
+    return Status::OK();
+  }
+
+ private:
+  // "action1": three results "<body>-part0" .. "<body>-part2"
+  Status RunAction1(const Action& action, std::unique_ptr<ResultStream>* out) {
+    // Tolerate an action without a body instead of dereferencing null
+    const std::string body = action.body ? action.body->ToString() : std::string();
+    std::vector<Result> results;
+    for (int i = 0; i < 3; ++i) {
+      Result result;
+      std::string value = body + "-part" + std::to_string(i);
+      RETURN_NOT_OK(Buffer::FromString(value, &result.body));
+      results.push_back(std::move(result));
+    }
+    *out = std::unique_ptr<ResultStream>(new SimpleResultStream(std::move(results)));
+    return Status::OK();
+  }
+
+  // "action2": an empty result stream
+  Status RunAction2(std::unique_ptr<ResultStream>* out) {
+    *out = std::unique_ptr<ResultStream>(new SimpleResultStream({}));
+    return Status::OK();
+  }
+};
+
+} // namespace flight
+} // namespace arrow
+
+// Global server instance, reachable from the SIGTERM handler below
+std::unique_ptr<arrow::flight::FlightTestServer> g_server;
+
+// SIGTERM handler: ask the server to shut down if it has been created.
+// NOTE(review): grpc::Server::Shutdown is not documented as
+// async-signal-safe, so calling it from a signal handler may be unsafe;
+// consider a self-pipe or a dedicated watcher thread. Confirm.
+void Shutdown(int signal) {
+ if (g_server != nullptr) {
+ g_server->Shutdown();
+ }
+}
+
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  // SIGTERM shuts down the server; the handler does nothing until g_server is set
+  signal(SIGTERM, Shutdown);
+
+  g_server = std::unique_ptr<arrow::flight::FlightTestServer>(
+      new arrow::flight::FlightTestServer);
+
+  // Run blocks until the server is shut down.
+  // TODO(wesm): How can we tell if the server failed to start for some reason?
+  g_server->Run(FLAGS_port);
+  return 0;
+}
diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.h
new file mode 100644
index 0000000..4a12997
--- /dev/null
+++ b/cpp/src/arrow/flight/test-util.h
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <boost/process.hpp>
+
+#include "arrow/ipc/test-common.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+
+#include "arrow/flight/api.h"
+#include "arrow/flight/internal.h"
+
+namespace bp = boost::process;
+
+namespace arrow {
+namespace flight {
+
+// ----------------------------------------------------------------------
+// Fixture to use for running test servers
+
+// Launches a server executable (looked up on PATH) as a child process passing
+// "-port <port>", and stops it by sending SIGTERM.
+struct TestServer {
+  explicit TestServer(const std::string& executable_name, int port)
+      : executable_name_(executable_name), port_(port) {}
+
+  void Start() {
+    std::string port_arg = std::to_string(port_);
+    server_process_.reset(
+        new bp::child(bp::search_path(executable_name_), "-port", port_arg));
+    std::cout << "Server running with pid " << server_process_->id() << std::endl;
+  }
+
+  // Send SIGTERM, wait for the child to exit and return its exit code
+  int Stop() {
+    const auto pid = server_process_->id();
+    kill(pid, SIGTERM);
+    server_process_->wait();
+    return server_process_->exit_code();
+  }
+
+  bool IsRunning() { return server_process_->running(); }
+
+  int port() const { return port_; }
+
+ private:
+  std::string executable_name_;
+  int port_;
+  std::unique_ptr<bp::child> server_process_;
+};
+
+// ----------------------------------------------------------------------
+// A RecordBatchReader for serving a sequence of in-memory record batches
+
+// RecordBatchReader over an in-memory vector of batches: yields each batch
+// once, in order, then null.
+class BatchIterator : public RecordBatchReader {
+ public:
+  BatchIterator(const std::shared_ptr<Schema>& schema,
+                const std::vector<std::shared_ptr<RecordBatch>>& batches)
+      : schema_(schema), batches_(batches), position_(0) {}
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    if (position_ < batches_.size()) {
+      *out = batches_[position_];
+      ++position_;
+    } else {
+      // Exhausted: a null batch signals end of stream
+      out->reset();
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Schema> schema_;
+  std::vector<std::shared_ptr<RecordBatch>> batches_;
+  // Index of the next batch to hand out
+  size_t position_;
+};
+
+// ----------------------------------------------------------------------
+// Example data for test-server and unit tests
+
+using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
+
+// Example schema with two int32 fields, "f0" and "f1".
+// Declared inline because this header defines functions and may be included
+// by several translation units linked into one binary.
+inline std::shared_ptr<Schema> ExampleSchema1() {
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", int32());
+  return ::arrow::schema({f0, f1});
+}
+
+// Example schema with a utf8 field "f0" and a binary field "f1".
+inline std::shared_ptr<Schema> ExampleSchema2() {
+  auto f0 = field("f0", utf8());
+  auto f1 = field("f1", binary());
+  return ::arrow::schema({f0, f1});
+}
+
+// Fill *out with the given descriptor, endpoints and totals, plus the
+// serialized form of schema. Declared inline because it is defined in a
+// header that may be included by several translation units.
+inline Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
+                             const std::vector<FlightEndpoint>& endpoints,
+                             uint64_t total_records, uint64_t total_bytes,
+                             FlightInfo::Data* out) {
+  out->descriptor = descriptor;
+  out->endpoints = endpoints;
+  out->total_records = total_records;
+  out->total_bytes = total_bytes;
+  return internal::SchemaToString(schema, &out->schema);
+}
+
+// Two example flights: a PATH-described flight with two endpoints and
+// ExampleSchema1, and a CMD-described flight with one endpoint and
+// ExampleSchema2. Declared inline because it is defined in a header.
+inline std::vector<FlightInfo> ExampleFlightInfo() {
+  FlightEndpoint endpoint1({{"ticket-id-1"}, {{"foo1.bar.com", 92385}}});
+  FlightEndpoint endpoint2({{"ticket-id-2"}, {{"foo2.bar.com", 92385}}});
+  FlightEndpoint endpoint3({{"ticket-id-3"}, {{"foo3.bar.com", 92385}}});
+  FlightDescriptor descr1{FlightDescriptor::PATH, "", {"foo", "bar"}};
+  FlightDescriptor descr2{FlightDescriptor::CMD, "my_command", {}};
+
+  auto schema1 = ExampleSchema1();
+  auto schema2 = ExampleSchema2();
+
+  FlightInfo::Data flight1, flight2;
+  EXPECT_OK(
+      MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, &flight1));
+  EXPECT_OK(MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, &flight2));
+  return {FlightInfo(flight1), FlightInfo(flight2)};
+}
+
+// Append num_batches integer record batches to *out, the i-th having 10 + i
+// rows. Declared inline because it is defined in a header.
+inline Status SimpleIntegerBatches(const int num_batches, BatchVector* out) {
+  std::shared_ptr<RecordBatch> batch;
+  for (int i = 0; i < num_batches; ++i) {
+    // Make all different sizes, use different random seed
+    RETURN_NOT_OK(ipc::MakeIntBatchSized(10 + i, &batch, i));
+    out->push_back(batch);
+  }
+  return Status::OK();
+}
+
+// Example action types advertised by the test server. Declared inline because
+// it is defined in a header.
+inline std::vector<ActionType> ExampleActionTypes() {
+  return {{"drop", "drop a dataset"}, {"cache", "cache a dataset"}};
+}
+
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc
new file mode 100644
index 0000000..8c7588d
--- /dev/null
+++ b/cpp/src/arrow/flight/types.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/types.h"
+
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace flight {
+
+// Return the flight's schema, deserializing it from data_.schema on the first
+// call and returning the cached result afterwards. On a read error, *out is
+// left untouched and the cache stays empty.
+Status FlightInfo::GetSchema(std::shared_ptr<Schema>* out) const {
+  if (!reconstructed_schema_) {
+    /// XXX(wesm): arrow::ipc::ReadSchema in its current form will not suffice
+    /// for reading schemas with dictionaries. See ARROW-3144
+    io::BufferReader schema_reader(reinterpret_cast<const uint8_t*>(data_.schema.c_str()),
+                                   static_cast<int64_t>(data_.schema.size()));
+    RETURN_NOT_OK(ipc::ReadSchema(&schema_reader, &schema_));
+    reconstructed_schema_ = true;
+  }
+  *out = schema_;
+  return Status::OK();
+}
+
+SimpleFlightListing::SimpleFlightListing(const std::vector<FlightInfo>& flights)
+    : position_(0), flights_(flights) {}
+
+SimpleFlightListing::SimpleFlightListing(std::vector<FlightInfo>&& flights)
+    : position_(0), flights_(std::move(flights)) {}
+
+// Hand out each stored FlightInfo once (moving it out of the listing); yields
+// null once all have been returned.
+Status SimpleFlightListing::Next(std::unique_ptr<FlightInfo>* info) {
+  if (position_ < static_cast<int>(flights_.size())) {
+    FlightInfo& next = flights_[position_];
+    ++position_;
+    info->reset(new FlightInfo(std::move(next)));
+  } else {
+    info->reset();
+  }
+  return Status::OK();
+}
+
+SimpleResultStream::SimpleResultStream(std::vector<Result>&& results)
+    : results_(std::move(results)), position_(0) {}
+
+// Hand out each stored Result once (moving it out of the stream); yields null
+// once all have been returned.
+Status SimpleResultStream::Next(std::unique_ptr<Result>* result) {
+  if (position_ < results_.size()) {
+    Result& next = results_[position_];
+    ++position_;
+    result->reset(new Result(std::move(next)));
+  } else {
+    result->reset();
+  }
+  return Status::OK();
+}
+
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
new file mode 100644
index 0000000..0362105
--- /dev/null
+++ b/cpp/src/arrow/flight/types.h
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Data structures for Flight RPC. The API should be considered experimental for now
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Buffer;
+class Schema;
+class Status;
+
+namespace flight {
+
+/// \brief A type of action that can be performed with the DoAction RPC
+struct ActionType {
+ /// Name of the action type
+ std::string type;
+
+ /// Opaque description of the action
+ std::string description;
+};
+
+/// \brief Opaque selection criteria for the ListFlights RPC
+struct Criteria {
+ /// Opaque criteria expression, dependent on server implementation
+ std::string expression;
+};
+
+/// \brief An action to perform with the DoAction RPC
+struct Action {
+ /// The action type (name of the action to perform)
+ std::string type;
+
+ /// The action content as a Buffer
+ std::shared_ptr<Buffer> body;
+};
+
+/// \brief Opaque result returned after executing an action
+struct Result {
+ /// Opaque result payload
+ std::shared_ptr<Buffer> body;
+};
+
+/// \brief A message received after completing a DoPut stream. Currently it
+/// carries no fields
+struct PutResult {};
+
+/// \brief A request to retrieve or generate a dataset
+///
+/// `cmd` is meaningful only when `type` is CMD and `path` only when `type`
+/// is PATH.
+struct FlightDescriptor {
+ enum DescriptorType {
+ UNKNOWN = 0, ///< Unused
+ PATH = 1, ///< Named path identifying a dataset
+ CMD = 2 ///< Opaque command to generate a dataset
+ };
+
+ /// The descriptor type.
+ /// NOTE(review): there is no default initializer, so a default-constructed
+ /// FlightDescriptor holds an indeterminate type; always set it explicitly
+ DescriptorType type;
+
+ /// Opaque value used to express a command. Should only be defined when type
+ /// is CMD
+ std::string cmd;
+
+ /// List of strings identifying a particular dataset. Should only be defined
+ /// when type is PATH
+ std::vector<std::string> path;
+};
+
+/// \brief Data structure providing an opaque identifier or credential to use
+/// when requesting a data stream with the DoGet RPC
+struct Ticket {
+ /// Opaque ticket contents
+ std::string ticket;
+};
+
+/// \brief A host location (hostname and port)
+struct Location {
+ /// Host name
+ std::string host;
+ /// Port number. NOTE(review): not default-initialized; set it explicitly
+ int32_t port;
+};
+
+/// \brief A flight ticket and list of locations where the ticket can be
+/// redeemed
+struct FlightEndpoint {
+ /// Opaque ticket identifier; use with the DoGet RPC
+ Ticket ticket;
+
+ /// List of locations where ticket can be redeemed. If the list is empty, the
+ /// ticket can only be redeemed on the current service where the ticket was
+ /// generated
+ std::vector<Location> locations;
+};
+
+/// \brief The access coordinates for retrieval of a dataset, returned by
+/// GetFlightInfo
+///
+/// The class is marked ARROW_EXPORT like the other classes in this header.
+/// GetSchema() is defined out of line in types.cc, so without the export
+/// annotation that member may not be visible to users of a shared Arrow
+/// library (e.g. a Windows DLL or a build using hidden symbol visibility).
+class ARROW_EXPORT FlightInfo {
+ public:
+  /// \brief Plain data from which a FlightInfo is built
+  struct Data {
+    /// Arrow schema of the dataset, in IPC-serialized form
+    std::string schema;
+    /// The descriptor associated with this flight
+    FlightDescriptor descriptor;
+    /// Endpoints which together make up the whole dataset
+    std::vector<FlightEndpoint> endpoints;
+    /// Total number of records, see total_records()
+    uint64_t total_records;
+    /// Total number of bytes, see total_bytes()
+    uint64_t total_bytes;
+  };
+
+  explicit FlightInfo(const Data& data) : data_(data), reconstructed_schema_(false) {}
+  explicit FlightInfo(Data&& data)
+      : data_(std::move(data)), reconstructed_schema_(false) {}
+
+  /// \brief Deserialize the Arrow schema of the dataset, to be passed to each
+  /// call to DoGet. The decoded schema is cached after the first successful
+  /// call
+  /// \param[out] out the deserialized schema
+  /// \return Status
+  Status GetSchema(std::shared_ptr<Schema>* out) const;
+
+  /// \brief The Arrow schema of the dataset, still in IPC-serialized form
+  const std::string& serialized_schema() const { return data_.schema; }
+
+  /// The descriptor associated with this flight, may not be set
+  const FlightDescriptor& descriptor() const { return data_.descriptor; }
+
+  /// A list of endpoints associated with the flight (dataset). To consume the
+  /// whole flight, all endpoints must be consumed
+  const std::vector<FlightEndpoint>& endpoints() const { return data_.endpoints; }
+
+  /// The total number of records (rows) in the dataset. If unknown, set to -1.
+  /// NOTE(review): the value is stored as uint64_t, so -1 reads back as
+  /// std::numeric_limits<uint64_t>::max(); confirm against the protobuf
+  /// conversion code
+  uint64_t total_records() const { return data_.total_records; }
+
+  /// The total number of bytes in the dataset. If unknown, set to -1.
+  /// NOTE(review): stored as uint64_t, see total_records()
+  uint64_t total_bytes() const { return data_.total_bytes; }
+
+ private:
+  Data data_;
+  // Lazily-filled cache for GetSchema(); mutable so the const accessor can
+  // populate it. Access is not synchronized.
+  mutable std::shared_ptr<Schema> schema_;
+  mutable bool reconstructed_schema_;
+};
+
+// TODO(wesm): NYI
+/// \brief Writer for the DoPut RPC; currently a placeholder with no methods
+/// besides the virtual destructor
+class ARROW_EXPORT FlightPutWriter {
+ public:
+ virtual ~FlightPutWriter() = default;
+};
+
+/// \brief An iterator to FlightInfo instances returned by ListFlights
+class ARROW_EXPORT FlightListing {
+ public:
+ virtual ~FlightListing() = default;
+
+ /// \brief Retrieve the next FlightInfo from the iterator. Sets *info to
+ /// nullptr when there are none left
+ /// \param[out] info a single FlightInfo, or nullptr at end of iteration
+ /// \return Status
+ virtual Status Next(std::unique_ptr<FlightInfo>* info) = 0;
+};
+
+/// \brief An iterator to Result instances returned by DoAction
+class ARROW_EXPORT ResultStream {
+ public:
+ virtual ~ResultStream() = default;
+
+ /// \brief Retrieve the next Result from the iterator. Sets *info to
+ /// nullptr when there are none left
+ /// \param[out] info a single Result, or nullptr at end of the stream
+ /// \return Status
+ virtual Status Next(std::unique_ptr<Result>* info) = 0;
+};
+
+/// \brief A FlightListing over a vector of FlightInfo objects. It can be
+/// iterated once: each element is moved out of the vector as it is returned
+class ARROW_EXPORT SimpleFlightListing : public FlightListing {
+ public:
+ explicit SimpleFlightListing(const std::vector<FlightInfo>& flights);
+ explicit SimpleFlightListing(std::vector<FlightInfo>&& flights);
+
+ Status Next(std::unique_ptr<FlightInfo>* info) override;
+
+ private:
+ /// Index of the next element to return
+ int position_;
+ std::vector<FlightInfo> flights_;
+};
+
+/// \brief A ResultStream over a vector of Result objects. It can be iterated
+/// once: each element is moved out of the vector as it is returned
+class ARROW_EXPORT SimpleResultStream : public ResultStream {
+ public:
+ explicit SimpleResultStream(std::vector<Result>&& results);
+ Status Next(std::unique_ptr<Result>* result) override;
+
+ private:
+ std::vector<Result> results_;
+ /// Index of the next element to return
+ size_t position_;
+};
+
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 5eb5cd7..cb37545 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -340,7 +340,7 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) {
io::BufferReader cpu_reader(host_buffer);
ASSERT_OK(ipc::ReadRecordBatch(batch->schema(), &cpu_reader, &cpu_batch));
- ipc::CompareBatch(*batch, *cpu_batch);
+ CompareBatch(*batch, *cpu_batch);
}
} // namespace gpu
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 4ee0c34..d77248a 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -73,6 +73,8 @@ class Message::MessageImpl {
const void* header() const { return message_->header(); }
+ // Body length declared in the flatbuffer message header (bodyLength field)
+ int64_t body_length() const { return message_->bodyLength(); }
+
std::shared_ptr<Buffer> body() const { return body_; }
std::shared_ptr<Buffer> metadata() const { return metadata_; }
@@ -101,6 +103,8 @@ Message::~Message() {}
std::shared_ptr<Buffer> Message::body() const { return impl_->body(); }
+// Body length declared in the message metadata; forwarded from MessageImpl
+int64_t Message::body_length() const { return impl_->body_length(); }
+
std::shared_ptr<Buffer> Message::metadata() const { return impl_->metadata(); }
Message::Type Message::type() const { return impl_->type(); }
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index d150eab..08176ab 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -125,6 +125,10 @@ class ARROW_EXPORT Message {
/// \return buffer is null if no body
std::shared_ptr<Buffer> body() const;
+ /// \brief The expected body length according to the metadata, for
+ /// verification purposes
+ /// \return the bodyLength value recorded in the message header
+ int64_t body_length() const;
+
/// \brief The Message type
Type type() const;
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 530262d..0bf18f3 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -960,6 +960,8 @@ Status WriteMessage(const Buffer& message, io::OutputStream* file,
int64_t start_offset;
RETURN_NOT_OK(file->Tell(&start_offset));
+ // TODO(wesm): Should we depend on the position of the OutputStream? See
+ // ARROW-3212
int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
const int32_t remainder =
(padded_message_length + static_cast<int32_t>(start_offset)) % 8;
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index 380f3c9..730a1a5 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -106,7 +106,7 @@ Status WriteMessage(const Buffer& message, io::OutputStream* file,
// Serialize arrow::Schema as a Flatbuffer
//
// \param[in] schema a Schema instance
-// \param[inout] dictionary_memo class for tracking dictionaries and assigning
+// \param[in,out] dictionary_memo class for tracking dictionaries and assigning
// dictionary ids
// \param[out] out the serialized arrow::Buffer
// \return Status outcome
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 5ffbe6f..92cf75b 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -578,6 +578,9 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message));
+ // TODO(wesm): this breaks integration tests, see ARROW-3256
+ // DCHECK_EQ(message->body_length(), block.body_length);
+
io::BufferReader reader(message->body());
return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &reader, batch);
}
@@ -596,6 +599,9 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message));
+ // TODO(wesm): this breaks integration tests, see ARROW-3256
+ // DCHECK_EQ(message->body_length(), block.body_length);
+
io::BufferReader reader(message->body());
std::shared_ptr<Array> dictionary;
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 299f050..4f7de26 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -39,36 +39,6 @@
namespace arrow {
namespace ipc {
-static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) {
- if (!lhs.Equals(rhs)) {
- std::stringstream ss;
- ss << "left schema: " << lhs.ToString() << std::endl
- << "right schema: " << rhs.ToString() << std::endl;
- FAIL() << ss.str();
- }
-}
-
-static inline void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
- if (!left.schema()->Equals(*right.schema())) {
- FAIL() << "Left schema: " << left.schema()->ToString()
- << "\nRight schema: " << right.schema()->ToString();
- }
- ASSERT_EQ(left.num_columns(), right.num_columns())
- << left.schema()->ToString() << " result: " << right.schema()->ToString();
- ASSERT_EQ(left.num_rows(), right.num_rows());
- for (int i = 0; i < left.num_columns(); ++i) {
- if (!left.column(i)->Equals(right.column(i))) {
- std::stringstream ss;
- ss << "Idx: " << i << " Name: " << left.column_name(i);
- ss << std::endl << "Left: ";
- ASSERT_OK(PrettyPrint(*left.column(i), 0, &ss));
- ss << std::endl << "Right: ";
- ASSERT_OK(PrettyPrint(*right.column(i), 0, &ss));
- FAIL() << ss.str();
- }
- }
-}
-
static inline void CompareArraysDetailed(int index, const Array& result,
const Array& expected) {
if (!expected.Equals(result)) {
@@ -96,9 +66,9 @@ const auto kListInt32 = list(int32());
const auto kListListInt32 = list(kListInt32);
Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool,
- std::shared_ptr<Array>* out) {
+ std::shared_ptr<Array>* out, uint32_t seed = 0) {
std::shared_ptr<ResizableBuffer> data;
- RETURN_NOT_OK(MakeRandomInt32Buffer(length, pool, &data));
+ RETURN_NOT_OK(MakeRandomBuffer<int32_t>(length, pool, &data, seed));
Int32Builder builder(int32(), pool);
RETURN_NOT_OK(builder.Resize(length));
if (include_nulls) {
@@ -195,7 +165,8 @@ Status MakeBooleanBatch(std::shared_ptr<RecordBatch>* out) {
return MakeBooleanBatchSized(1000, out);
}
-Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out) {
+Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out,
+ uint32_t seed = 0) {
// Make the schema
auto f0 = field("f0", int32());
auto f1 = field("f1", int32());
@@ -204,8 +175,8 @@ Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out) {
// Example data
std::shared_ptr<Array> a0, a1;
MemoryPool* pool = default_memory_pool();
- RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
- RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
+ RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0, seed));
+ RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1, seed + 1));
*out = RecordBatch::Make(schema, length, {a0, a1});
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index ec6c5d0..60ca34e 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -96,11 +96,14 @@ static inline bool NeedTruncate(int64_t offset, const Buffer* buffer,
return offset != 0 || min_length < buffer->size();
}
+namespace internal {
+
class RecordBatchSerializer : public ArrayVisitor {
public:
RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset,
- int max_recursion_depth, bool allow_64bit)
- : pool_(pool),
+ int max_recursion_depth, bool allow_64bit, IpcPayload* out)
+ : out_(out),
+ pool_(pool),
max_recursion_depth_(max_recursion_depth),
buffer_start_offset_(buffer_start_offset),
allow_64bit_(allow_64bit) {
@@ -125,19 +128,25 @@ class RecordBatchSerializer : public ArrayVisitor {
std::shared_ptr<Buffer> bitmap;
RETURN_NOT_OK(GetTruncatedBitmap(arr.offset(), arr.length(), arr.null_bitmap(),
pool_, &bitmap));
- buffers_.push_back(bitmap);
+ out_->body_buffers.emplace_back(bitmap);
} else {
// Push a dummy zero-length buffer, not to be copied
- buffers_.push_back(std::make_shared<Buffer>(nullptr, 0));
+ out_->body_buffers.emplace_back(std::make_shared<Buffer>(nullptr, 0));
}
return arr.Accept(this);
}
- Status Assemble(const RecordBatch& batch, int64_t* body_length) {
+ // Override this for writing dictionary metadata
+ virtual Status SerializeMetadata(int64_t num_rows) {
+ return WriteRecordBatchMessage(num_rows, out_->body_length, field_nodes_,
+ buffer_meta_, &out_->metadata);
+ }
+
+ Status Assemble(const RecordBatch& batch) {
if (field_nodes_.size() > 0) {
field_nodes_.clear();
buffer_meta_.clear();
- buffers_.clear();
+ out_->body_buffers.clear();
}
// Perform depth-first traversal of the row-batch
@@ -149,11 +158,11 @@ class RecordBatchSerializer : public ArrayVisitor {
// reference. May be 0 or some other position in an address space
int64_t offset = buffer_start_offset_;
- buffer_meta_.reserve(buffers_.size());
+ buffer_meta_.reserve(out_->body_buffers.size());
// Construct the buffer metadata for the record batch header
- for (size_t i = 0; i < buffers_.size(); ++i) {
- const Buffer* buffer = buffers_[i].get();
+ for (size_t i = 0; i < out_->body_buffers.size(); ++i) {
+ const Buffer* buffer = out_->body_buffers[i].get();
int64_t size = 0;
int64_t padding = 0;
@@ -167,69 +176,15 @@ class RecordBatchSerializer : public ArrayVisitor {
offset += size + padding;
}
- *body_length = offset - buffer_start_offset_;
- DCHECK(BitUtil::IsMultipleOf8(*body_length));
-
- return Status::OK();
- }
-
- // Override this for writing dictionary metadata
- virtual Status WriteMetadataMessage(int64_t num_rows, int64_t body_length,
- std::shared_ptr<Buffer>* out) {
- return WriteRecordBatchMessage(num_rows, body_length, field_nodes_, buffer_meta_,
- out);
- }
-
- Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length) {
- RETURN_NOT_OK(Assemble(batch, body_length));
-
-#ifndef NDEBUG
- int64_t start_position, current_position;
- RETURN_NOT_OK(dst->Tell(&start_position));
-#endif
+ out_->body_length = offset - buffer_start_offset_;
+ DCHECK(BitUtil::IsMultipleOf8(out_->body_length));
// Now that we have computed the locations of all of the buffers in shared
// memory, the data header can be converted to a flatbuffer and written out
//
// Note: The memory written here is prefixed by the size of the flatbuffer
// itself as an int32_t.
- std::shared_ptr<Buffer> metadata_fb;
- RETURN_NOT_OK(WriteMetadataMessage(batch.num_rows(), *body_length, &metadata_fb));
- RETURN_NOT_OK(internal::WriteMessage(*metadata_fb, dst, metadata_length));
-
-#ifndef NDEBUG
- RETURN_NOT_OK(dst->Tell(¤t_position));
- DCHECK(BitUtil::IsMultipleOf8(current_position));
-#endif
-
- // Now write the buffers
- for (size_t i = 0; i < buffers_.size(); ++i) {
- const Buffer* buffer = buffers_[i].get();
- int64_t size = 0;
- int64_t padding = 0;
-
- // The buffer might be null if we are handling zero row lengths.
- if (buffer) {
- size = buffer->size();
- padding = BitUtil::RoundUpToMultipleOf8(size) - size;
- }
-
- if (size > 0) {
- RETURN_NOT_OK(dst->Write(buffer->data(), size));
- }
-
- if (padding > 0) {
- RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
- }
- }
-
-#ifndef NDEBUG
- RETURN_NOT_OK(dst->Tell(¤t_position));
- DCHECK(BitUtil::IsMultipleOf8(current_position));
-#endif
-
- return Status::OK();
+ return SerializeMetadata(batch.num_rows());
}
protected:
@@ -251,7 +206,7 @@ class RecordBatchSerializer : public ArrayVisitor {
data->size() - byte_offset);
data = SliceBuffer(data, byte_offset, buffer_length);
}
- buffers_.push_back(data);
+ out_->body_buffers.emplace_back(data);
return Status::OK();
}
@@ -303,8 +258,8 @@ class RecordBatchSerializer : public ArrayVisitor {
data = SliceBuffer(data, start_offset, slice_length);
}
- buffers_.push_back(value_offsets);
- buffers_.push_back(data);
+ out_->body_buffers.emplace_back(value_offsets);
+ out_->body_buffers.emplace_back(data);
return Status::OK();
}
@@ -312,12 +267,12 @@ class RecordBatchSerializer : public ArrayVisitor {
std::shared_ptr<Buffer> data;
RETURN_NOT_OK(
GetTruncatedBitmap(array.offset(), array.length(), array.values(), pool_, &data));
- buffers_.push_back(data);
+ out_->body_buffers.emplace_back(data);
return Status::OK();
}
Status Visit(const NullArray& array) override {
- buffers_.push_back(nullptr);
+ out_->body_buffers.emplace_back(nullptr);
return Status::OK();
}
@@ -352,7 +307,7 @@ class RecordBatchSerializer : public ArrayVisitor {
Status Visit(const ListArray& array) override {
std::shared_ptr<Buffer> value_offsets;
RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
- buffers_.push_back(value_offsets);
+ out_->body_buffers.emplace_back(value_offsets);
--max_recursion_depth_;
std::shared_ptr<Array> values = array.values();
@@ -390,7 +345,7 @@ class RecordBatchSerializer : public ArrayVisitor {
std::shared_ptr<Buffer> type_ids;
RETURN_NOT_OK(GetTruncatedBuffer<UnionArray::type_id_t>(
offset, length, array.type_ids(), pool_, &type_ids));
- buffers_.push_back(type_ids);
+ out_->body_buffers.emplace_back(type_ids);
--max_recursion_depth_;
if (array.mode() == UnionMode::DENSE) {
@@ -449,7 +404,7 @@ class RecordBatchSerializer : public ArrayVisitor {
value_offsets = shifted_offsets_buffer;
}
- buffers_.push_back(value_offsets);
+ out_->body_buffers.emplace_back(value_offsets);
// Visit children and slice accordingly
for (int i = 0; i < type.num_children(); ++i) {
@@ -487,12 +442,14 @@ class RecordBatchSerializer : public ArrayVisitor {
return array.indices()->Accept(this);
}
+ // Destination for output buffers
+ IpcPayload* out_;
+
// In some cases, intermediate buffers may need to be allocated (with sliced arrays)
MemoryPool* pool_;
std::vector<internal::FieldMetadata> field_nodes_;
std::vector<internal::BufferMetadata> buffer_meta_;
- std::vector<std::shared_ptr<Buffer>> buffers_;
int64_t max_recursion_depth_;
int64_t buffer_start_offset_;
@@ -501,29 +458,79 @@ class RecordBatchSerializer : public ArrayVisitor {
+// Serializes a dictionary array into an IpcPayload as a dictionary batch: the
+// array is wrapped in a one-column record batch and its header is written with
+// WriteDictionaryMessage instead of WriteRecordBatchMessage.
class DictionaryWriter : public RecordBatchSerializer {
public:
- using RecordBatchSerializer::RecordBatchSerializer;
-
- Status WriteMetadataMessage(int64_t num_rows, int64_t body_length,
- std::shared_ptr<Buffer>* out) override {
- return WriteDictionaryMessage(dictionary_id_, num_rows, body_length, field_nodes_,
- buffer_meta_, out);
+ // dictionary_id is recorded in the dictionary batch metadata; the remaining
+ // arguments are forwarded to RecordBatchSerializer
+ DictionaryWriter(int64_t dictionary_id, MemoryPool* pool, int64_t buffer_start_offset,
+ int max_recursion_depth, bool allow_64bit, IpcPayload* out)
+ : RecordBatchSerializer(pool, buffer_start_offset, max_recursion_depth, allow_64bit,
+ out),
+ dictionary_id_(dictionary_id) {}
+
+ // Write a dictionary-batch header (rather than a record-batch header) into
+ // out_->metadata
+ Status SerializeMetadata(int64_t num_rows) override {
+ return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length,
+ field_nodes_, buffer_meta_, &out_->metadata);
}
- Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
- dictionary_id_ = dictionary_id;
-
+ // NOTE(review): this overload hides RecordBatchSerializer::Assemble(const
+ // RecordBatch&) for callers using a DictionaryWriter directly
+ Status Assemble(const std::shared_ptr<Array>& dictionary) {
// Make a dummy record batch. A bit tedious as we have to make a schema
auto schema = arrow::schema({arrow::field("dictionary", dictionary->type())});
auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
- return RecordBatchSerializer::Write(*batch, dst, metadata_length, body_length);
+ return RecordBatchSerializer::Assemble(*batch);
}
private:
- // TODO(wesm): Setting this in Write is a bit unclean, but it works
+ // Id written into the dictionary batch metadata
int64_t dictionary_id_;
};
+// Write an assembled IpcPayload to `dst` as an encapsulated IPC message:
+// first the metadata flatbuffer (length prefix and padding are handled by
+// internal::WriteMessage, which also reports *metadata_length), then every
+// body buffer, each padded with kPaddingBytes to a multiple of 8 bytes.
+// Null or empty body buffers contribute no bytes.
+Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
+                       int32_t* metadata_length) {
+  RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, dst, metadata_length));
+
+#ifndef NDEBUG
+  int64_t current_position;
+  RETURN_NOT_OK(dst->Tell(&current_position));
+  DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+  // Now write the buffers
+  for (const auto& buffer : payload.body_buffers) {
+    // The buffer might be null if we are handling zero row lengths.
+    if (!buffer) {
+      continue;
+    }
+    const int64_t size = buffer->size();
+    const int64_t padding = BitUtil::RoundUpToMultipleOf8(size) - size;
+    if (size > 0) {
+      RETURN_NOT_OK(dst->Write(buffer->data(), size));
+    }
+    if (padding > 0) {
+      RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
+    }
+  }
+
+#ifndef NDEBUG
+  RETURN_NOT_OK(dst->Tell(&current_position));
+  DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+  return Status::OK();
+}
+
+// Assemble `batch` into *out (metadata flatbuffer plus body buffers) without
+// writing it anywhere. Buffer offsets are computed relative to 0, the nesting
+// limit is kMaxNestingDepth, and allow_64bit is passed as true.
+// NOTE(review): out->type does not appear to be set by the serializer code
+// visible here; confirm before relying on it.
+Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool,
+ IpcPayload* out) {
+ RecordBatchSerializer writer(pool, 0, kMaxNestingDepth, true, out);
+ return writer.Assemble(batch);
+}
+
+} // namespace internal
+
// Adds padding bytes if necessary to ensure all memory blocks are written on
// 64-byte boundaries.
Status AlignStreamPosition(io::OutputStream* stream) {
@@ -540,9 +547,18 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length, MemoryPool* pool, int max_recursion_depth,
bool allow_64bit) {
- RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth,
- allow_64bit);
- return writer.Write(batch, dst, metadata_length, body_length);
+ internal::IpcPayload payload;
+ internal::RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth,
+ allow_64bit, &payload);
+ RETURN_NOT_OK(writer.Assemble(batch));
+
+ // TODO(wesm): it's a rough edge that the metadata and body length here are
+ // computed separately
+
+ // The body size is computed in the payload
+ *body_length = payload.body_length;
+
+ return internal::WriteIpcPayload(payload, dst, metadata_length);
}
Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
@@ -675,8 +691,14 @@ Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
int64_t buffer_start_offset, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
- DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth, false);
- return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length);
+ // Assemble the dictionary batch into an in-memory payload, then write it out
+ internal::IpcPayload payload;
+ // NOTE(review): allow_64bit is passed as true here, while the replaced code
+ // passed false for dictionaries; confirm this change is intended
+ internal::DictionaryWriter writer(dictionary_id, pool, buffer_start_offset,
+ kMaxNestingDepth, true, &payload);
+ RETURN_NOT_OK(writer.Assemble(dictionary));
+
+ // The body size is computed in the payload
+ *body_length = payload.body_length;
+ return internal::WriteIpcPayload(payload, dst, metadata_length);
}
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 6dbf29d..bcf09aa 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -269,6 +269,36 @@ ARROW_EXPORT
Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length);
+namespace internal {
+
+// These internal APIs may change without warning or deprecation
+
+// Intermediate data structure with metadata header plus zero or more buffers
+// for the message body. This data can either be written out directly as an
+// encapsulated IPC message or used with Flight RPCs
+struct IpcPayload {
+ // Kind of IPC message. NOTE(review): the RecordBatchSerializer code in
+ // writer.cc does not appear to assign this field; confirm where it is set
+ Message::Type type;
+ // Flatbuffer message header; the int32 length prefix and padding are added
+ // when it is written with WriteIpcPayload / WriteMessage
+ std::shared_ptr<Buffer> metadata;
+ // Body buffers in order; entries may be null and are each padded to a
+ // multiple of 8 bytes when written
+ std::vector<std::shared_ptr<Buffer>> body_buffers;
+ // Total body size in bytes, including the padding after each buffer
+ int64_t body_length;
+};
+
+/// \brief Extract IPC payloads from given schema for purposes of wire
+/// transport, separate from using the *StreamWriter classes
+/// NOTE(review): no definition appears in the writer.cc changes shown in this
+/// part of the patch; confirm it is implemented before use
+ARROW_EXPORT
+Status GetDictionaryPayloads(const Schema& schema,
+ std::vector<std::unique_ptr<IpcPayload>>* out);
+
+/// \brief Compute IpcPayload for the given record batch
+/// \param[in] batch the RecordBatch that is being serialized
+/// \param[in] pool for any required temporary memory allocations
+/// \param[out] out the returned IpcPayload
+/// \return Status
+ARROW_EXPORT
+Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool, IpcPayload* out);
+
+} // namespace internal
+
} // namespace ipc
} // namespace arrow
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 1a50a07..b7179a3 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -18,7 +18,14 @@
#ifndef ARROW_TEST_UTIL_H_
#define ARROW_TEST_UTIL_H_
+#ifndef _WIN32
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#endif
+
#include <algorithm>
+#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
@@ -27,6 +34,7 @@
#include <random>
#include <sstream>
#include <string>
+#include <thread>
#include <vector>
#include <gtest/gtest.h>
@@ -44,6 +52,11 @@
#include "arrow/util/decimal.h"
#include "arrow/util/logging.h"
+// Block the calling thread for about `seconds` seconds. Fractional values are
+// allowed; the duration is truncated to whole nanoseconds.
+static inline void sleep_for(double seconds) {
+  const auto nanos = static_cast<int64_t>(seconds * 1e9);
+  std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
+}
+
#define STRINGIFY(x) #x
#define ASSERT_RAISES(ENUM, expr) \
@@ -272,6 +285,17 @@ void rand_uniform_int(int64_t n, uint32_t seed, T min_value, T max_value, U* out
std::generate(out, out + n, [&d, &gen] { return static_cast<U>(d(gen)); });
}
+// Fill `out` with `length` pseudo-random values of integral type T, drawn
+// uniformly from T's full range and seeded with `seed`.
+//
+// rand_uniform_int presumably builds a std::uniform_int_distribution over the
+// type of its bounds (its body is outside this view). That distribution is
+// undefined for char-sized types and bool, so the bounds are passed as T's
+// integral-promoted type (int for anything narrower than int) and the result
+// is narrowed back to T on output. This mirrors random_ascii(), which already
+// passes int32_t bounds for uint8_t output. For int-sized and wider T the
+// promoted type is T itself, so results are unchanged.
+template <typename T, typename Enable = void>
+struct GenerateRandom {};
+
+template <typename T>
+struct GenerateRandom<T, typename std::enable_if<std::is_integral<T>::value>::type> {
+  static void Gen(int64_t length, uint32_t seed, void* out) {
+    // Unary + applies integral promotion: bool/char/short -> int, int -> int, ...
+    using DistType = decltype(+T());
+    rand_uniform_int(length, seed, static_cast<DistType>(std::numeric_limits<T>::min()),
+                     static_cast<DistType>(std::numeric_limits<T>::max()),
+                     reinterpret_cast<T*>(out));
+  }
+};
+
static inline void random_ascii(int64_t n, uint32_t seed, uint8_t* out) {
rand_uniform_int(n, seed, static_cast<int32_t>('A'), static_cast<int32_t>('z'), out);
}
@@ -280,13 +304,13 @@ static inline int64_t CountNulls(const std::vector<uint8_t>& valid_bytes) {
return static_cast<int64_t>(std::count(valid_bytes.cbegin(), valid_bytes.cend(), '\0'));
}
-Status MakeRandomInt32Buffer(int64_t length, MemoryPool* pool,
- std::shared_ptr<ResizableBuffer>* out, uint32_t seed = 0) {
+// Allocate a buffer of `length` values of type T from `pool` and fill it with
+// deterministic pseudo-random values derived from `seed` via GenerateRandom<T>.
+//
+// NOTE(review): the removed MakeRandomInt32Buffer drew values from
+// [0, INT32_MAX]. GenerateRandom<T> covers T's full range, so int32 buffers
+// (e.g. from MakeRandomInt32Array) may now contain negative values. Confirm
+// that callers do not rely on non-negative data.
+template <typename T>
+Status MakeRandomBuffer(int64_t length, MemoryPool* pool,
+ std::shared_ptr<ResizableBuffer>* out, uint32_t seed = 0) {
DCHECK(pool);
std::shared_ptr<ResizableBuffer> result;
- RETURN_NOT_OK(AllocateResizableBuffer(pool, sizeof(int32_t) * length, &result));
- rand_uniform_int(length, seed, 0, std::numeric_limits<int32_t>::max(),
- reinterpret_cast<int32_t*>(result->mutable_data()));
+ RETURN_NOT_OK(AllocateResizableBuffer(pool, sizeof(T) * length, &result));
+ GenerateRandom<T>::Gen(length, seed, result->mutable_data());
*out = result;
return Status::OK();
}
@@ -344,6 +368,15 @@ void AssertBufferEqual(const Buffer& buffer, const Buffer& expected) {
ASSERT_TRUE(buffer.Equals(expected));
}
+// Fatally fail the current test, printing both schemas, unless lhs equals rhs.
+static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) {
+  if (lhs.Equals(rhs)) {
+    return;
+  }
+  std::stringstream message;
+  message << "left schema: " << lhs.ToString() << std::endl;
+  message << "right schema: " << rhs.ToString() << std::endl;
+  FAIL() << message.str();
+}
+
void PrintColumn(const Column& col, std::stringstream* ss) {
const ChunkedArray& carr = *col.data();
for (int i = 0; i < carr.num_chunks(); ++i) {
@@ -456,6 +489,53 @@ Status MakeArray(const std::vector<uint8_t>& valid_bytes, const std::vector<T>&
} \
} while (false)
+// Assert that two record batches have equal schemas, column and row counts,
+// and column contents. The first mismatching column is pretty-printed for both
+// sides before the test fails.
+static inline void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
+  const auto left_schema = left.schema();
+  const auto right_schema = right.schema();
+  if (!left_schema->Equals(*right_schema)) {
+    FAIL() << "Left schema: " << left_schema->ToString()
+           << "\nRight schema: " << right_schema->ToString();
+  }
+  ASSERT_EQ(left.num_columns(), right.num_columns())
+      << left_schema->ToString() << " result: " << right_schema->ToString();
+  ASSERT_EQ(left.num_rows(), right.num_rows());
+
+  for (int col = 0; col < left.num_columns(); ++col) {
+    const auto left_column = left.column(col);
+    const auto right_column = right.column(col);
+    if (left_column->Equals(right_column)) {
+      continue;
+    }
+    std::stringstream diff;
+    diff << "Idx: " << col << " Name: " << left.column_name(col);
+    diff << std::endl << "Left: ";
+    ASSERT_OK(PrettyPrint(*left_column, 0, &diff));
+    diff << std::endl << "Right: ";
+    ASSERT_OK(PrettyPrint(*right_column, 0, &diff));
+    FAIL() << diff.str();
+  }
+}
+
+// ----------------------------------------------------------------------
+// A RecordBatchReader for serving a sequence of in-memory record batches
+
+/// \brief RecordBatchReader that replays an in-memory vector of record batches
+/// in order. After the last batch, ReadNext yields nullptr.
+class BatchIterator : public RecordBatchReader {
+ public:
+  BatchIterator(const std::shared_ptr<Schema>& schema,
+                const std::vector<std::shared_ptr<RecordBatch>>& batches)
+      : schema_(schema), batches_(batches), position_(0) {}
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    if (position_ < batches_.size()) {
+      *out = batches_[position_];
+      ++position_;
+    } else {
+      out->reset();
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Schema> schema_;
+  std::vector<std::shared_ptr<RecordBatch>> batches_;
+  // Index of the next batch to hand out
+  size_t position_;
+};
+
} // namespace arrow
#endif // ARROW_TEST_UTIL_H_
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h
index 56fadca..121f301 100644
--- a/cpp/src/arrow/util/memory.h
+++ b/cpp/src/arrow/util/memory.h
@@ -59,8 +59,8 @@ void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
std::vector<std::future<void*>> futures;
for (int i = 0; i < num_threads; i++) {
- futures.push_back(pool->Submit(memcpy, dst + prefix + i * chunk_size,
- left + i * chunk_size, chunk_size));
+ futures.emplace_back(pool->Submit(memcpy, dst + prefix + i * chunk_size,
+ left + i * chunk_size, chunk_size));
}
memcpy(dst, src, prefix);
memcpy(dst + prefix + num_threads * chunk_size, right, suffix);
diff --git a/cpp/src/parquet/util/stopwatch.h b/cpp/src/arrow/util/stopwatch.h
similarity index 71%
rename from cpp/src/parquet/util/stopwatch.h
rename to cpp/src/arrow/util/stopwatch.h
index 68cf792..f16c2ec 100644
--- a/cpp/src/parquet/util/stopwatch.h
+++ b/cpp/src/arrow/util/stopwatch.h
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef PARQUET_UTIL_STOPWATCH_H
-#define PARQUET_UTIL_STOPWATCH_H
+#pragma once
#include <stdio.h>
#ifndef _MSC_VER
@@ -26,27 +25,25 @@
#include <ctime>
#include <iostream>
-namespace parquet {
+namespace arrow {
+
+inline uint64_t CurrentTime() {  // monotonic ns; inline since defined in a header
+  timespec time;  // CLOCK_MONOTONIC is unaffected by wall-clock adjustments
+  clock_gettime(CLOCK_MONOTONIC, &time);  // NOTE(review): POSIX-only API
+  return static_cast<uint64_t>(time.tv_sec) * 1000000000ULL + time.tv_nsec;
+}
class StopWatch {
public:
StopWatch() {}
- void Start() { gettimeofday(&start_time, 0); }
+ void Start() { start_ = CurrentTime(); }  // remember the starting timestamp
// Returns time in nanoseconds.
- uint64_t Stop() {
- struct timeval t_time;
- gettimeofday(&t_time, 0);
-
- return (1000L * 1000L * 1000L * (t_time.tv_sec - start_time.tv_sec) +
- (t_time.tv_usec - start_time.tv_usec));
- }
+ uint64_t Stop() { return CurrentTime() - start_; }  // ns elapsed since Start()
private:
- struct timeval start_time;
+ uint64_t start_ = 0;  // initialized so Stop() before Start() never reads garbage
};
-} // namespace parquet
-
-#endif
+} // namespace arrow
diff --git a/cpp/src/parquet/util/CMakeLists.txt b/cpp/src/parquet/util/CMakeLists.txt
index debf4b3..72d4ca2 100644
--- a/cpp/src/parquet/util/CMakeLists.txt
+++ b/cpp/src/parquet/util/CMakeLists.txt
@@ -20,7 +20,6 @@ install(FILES
comparison.h
macros.h
memory.h
- stopwatch.h
visibility.h
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet/util")
diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt
index cdea520..381282b 100644
--- a/cpp/thirdparty/versions.txt
+++ b/cpp/thirdparty/versions.txt
@@ -29,8 +29,8 @@ BROTLI_VERSION=v0.6.0
LZ4_VERSION=v1.7.5
ZLIB_VERSION=1.2.8
ZSTD_VERSION=v1.2.0
-PROTOBUF_VERSION=v2.6.0
-GRPC_VERSION=v1.12.1
+PROTOBUF_VERSION=v3.6.1
+GRPC_VERSION=1.14.1
ORC_VERSION=1.5.1
THRIFT_VERSION=0.11.0
GLOG_VERSION=v0.3.5
diff --git a/format/Flight.proto b/format/Flight.proto
new file mode 100644
index 0000000..ab1d620
--- /dev/null
+++ b/format/Flight.proto
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_package = "org.apache.arrow.flight.impl";
+package arrow.flight.protocol;
+
+/*
+ * A flight service is an endpoint for retrieving or storing Arrow data. A
+ * flight service can expose one or more predefined endpoints that can be
+ * accessed using the Arrow Flight Protocol. Additionally, a flight service
+ * can expose a set of actions that are available.
+ */
+service FlightService {
+
+ /*
+ * Handshake between client and server. Depending on the server, the
+ * handshake may be required to determine the token that should be used for
+ * future operations. Both request and response are streams to allow multiple
+ * round trips depending on the auth mechanism.
+ */
+ rpc Handshake(stream HandshakeRequest) returns (stream HandshakeResponse) {}
+
+ /*
+ * Get a list of available streams given particular criteria. Most flight
+ * services will expose one or more streams that are readily available for
+ * retrieval. This API allows listing the streams available for
+ * consumption. A user can also provide criteria, which can limit
+ * the subset of streams that can be listed via this interface. Each flight
+ * service allows its own definition of how to consume criteria.
+ */
+ rpc ListFlights(Criteria) returns (stream FlightGetInfo) {}
+
+ /*
+ * For a given FlightDescriptor, get information about how the flight can be
+ * consumed. This is a useful interface if the consumer of the interface
+ * can already identify the specific flight to consume. This interface can
+ * also allow a consumer to generate a flight stream through a specified
+ * descriptor. For example, a flight descriptor might be something that
+ * includes a SQL statement or a Pickled Python operation that will be
+ * executed. In those cases, the descriptor will not be previously available
+ * within the list of available streams provided by ListFlights but will be
+ * available for consumption for the duration defined by the specific flight
+ * service.
+ */
+ rpc GetFlightInfo(FlightDescriptor) returns (FlightGetInfo) {}
+
+ /*
+ * Retrieve a single stream associated with a particular descriptor
+ * associated with the referenced ticket. A Flight can be composed of one or
+ * more streams where each stream can be retrieved using a separate opaque
+ * ticket that the flight service uses for managing a collection of streams.
+ */
+ rpc DoGet(Ticket) returns (stream FlightData) {}
+
+ /*
+ * Push a stream to the flight service associated with a particular
+ * flight stream. This allows a client of a flight service to upload a stream
+ * of data. Depending on the particular flight service, a client consumer
+ * could be allowed to upload a single stream per descriptor or an unlimited
+ * number. In the latter case, the service might implement a 'seal' action that
+ * can be applied to a descriptor once all streams are uploaded.
+ */
+ rpc DoPut(stream FlightData) returns (PutResult) {}
+
+ /*
+ * Flight services can support an arbitrary number of simple actions in
+ * addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
+ * operations that are potentially available. DoAction allows a flight client
+ * to do a specific action against a flight service. An action includes
+ * opaque request and response objects that are specific to the type of action
+ * being undertaken.
+ */
+ rpc DoAction(Action) returns (stream Result) {}
+
+ /*
+ * A flight service exposes all of the available action types that it has
+ * along with descriptions. This allows different flight consumers to
+ * understand the capabilities of the flight service.
+ */
+ rpc ListActions(Empty) returns (stream ActionType) {}
+
+}
+
+/*
+ * The request that a client provides to a server on handshake.
+ */
+message HandshakeRequest {
+
+ /*
+ * A defined protocol version.
+ */
+ uint64 protocol_version = 1;
+
+ /*
+ * Arbitrary auth/handshake info.
+ */
+ bytes payload = 2;
+}
+
+message HandshakeResponse {  // The server's reply within the Handshake stream.
+
+ /*
+ * A defined protocol version.
+ */
+ uint64 protocol_version = 1;
+
+ /*
+ * Arbitrary auth/handshake info.
+ */
+ bytes payload = 2;
+}
+
+/*
+ * A message for doing simple auth.
+ */
+message BasicAuth {  // NOTE(review): field numbers start at 2; tag 1 is unused.
+ string username = 2;
+ string password = 3;
+}
+
+message Empty {}  // Argument type for RPCs that take no input (e.g. ListActions).
+
+/*
+ * Describes an available action, including both the name used for execution
+ * and a short description of the purpose of the action.
+ */
+message ActionType {
+ string type = 1;  // the name used for execution
+ string description = 2;  // short description of the action's purpose
+}
+
+/*
+ * A service-specific expression that can be used to limit the set
+ * of available Arrow Flight streams.
+ */
+message Criteria {
+ bytes expression = 1;
+}
+
+/*
+ * An opaque action specific to the service.
+ */
+message Action {
+ string type = 1;  // NOTE(review): presumably one of the ActionType.type names; confirm
+ bytes body = 2;
+}
+
+/*
+ * An opaque result returned after executing an action.
+ */
+message Result {
+ bytes body = 1;
+}
+
+/*
+ * The name or tag for a Flight. May be used as a way to retrieve or generate
+ * a flight or be used to expose a set of previously defined flights.
+ */
+message FlightDescriptor {
+
+ /*
+ * Describes what type of descriptor is defined.
+ */
+ enum DescriptorType {
+
+ // Protobuf convention (proto3 enums must start at 0); not used.
+ UNKNOWN = 0;
+
+ /*
+ * A named path that identifies a dataset. A path is composed of a string
+ * or list of strings describing a particular dataset. This is conceptually
+ * similar to a path inside a filesystem.
+ */
+ PATH = 1;
+
+ /*
+ * An opaque command to generate a dataset.
+ */
+ CMD = 2;
+ }
+
+ DescriptorType type = 1;  // selects whether cmd or path is populated
+
+ /*
+ * Opaque value used to express a command. Should only be defined when
+ * type = CMD.
+ */
+ bytes cmd = 2;
+
+ /*
+ * List of strings identifying a particular dataset. Should only be defined
+ * when type = PATH.
+ */
+ repeated string path = 3;
+}
+
+/*
+ * The access coordinates for retrieval of a dataset. With a FlightGetInfo, a
+ * consumer is able to determine how to retrieve a dataset.
+ */
+message FlightGetInfo {
+ // Schema of the dataset, as described in Schema.fbs::Schema.
+ bytes schema = 1;
+
+ /*
+ * The descriptor associated with this info.
+ */
+ FlightDescriptor flight_descriptor = 2;
+
+ /*
+ * A list of endpoints associated with the flight. To consume the whole
+ * flight, all endpoints must be consumed.
+ */
+ repeated FlightEndpoint endpoint = 3;
+
+ uint64 total_records = 4;  // NOTE(review): presumably the record count of the whole flight; confirm
+ uint64 total_bytes = 5;  // NOTE(review): presumably the byte size of the whole flight; confirm
+}
+
+/*
+ * A particular stream or split associated with a flight.
+ */
+message FlightEndpoint {
+
+ /*
+ * Token used to retrieve this stream.
+ */
+ Ticket ticket = 1;
+
+ /*
+ * A list of locations where this ticket can be redeemed. If the list is
+ * empty, the expectation is that the ticket can only be redeemed on the
+ * current service where the ticket was generated.
+ */
+ repeated Location location = 2;
+}
+
+/*
+ * A location where a flight service will accept retrieval of a particular
+ * stream given a ticket.
+ */
+message Location {
+ string host = 1;
+ int32 port = 2;
+}
+
+/*
+ * An opaque identifier that the service can use to retrieve a particular
+ * portion of a stream.
+ */
+message Ticket {
+ bytes ticket = 1;
+}
+
+/*
+ * A batch of Arrow data as part of a stream of batches.
+ */
+message FlightData {
+
+ /*
+ * The descriptor of the data. This is only relevant when a client is
+ * starting a new DoPut stream.
+ */
+ FlightDescriptor flight_descriptor = 1;
+
+ /*
+ * Header for message data as described in Message.fbs::Message.
+ */
+ bytes data_header = 2;
+
+ /*
+ * The actual batch of Arrow data. Preferably handled with minimal copies;
+ * comes last in the definition to help with sidecar patterns.
+ */
+ bytes data_body = 1000;
+}
+
+/**
+ * The response message (currently empty) associated with the submission of a
+ * DoPut.
+ */
+message PutResult {}
diff --git a/integration/README.md b/integration/README.md
index 0a428e6..760a175 100644
--- a/integration/README.md
+++ b/integration/README.md
@@ -70,7 +70,7 @@ Java `arrow-tool` JAR and the build path for the C++ executables:
JAVA_DIR=$ARROW_HOME/java
CPP_BUILD_DIR=$ARROW_HOME/cpp/build
-VERSION=0.1.1-SNAPSHOT
+VERSION=0.11.0-SNAPSHOT
export ARROW_JAVA_INTEGRATION_JAR=$JAVA_DIR/tools/target/arrow-tools-$VERSION-jar-with-dependencies.jar
export ARROW_CPP_EXE_PATH=$CPP_BUILD_DIR/debug
```
diff --git a/python/.gitignore b/python/.gitignore
index fac4e99..3346aa6 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -42,5 +42,4 @@ pyarrow/_table_api.h
manylinux1/arrow
# plasma store
-pyarrow/plasma_store
pyarrow/plasma_store_server
diff --git a/python/README.md b/python/README.md
index c732e3b..e905862 100644
--- a/python/README.md
+++ b/python/README.md
@@ -48,8 +48,8 @@ The code must pass `flake8` (available from pip or conda) or it will fail the
build. Check for style errors before submitting your pull request with:
```
-flake8 pyarrow
-flake8 --config=.flake8.cython pyarrow
+flake8 .
+flake8 --config=.flake8.cython .
```
### Building from Source