Posted to commits@arrow.apache.org by we...@apache.org on 2018/09/20 20:57:02 UTC

[arrow] branch master updated: ARROW-3146: [C++] Prototype Flight RPC client and server implementations

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new db0ef22  ARROW-3146: [C++] Prototype Flight RPC client and server implementations
db0ef22 is described below

commit db0ef22dd68ae00e11f09da40b6734c1d9770b57
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Sep 20 16:56:50 2018 -0400

    ARROW-3146: [C++] Prototype Flight RPC client and server implementations
    
    This is a partial C++ implementation of the Flight RPC system initially proposed by Jacques in ARROW-249. As in Java, I had to dig into gRPC and Protocol Buffers internals to ensure that
    
    * On write, memory is only copied once into the outgoing gRPC buffer
    * On read, no memory is copied
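
To illustrate the read-side goal, here is a minimal standalone sketch of reading a length-delimited protobuf `bytes` field as a view into the received buffer instead of copying it. It is not the code from this patch: `ByteView`, `ReadVarint32` and `ReadBytesView` are hypothetical names, and the sketch uses only the standard library rather than `CodedInputStream` or `arrow::Buffer`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical non-owning view into a larger byte buffer (not an Arrow type).
struct ByteView {
  const uint8_t* data = nullptr;
  size_t size = 0;
};

// Decode a protobuf-style base-128 varint starting at *pos.
// Returns false if the input is truncated or longer than 5 bytes.
bool ReadVarint32(const uint8_t* buf, size_t len, size_t* pos, uint32_t* out) {
  uint32_t result = 0;
  for (int shift = 0; shift < 35; shift += 7) {
    if (*pos >= len) return false;
    const uint8_t byte = buf[(*pos)++];
    result |= static_cast<uint32_t>(byte & 0x7F) << shift;
    if ((byte & 0x80) == 0) {
      *out = result;
      return true;
    }
  }
  return false;
}

// Read a length-delimited field as a view into `buf` rather than a copy.
// The caller must keep `buf` alive for as long as the view is used.
bool ReadBytesView(const uint8_t* buf, size_t len, size_t* pos, ByteView* out) {
  assert(buf != nullptr && pos != nullptr && out != nullptr);
  uint32_t length = 0;
  if (!ReadVarint32(buf, len, pos, &length)) return false;
  if (length > len - *pos) return false;  // payload would run past the buffer
  out->data = buf + *pos;
  out->size = length;
  *pos += length;  // skip over the payload without touching its bytes
  return true;
}
```

The view stays valid only while the underlying buffer does, so an owner of that memory has to be kept alive alongside it (for example through a reference count).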
    
    The way that I tricked gRPC into circumventing the built-in protobuf serde paths might look a bit hacky, but after digging around in the library a bunch I've convinced myself that it's the best and perhaps only way to accomplish this. Luckily, the message that's being serialized/deserialized is pretty opaque to the rest of the gRPC system, and it's controlled by the `SerializationTraits<T>` class. So you can take a gRPC stream reader and make it create any kind of type you want, even  [...]
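
The general pattern described above, a stream reader that only knows its message type through a traits class, can be sketched without gRPC. Everything in this example is a hypothetical stand-in (`toy_rpc`, `WireBuffer`, `StreamReader`, `RawView`); it mirrors the idea of specializing `SerializationTraits<T>` but is not gRPC's actual interface, which works with `grpc::ByteBuffer` and `grpc::Status`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

namespace toy_rpc {  // hypothetical stand-in for an RPC library

using WireBuffer = std::vector<uint8_t>;

// Primary template: the default path copies the wire bytes into the message.
template <typename T>
struct SerializationTraits {
  static bool Deserialize(const WireBuffer& wire, T* out) {
    out->assign(wire.begin(), wire.end());
    return true;
  }
};

// The reader is generic: all decoding goes through SerializationTraits<T>.
template <typename T>
class StreamReader {
 public:
  explicit StreamReader(const WireBuffer* wire) : wire_(wire) {
    assert(wire_ != nullptr);
  }
  bool Read(T* out) { return SerializationTraits<T>::Deserialize(*wire_, out); }

 private:
  const WireBuffer* wire_;
};

}  // namespace toy_rpc

// Application-defined type: a non-owning view of the received bytes.
struct RawView {
  const uint8_t* data = nullptr;
  size_t size = 0;
};

namespace toy_rpc {

// Specialization: for RawView, reference the wire memory instead of copying.
template <>
struct SerializationTraits<RawView> {
  static bool Deserialize(const WireBuffer& wire, RawView* out) {
    out->data = wire.data();
    out->size = wire.size();
    return true;
  }
};

}  // namespace toy_rpc
```

With this in place, `toy_rpc::StreamReader<std::string>` copies the payload, while `toy_rpc::StreamReader<RawView>` hands back a pointer into the wire buffer, and the reader itself is unchanged.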
    
    Some things that won't be addressed in this patch, as scope is too large:
    
    * gRPC build toolchain issues (this is rather complex, I will create follow-up issues)
    * Security / encryption, and authentication issues. I have only implemented an insecure server
    * Integration with Travis CI
    * Python bindings
    
    The API is preliminary, and I expect it to be the subject of iteration over the next several months to make it more general and faster.
    
    Author: Wes McKinney <we...@apache.org>
    
    Closes #2547 from wesm/flight-cpp-prototype and squashes the following commits:
    
    64bcdea43 <Wes McKinney> Initial Arrow Flight C++ implementation
---
 ci/travis_before_script_cpp.sh                     |   4 +
 ci/travis_script_cpp.sh                            |   2 +-
 cpp/CMakeLists.txt                                 |  36 +-
 cpp/build-support/lint_cpp_cli.py                  |   6 +-
 cpp/cmake_modules/FindProtobuf.cmake               |   1 +
 cpp/cmake_modules/ThirdpartyToolchain.cmake        | 116 +++---
 cpp/src/arrow/buffer.cc                            |   4 +
 cpp/src/arrow/buffer.h                             |   5 +
 .../arrow/dbi/hiveserver2/thrift/CMakeLists.txt    |   9 +-
 cpp/src/arrow/flight/CMakeLists.txt                | 139 +++++++
 cpp/src/arrow/flight/README.md                     |  36 ++
 .../util/stopwatch.h => arrow/flight/api.h}        |  38 +-
 cpp/src/arrow/flight/client.cc                     | 410 +++++++++++++++++++++
 cpp/src/arrow/flight/client.h                      | 110 ++++++
 cpp/src/arrow/flight/flight-benchmark.cc           | 193 ++++++++++
 cpp/src/arrow/flight/flight-test.cc                | 267 ++++++++++++++
 cpp/src/arrow/flight/internal.cc                   | 235 ++++++++++++
 cpp/src/arrow/flight/internal.h                    |  77 ++++
 cpp/src/arrow/flight/perf-server.cc                | 200 ++++++++++
 cpp/src/arrow/flight/perf.proto                    |  44 +++
 cpp/src/arrow/flight/server.cc                     | 385 +++++++++++++++++++
 cpp/src/arrow/flight/server.h                      | 142 +++++++
 cpp/src/arrow/flight/test-server.cc                | 141 +++++++
 cpp/src/arrow/flight/test-util.h                   | 157 ++++++++
 cpp/src/arrow/flight/types.cc                      |  75 ++++
 cpp/src/arrow/flight/types.h                       | 210 +++++++++++
 cpp/src/arrow/gpu/cuda-test.cc                     |   2 +-
 cpp/src/arrow/ipc/message.cc                       |   4 +
 cpp/src/arrow/ipc/message.h                        |   4 +
 cpp/src/arrow/ipc/metadata-internal.cc             |   2 +
 cpp/src/arrow/ipc/metadata-internal.h              |   2 +-
 cpp/src/arrow/ipc/reader.cc                        |   6 +
 cpp/src/arrow/ipc/test-common.h                    |  41 +--
 cpp/src/arrow/ipc/writer.cc                        | 206 ++++++-----
 cpp/src/arrow/ipc/writer.h                         |  30 ++
 cpp/src/arrow/test-util.h                          |  90 ++++-
 cpp/src/arrow/util/memory.h                        |   4 +-
 cpp/src/{parquet => arrow}/util/stopwatch.h        |  27 +-
 cpp/src/parquet/util/CMakeLists.txt                |   1 -
 cpp/thirdparty/versions.txt                        |   4 +-
 format/Flight.proto                                | 299 +++++++++++++++
 integration/README.md                              |   2 +-
 python/.gitignore                                  |   1 -
 python/README.md                                   |   4 +-
 44 files changed, 3502 insertions(+), 269 deletions(-)

diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index e1c231c..54a00f7 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -94,6 +94,10 @@ if [ $ARROW_TRAVIS_COVERAGE == "1" ]; then
   CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_GENERATE_COVERAGE=ON"
 fi
 
+if [ $ARROW_TRAVIS_VERBOSE == "1" ]; then
+  CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_VERBOSE_THIRDPARTY_BUILD=ON"
+fi
+
 if [ $TRAVIS_OS_NAME == "linux" ]; then
     cmake $CMAKE_COMMON_FLAGS \
           $CMAKE_LINUX_FLAGS \
diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh
index 3a6b2f7..b89e5b7 100755
--- a/ci/travis_script_cpp.sh
+++ b/ci/travis_script_cpp.sh
@@ -23,7 +23,7 @@ source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh
 
 pushd $CPP_BUILD_DIR
 
-ctest -j2 --output-on-failure -L unittest
+PATH=$ARROW_BUILD_TYPE:$PATH ctest -j2 --output-on-failure -L unittest
 
 popd
 
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index aa68f92..23ef7d0 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -140,6 +140,10 @@ Pass multiple labels by dividing with semicolons")
     "Compile with extra error context (line numbers, code)"
     OFF)
 
+  option(ARROW_FLIGHT
+    "Build the Arrow Flight RPC System (requires GRPC, Protocol Buffers)"
+    OFF)
+
   option(ARROW_IPC
     "Build the Arrow IPC extensions"
     ON)
@@ -240,10 +244,6 @@ Pass multiple labels by dividing with semicolons")
     "Build with zstd compression"
     ON)
 
-  option(ARROW_WITH_GRPC
-    "Build with GRPC"
-    OFF)
-
   option(ARROW_GENERATE_COVERAGE
     "Build with C++ code coverage enabled"
     OFF)
@@ -634,14 +634,6 @@ if (ARROW_WITH_ZSTD)
   SET(ARROW_STATIC_LINK_LIBS zstd_static ${ARROW_STATIC_LINK_LIBS})
 endif()
 
-if (ARROW_WITH_GRPC)
-  SET(ARROW_STATIC_LINK_LIBS
-    grpc_grp
-    grpc_grpc
-    grpc_grpcpp
-    ${ARROW_STATIC_LINK_LIBS})
-endif()
-
 if (ARROW_ORC)
   SET(ARROW_STATIC_LINK_LIBS
     ${ARROW_STATIC_LINK_LIBS}
@@ -657,11 +649,13 @@ if (ARROW_STATIC_LINK_LIBS)
   add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS})
 endif()
 
-set(ARROW_BENCHMARK_LINK_LIBS
-  arrow_static
-  arrow_benchmark_main
-  gtest
-  ${ARROW_STATIC_LINK_LIBS})
+if (ARROW_BUILD_BENCHMARKS)
+  set(ARROW_BENCHMARK_LINK_LIBS
+    arrow_static
+    arrow_benchmark_main
+    gtest
+    ${ARROW_STATIC_LINK_LIBS})
+endif()
 
 set(ARROW_SHARED_PRIVATE_LINK_LIBS
   ${ARROW_STATIC_LINK_LIBS}
@@ -683,8 +677,8 @@ endif()
 set(ARROW_MIN_TEST_LIBS
   arrow_static
   ${ARROW_STATIC_LINK_LIBS}
-  gtest
-  gtest_main)
+  gtest_main
+  gtest)
 
 if(NOT MSVC)
   set(ARROW_MIN_TEST_LIBS
@@ -743,6 +737,10 @@ endif()
 
 add_subdirectory(src/arrow)
 
+if(ARROW_FLIGHT)
+  add_subdirectory(src/arrow/flight)
+endif()
+
 if(ARROW_PYTHON)
   add_subdirectory(src/arrow/python)
 endif()
diff --git a/cpp/build-support/lint_cpp_cli.py b/cpp/build-support/lint_cpp_cli.py
index 0c6bad1..4028105 100644
--- a/cpp/build-support/lint_cpp_cli.py
+++ b/cpp/build-support/lint_cpp_cli.py
@@ -72,10 +72,8 @@ EXCLUSIONS = [
     'arrow/util/macros.h',
     'arrow/python/iterators.h',
     'arrow/util/parallel.h',
-    'arrow/io/hdfs-internal.h',
-    'parquet/arrow/test-util.h',
-    'parquet/encoding-internal.h',
-    'parquet/test-util.h'
+    'test',
+    'internal'
 ]
 
 try:
diff --git a/cpp/cmake_modules/FindProtobuf.cmake b/cpp/cmake_modules/FindProtobuf.cmake
index 9591bd1..cb003e3 100644
--- a/cpp/cmake_modules/FindProtobuf.cmake
+++ b/cpp/cmake_modules/FindProtobuf.cmake
@@ -94,6 +94,7 @@ else()
 endif()
 
 mark_as_advanced (
+  PROTOBUF_EXECUTABLE
   PROTOBUF_INCLUDE_DIR
   PROTOBUF_LIBS
   PROTOBUF_STATIC_LIB
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index 4bd6470..e25f954 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -31,6 +31,7 @@ if (NOT "$ENV{ARROW_BUILD_TOOLCHAIN}" STREQUAL "")
     set(GTEST_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
   endif()
   set(JEMALLOC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
+  set(GRPC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
   set(LZ4_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
   # orc disabled as it's not in conda-forge (but in Anaconda with an incompatible ABI)
   # set(ORC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
@@ -210,7 +211,7 @@ if (DEFINED ENV{ARROW_PROTOBUF_URL})
   set(PROTOBUF_SOURCE_URL "$ENV{ARROW_PROTOBUF_URL}")
 else()
   string(SUBSTRING ${PROTOBUF_VERSION} 1 -1 STRIPPED_PROTOBUF_VERSION)  # strip the leading `v`
-  set(PROTOBUF_SOURCE_URL "https://github.com/google/protobuf/releases/download/${PROTOBUF_VERSION}/protobuf-${STRIPPED_PROTOBUF_VERSION}.tar.gz")
+  set(PROTOBUF_SOURCE_URL "https://github.com/protocolbuffers/protobuf/releases/download/${PROTOBUF_VERSION}/protobuf-all-${STRIPPED_PROTOBUF_VERSION}.tar.gz")
 endif()
 
 set(RAPIDJSON_SOURCE_MD5 "badd12c511e081fec6c89c43a7027bce")
@@ -1009,54 +1010,11 @@ if (ARROW_WITH_ZSTD)
   endif()
 endif()
 
-if (ARROW_WITH_GRPC)
-# ----------------------------------------------------------------------
-# GRPC
-  if ("${GRPC_HOME}" STREQUAL "")
-    set(GRPC_VENDORED 1)
-    set(GRPC_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep-prefix/src/grpc_ep-build")
-    set(GRPC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep/src/grpc_ep-install")
-    set(GRPC_HOME "${GRPC_PREFIX}")
-    set(GRPC_INCLUDE_DIR "${GRPC_PREFIX}/include")
-    set(GRPC_STATIC_LIBRARY_GPR "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}gpr${CMAKE_STATIC_LIBRARY_SUFFIX}")
-    set(GRPC_STATIC_LIBRARY_GRPC "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc${CMAKE_STATIC_LIBRARY_SUFFIX}")
-    set(GRPC_STATIC_LIBRARY_GRPCPP "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc++${CMAKE_STATIC_LIBRARY_SUFFIX}")
-    set(GRPC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-                          "-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}"
-                          "-DCMAKE_C_FLAGS=${EP_C_FLAGS}"
-                          -DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX}
-                          -DBUILD_SHARED_LIBS=OFF)
 
-    ExternalProject_Add(grpc_ep
-      GIT_REPOSITORY "https://github.com/grpc/grpc"
-      GIT_TAG ${GRPC_VERSION}
-      BUILD_BYPRODUCTS "${GRPC_STATIC_LIBRARY_GPR}" "${GRPC_STATIC_LIBRARY_GRPC}" "${GRPC_STATIC_LIBRARY_GRPCPP}"
-      ${GRPC_BUILD_BYPRODUCTS}
-      ${EP_LOG_OPTIONS}
-      CMAKE_ARGS ${GRPC_CMAKE_ARGS}
-      ${EP_LOG_OPTIONS})
-  else()
-    find_package(gRPC CONFIG REQUIRED)
-    set(GRPC_VENDORED 0)
-  endif()
-
-  include_directories(SYSTEM ${GRPC_INCLUDE_DIR})
-  ADD_THIRDPARTY_LIB(grpc_grp
-    STATIC_LIB ${GRPC_STATIC_LIBRARY_GPR})
-  ADD_THIRDPARTY_LIB(grpc_grpc
-    STATIC_LIB ${GRPC_STATIC_LIBRARY_GRPC})
-  ADD_THIRDPARTY_LIB(grpc_grpcpp
-    STATIC_LIB ${GRPC_STATIC_LIBRARY_GRPCPP})
-
-  if (GRPC_VENDORED)
-    add_dependencies(grpc_grp grpc_ep)
-    add_dependencies(grpc_grpc grpc_ep)
-    add_dependencies(grpc_grpcpp grpc_ep)
-  endif()
-
-endif()
+# ----------------------------------------------------------------------
+# Protocol Buffers (required for ORC and Flight libraries)
 
-if (ARROW_ORC)
+if (ARROW_ORC OR ARROW_FLIGHT)
   # protobuf
   if ("${PROTOBUF_HOME}" STREQUAL "")
     set (PROTOBUF_PREFIX "${THIRDPARTY_DIR}/protobuf_ep-install")
@@ -1089,9 +1047,69 @@ if (ARROW_ORC)
   if (PROTOBUF_VENDORED)
     add_dependencies (protobuf protobuf_ep)
   endif ()
+endif()
 
-  # orc
+# ----------------------------------------------------------------------
+# Dependencies for Arrow Flight RPC
+
+if (ARROW_FLIGHT)
+  if ("${GRPC_HOME}" STREQUAL "")
+    set(GRPC_VENDORED 1)
+    set(GRPC_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep-prefix/src/grpc_ep-build")
+    set(GRPC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep/src/grpc_ep-install")
+    set(GRPC_HOME "${GRPC_PREFIX}")
+    set(GRPC_INCLUDE_DIR "${GRPC_PREFIX}/include")
+    set(GRPC_STATIC_LIBRARY_GPR "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}gpr${CMAKE_STATIC_LIBRARY_SUFFIX}")
+    set(GRPC_STATIC_LIBRARY_GRPC "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc${CMAKE_STATIC_LIBRARY_SUFFIX}")
+    set(GRPC_STATIC_LIBRARY_GRPCPP "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpcpp${CMAKE_STATIC_LIBRARY_SUFFIX}")
+    set(GRPC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
+                        "-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}"
+                        "-DCMAKE_C_FLAGS=${EP_C_FLAGS}"
+                        -DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX}
+                        -DBUILD_SHARED_LIBS=OFF)
 
+    ExternalProject_Add(grpc_ep
+      GIT_REPOSITORY "https://github.com/grpc/grpc"
+      GIT_TAG ${GRPC_VERSION}
+      BUILD_BYPRODUCTS "${GRPC_STATIC_LIBRARY_GPR}" "${GRPC_STATIC_LIBRARY_GRPC}" "${GRPC_STATIC_LIBRARY_GRPCPP}"
+      ${GRPC_BUILD_BYPRODUCTS}
+      ${EP_LOG_OPTIONS}
+      CMAKE_ARGS ${GRPC_CMAKE_ARGS}
+      ${EP_LOG_OPTIONS})
+    include_directories(SYSTEM ${GRPC_INCLUDE_DIR})
+  else()
+    find_package(gRPC CONFIG REQUIRED)
+    set(GRPC_VENDORED 0)
+  endif()
+
+  get_property(GPR_STATIC_LIB TARGET gRPC::gpr PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(grpc_gpr
+    STATIC_LIB ${GPR_STATIC_LIB})
+
+  get_property(GRPC_STATIC_LIB TARGET gRPC::grpc_unsecure PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(grpc_grpc
+    STATIC_LIB ${GRPC_STATIC_LIB})
+
+  get_property(GRPCPP_STATIC_LIB TARGET gRPC::grpc++_unsecure PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(grpc_grpcpp
+    STATIC_LIB ${GRPCPP_STATIC_LIB})
+
+  get_property(GRPC_ADDRESS_SORTING_STATIC_LIB
+    TARGET gRPC::address_sorting PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(grpc_address_sorting
+    STATIC_LIB ${GRPC_ADDRESS_SORTING_STATIC_LIB})
+
+  # XXX(wesm): relying on vendored c-ares provided by gRPC for the time being
+  get_property(CARES_STATIC_LIB TARGET c-ares::cares_static PROPERTY LOCATION)
+  ADD_THIRDPARTY_LIB(cares
+    STATIC_LIB ${CARES_STATIC_LIB})
+endif()
+
+# ----------------------------------------------------------------------
+# Apache ORC
+
+if (ARROW_ORC)
+  # orc
   if ("${ORC_HOME}" STREQUAL "")
     set(ORC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/orc_ep-install")
     set(ORC_HOME "${ORC_PREFIX}")
@@ -1274,10 +1292,10 @@ endif()
 
 endif()  # ARROW_HIVESERVER2
 
-if (ARROW_USE_GLOG)
 # ----------------------------------------------------------------------
 # GLOG
 
+if (ARROW_USE_GLOG)
   if("${GLOG_HOME}" STREQUAL "")
     set(GLOG_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/glog_ep-prefix/src/glog_ep")
     set(GLOG_INCLUDE_DIR "${GLOG_BUILD_DIR}/include")
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index 2c01041..006fe0f 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -71,6 +71,10 @@ Status Buffer::FromString(const std::string& data, std::shared_ptr<Buffer>* out)
   return FromString(data, default_memory_pool(), out);
 }
 
+std::string Buffer::ToString() const {
+  return std::string(reinterpret_cast<const char*>(data_), static_cast<size_t>(size_));
+}
+
 void Buffer::CheckMutable() const { DCHECK(is_mutable()) << "buffer not mutable"; }
 
 /// A Buffer whose lifetime is tied to a particular MemoryPool
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index f8c2c83..42b99bf 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -146,6 +146,11 @@ class ARROW_EXPORT Buffer {
                                     static_cast<int64_t>(sizeof(T) * data.size()));
   }  // namespace arrow
 
+  /// \brief Copy buffer contents into a new std::string
+  /// \return std::string
+  /// \note Can throw std::bad_alloc if buffer is large
+  std::string ToString() const;
+
   int64_t capacity() const { return capacity_; }
   const uint8_t* data() const { return data_; }
 
diff --git a/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt b/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt
index c59fd5a..be689f9 100644
--- a/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt
+++ b/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt
@@ -44,10 +44,11 @@ function(HS2_THRIFT_GEN VAR)
     # All the output files we can determine based on filename.
     #   - Does not include .skeleton.cpp files
     #   - Does not include java output files
-    set(OUTPUT_BE_FILE "${GEN_DIR}/${FIL_WE}_types.cpp")
-    set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_types.h")
-    set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_constants.cpp")
-    set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_constants.h")
+    set(OUTPUT_BE_FILE
+      "${GEN_DIR}/${FIL_WE}_types.cpp"
+      "${GEN_DIR}/${FIL_WE}_types.h"
+      "${GEN_DIR}/${FIL_WE}_constants.cpp"
+      "${GEN_DIR}/${FIL_WE}_constants.h")
     list(APPEND ${VAR} ${OUTPUT_BE_FILE})
 
     # BeeswaxService thrift generation
diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt
new file mode 100644
index 0000000..e830be3
--- /dev/null
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_custom_target(arrow_flight)
+
+# Header files
+install(FILES
+  api.h
+  client.h
+  server.h
+  types.h
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/flight")
+
+SET(ARROW_FLIGHT_STATIC_LINK_LIBS
+  grpc_grpcpp
+  grpc_grpc
+  grpc_gpr
+  grpc_address_sorting
+  cares)
+
+# TODO(wesm): Protobuf shared vs static linking
+
+set(FLIGHT_PROTO_PATH "${CMAKE_SOURCE_DIR}/../format")
+set(FLIGHT_PROTO ${CMAKE_SOURCE_DIR}/../format/Flight.proto)
+
+set(FLIGHT_GENERATED_PROTO_FILES
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.pb.cc"
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.pb.h"
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.grpc.pb.cc"
+  "${CMAKE_CURRENT_BINARY_DIR}/Flight.grpc.pb.h")
+
+if(PROTOBUF_VENDORED)
+  set(PROTO_DEPENDS ${FLIGHT_PROTO} protobuf)
+else()
+  set(PROTO_DEPENDS ${FLIGHT_PROTO})
+endif()
+
+# Get location of grpc_cpp_plugin so we can pass it to protoc
+get_property(GRPC_CPP_PLUGIN TARGET gRPC::grpc_cpp_plugin PROPERTY LOCATION)
+
+add_custom_command(
+  OUTPUT ${FLIGHT_GENERATED_PROTO_FILES}
+  COMMAND ${PROTOBUF_EXECUTABLE}
+  "-I${FLIGHT_PROTO_PATH}"
+  "--cpp_out=${CMAKE_CURRENT_BINARY_DIR}"
+  "${FLIGHT_PROTO}"
+  DEPENDS ${PROTO_DEPENDS}
+  ARGS
+  COMMAND ${PROTOBUF_EXECUTABLE}
+  "-I${FLIGHT_PROTO_PATH}"
+  "--grpc_out=${CMAKE_CURRENT_BINARY_DIR}"
+  "--plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}"
+  "${FLIGHT_PROTO}")
+
+set_source_files_properties(${FLIGHT_GENERATED_PROTO_FILES}
+  PROPERTIES GENERATED TRUE)
+
+set(ARROW_FLIGHT_SRCS
+  client.cc
+  Flight.pb.cc
+  Flight.grpc.pb.cc
+  internal.cc
+  server.cc
+  types.cc
+)
+
+ADD_ARROW_LIB(arrow_flight
+  SOURCES ${ARROW_FLIGHT_SRCS}
+  DEPENDENCIES arrow_dependencies
+  SHARED_LINK_LIBS arrow_shared ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+  STATIC_LINK_LIBS arrow_static ${ARROW_FLIGHT_STATIC_LINK_LIBS})
+
+ADD_ARROW_TEST(flight-test
+  EXTRA_LINK_LIBS arrow_flight_static ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+  LABELS "arrow_flight")
+
+# Build test server for unit tests or benchmarks
+if (ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
+  add_executable(flight-test-server test-server.cc)
+  target_link_libraries(flight-test-server
+    arrow_flight_static
+    ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+    gflags
+    gtest)
+
+  # This is needed for the unit tests
+  if (ARROW_BUILD_TESTS)
+    add_dependencies(flight-test flight-test-server)
+  endif()
+endif()
+
+if (ARROW_BUILD_BENCHMARKS)
+  # Perf server for benchmarks
+  set(PERF_PROTO_GENERATED_FILES
+    "${CMAKE_CURRENT_BINARY_DIR}/perf.pb.cc"
+    "${CMAKE_CURRENT_BINARY_DIR}/perf.pb.h")
+
+  add_custom_command(
+    OUTPUT ${PERF_PROTO_GENERATED_FILES}
+    COMMAND ${PROTOBUF_EXECUTABLE}
+    "-I${CMAKE_CURRENT_SOURCE_DIR}"
+    "--cpp_out=${CMAKE_CURRENT_BINARY_DIR}"
+    "perf.proto"
+    DEPENDS ${PROTO_DEPENDS})
+
+  add_executable(flight-perf-server
+    perf-server.cc
+    perf.pb.cc)
+  target_link_libraries(flight-perf-server
+    arrow_flight_static
+    ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+    gflags
+    gtest)
+
+  add_executable(flight-benchmark
+    flight-benchmark.cc
+    perf.pb.cc)
+  target_link_libraries(flight-benchmark
+    arrow_flight_static
+    ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+    gflags
+    gtest)
+
+  add_dependencies(flight-benchmark flight-perf-server)
+endif(ARROW_BUILD_BENCHMARKS)
diff --git a/cpp/src/arrow/flight/README.md b/cpp/src/arrow/flight/README.md
new file mode 100644
index 0000000..5156973
--- /dev/null
+++ b/cpp/src/arrow/flight/README.md
@@ -0,0 +1,36 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Arrow Flight RPC System for C++
+
+## Development notes
+
+The gRPC protobuf plugin requires that libprotoc is in your
+`LD_LIBRARY_PATH`. Until we figure out a general solution, you may need to do:
+
+```
+export LD_LIBRARY_PATH=$PROTOBUF_HOME/lib:$LD_LIBRARY_PATH
+```
+
+Currently, to run the unit tests, the directory of executables must either be
+your current working directory or you need to add it to your path, e.g.
+
+```
+PATH=debug:$PATH debug/flight-test
+```
\ No newline at end of file
diff --git a/cpp/src/parquet/util/stopwatch.h b/cpp/src/arrow/flight/api.h
similarity index 57%
copy from cpp/src/parquet/util/stopwatch.h
copy to cpp/src/arrow/flight/api.h
index 68cf792..98b0840 100644
--- a/cpp/src/parquet/util/stopwatch.h
+++ b/cpp/src/arrow/flight/api.h
@@ -15,38 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef PARQUET_UTIL_STOPWATCH_H
-#define PARQUET_UTIL_STOPWATCH_H
+#pragma once
 
-#include <stdio.h>
-#ifndef _MSC_VER
-#include <sys/time.h>
-#endif
-
-#include <ctime>
-#include <iostream>
-
-namespace parquet {
-
-class StopWatch {
- public:
-  StopWatch() {}
-
-  void Start() { gettimeofday(&start_time, 0); }
-
-  // Returns time in nanoseconds.
-  uint64_t Stop() {
-    struct timeval t_time;
-    gettimeofday(&t_time, 0);
-
-    return (1000L * 1000L * 1000L * (t_time.tv_sec - start_time.tv_sec) +
-            (t_time.tv_usec - start_time.tv_usec));
-  }
-
- private:
-  struct timeval start_time;
-};
-
-}  // namespace parquet
-
-#endif
+#include "arrow/flight/client.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/types.h"
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
new file mode 100644
index 0000000..94c4928
--- /dev/null
+++ b/cpp/src/arrow/flight/client.cc
@@ -0,0 +1,410 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/client.h"
+
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "google/protobuf/io/coded_stream.h"
+#include "google/protobuf/wire_format_lite.h"
+#include "grpc/byte_buffer_reader.h"
+#include "grpcpp/grpcpp.h"
+
+#include "arrow/ipc/reader.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/flight/Flight.grpc.pb.h"
+#include "arrow/flight/Flight.pb.h"
+#include "arrow/flight/internal.h"
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+
+/// Internal, not user-visible type used for memory-efficient reads from gRPC
+/// stream
+struct FlightData {
+  /// Used only for puts, may be null
+  std::unique_ptr<FlightDescriptor> descriptor;
+
+  /// Non-length-prefixed Message header as described in format/Message.fbs
+  std::shared_ptr<Buffer> metadata;
+
+  /// Message body
+  std::shared_ptr<Buffer> body;
+};
+
+}  // namespace flight
+}  // namespace arrow
+
+namespace grpc {
+
+// Customizations to gRPC for more efficient deserialization of FlightData
+
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::CodedInputStream;
+
+using arrow::flight::FlightData;
+
+bool ReadBytesZeroCopy(const std::shared_ptr<arrow::Buffer>& source_data,
+                       CodedInputStream* input, std::shared_ptr<arrow::Buffer>* out) {
+  uint32_t length;
+  if (!input->ReadVarint32(&length)) {
+    return false;
+  }
+  *out = arrow::SliceBuffer(source_data, input->CurrentPosition(),
+                            static_cast<int64_t>(length));
+  return input->Skip(static_cast<int>(length));
+}
+
+// Internal wrapper for gRPC ByteBuffer so its memory can be exposed to Arrow
+// consumers with zero-copy
+class GrpcBuffer : public arrow::MutableBuffer {
+ public:
+  GrpcBuffer(grpc_slice slice, bool incref)
+      : MutableBuffer(GRPC_SLICE_START_PTR(slice),
+                      static_cast<int64_t>(GRPC_SLICE_LENGTH(slice))),
+        slice_(incref ? grpc_slice_ref(slice) : slice) {}
+
+  ~GrpcBuffer() override {
+    // Decref slice
+    grpc_slice_unref(slice_);
+  }
+
+  static arrow::Status Wrap(ByteBuffer* cpp_buf, std::shared_ptr<arrow::Buffer>* out) {
+    // These types are guaranteed by static assertions in gRPC to have the same
+    // in-memory representation
+
+    auto buffer = *reinterpret_cast<grpc_byte_buffer**>(cpp_buf);
+
+    // This part below is based on the Flatbuffers gRPC SerializationTraits in
+    // flatbuffers/grpc.h
+
+    // Check if this is a single uncompressed slice.
+    if ((buffer->type == GRPC_BB_RAW) &&
+        (buffer->data.raw.compression == GRPC_COMPRESS_NONE) &&
+        (buffer->data.raw.slice_buffer.count == 1)) {
+      // If it is, then we can reference the `grpc_slice` directly.
+      grpc_slice slice = buffer->data.raw.slice_buffer.slices[0];
+
+      // Increment reference count so this memory remains valid
+      *out = std::make_shared<GrpcBuffer>(slice, true);
+    } else {
+      // Otherwise, we need to use `grpc_byte_buffer_reader_readall` to read
+      // `buffer` into a single contiguous `grpc_slice`. The gRPC reader gives
+      // us back a new slice with the refcount already incremented.
+      grpc_byte_buffer_reader reader;
+      if (!grpc_byte_buffer_reader_init(&reader, buffer)) {
+        return arrow::Status::IOError("Internal gRPC error reading from ByteBuffer");
+      }
+      grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
+      grpc_byte_buffer_reader_destroy(&reader);
+
+      // Steal the slice reference
+      *out = std::make_shared<GrpcBuffer>(slice, false);
+    }
+
+    return arrow::Status::OK();
+  }
+
+ private:
+  grpc_slice slice_;
+};
+
+// Read internal::FlightData from grpc::ByteBuffer containing FlightData
+// protobuf without copying
+template <>
+class SerializationTraits<FlightData> {
+ public:
+  static Status Serialize(const FlightData& msg, ByteBuffer** buffer, bool* own_buffer) {
+    return Status(StatusCode::UNIMPLEMENTED,
+                  "internal::FlightData serialization not implemented");
+  }
+
+  static Status Deserialize(ByteBuffer* buffer, FlightData* out) {
+    if (!buffer) {
+      return Status(StatusCode::INTERNAL, "No payload");
+    }
+
+    std::shared_ptr<arrow::Buffer> wrapped_buffer;
+    GRPC_RETURN_NOT_OK(GrpcBuffer::Wrap(buffer, &wrapped_buffer));
+
+    auto buffer_length = static_cast<int>(wrapped_buffer->size());
+    CodedInputStream pb_stream(wrapped_buffer->data(), buffer_length);
+
+    // TODO(wesm): The 2-parameter version of this function is deprecated
+    pb_stream.SetTotalBytesLimit(buffer_length, -1 /* no threshold */);
+
+    // This is the bytes remaining when using CodedInputStream like this
+    while (pb_stream.BytesUntilTotalBytesLimit()) {
+      const uint32_t tag = pb_stream.ReadTag();
+      const int field_number = WireFormatLite::GetTagFieldNumber(tag);
+      switch (field_number) {
+        case pb::FlightData::kFlightDescriptorFieldNumber: {
+          pb::FlightDescriptor pb_descriptor;
+          if (!pb_descriptor.ParseFromCodedStream(&pb_stream)) {
+            return Status(StatusCode::INTERNAL, "Unable to parse FlightDescriptor");
+          }
+        } break;
+        case pb::FlightData::kDataHeaderFieldNumber: {
+          if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->metadata)) {
+            return Status(StatusCode::INTERNAL, "Unable to read FlightData metadata");
+          }
+        } break;
+        case pb::FlightData::kDataBodyFieldNumber: {
+          if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->body)) {
+            return Status(StatusCode::INTERNAL, "Unable to read FlightData body");
+          }
+        } break;
+        default:
+          DCHECK(false) << "cannot happen";
+      }
+    }
+    buffer->Clear();
+
+    // TODO(wesm): Where and when should we verify that the FlightData is not
+    // malformed or missing components?
+
+    return Status::OK;
+  }
+};
+
+}  // namespace grpc
+
+namespace arrow {
+namespace flight {
+
+struct ClientRpc {
+  grpc::ClientContext context;
+
+  ClientRpc() {
+    /// XXX workaround until we have a handshake in Connect
+    context.set_wait_for_ready(true);
+  }
+};
+
+class FlightStreamReader : public RecordBatchReader {
+ public:
+  FlightStreamReader(std::unique_ptr<ClientRpc> rpc,
+                     const std::shared_ptr<Schema>& schema,
+                     std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream)
+      : rpc_(std::move(rpc)),
+        stream_finished_(false),
+        schema_(schema),
+        stream_(std::move(stream)) {}
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    FlightData data;
+
+    if (stream_finished_) {
+      *out = nullptr;
+      return Status::OK();
+    }
+
+    // For customizing read path for better memory/serialization efficiency
+    auto custom_reader = reinterpret_cast<grpc::ClientReader<FlightData>*>(stream_.get());
+
+    if (custom_reader->Read(&data)) {
+      std::unique_ptr<ipc::Message> message;
+
+      // Validate IPC message
+      RETURN_NOT_OK(ipc::Message::Open(data.metadata, data.body, &message));
+      return ipc::ReadRecordBatch(*message, schema_, out);
+    } else {
+      // Stream is completed
+      stream_finished_ = true;
+      *out = nullptr;
+      return internal::FromGrpcStatus(stream_->Finish());
+    }
+  }
+
+ private:
+  // The RPC context lifetime must be coupled to the ClientReader
+  std::unique_ptr<ClientRpc> rpc_;
+
+  bool stream_finished_;
+  std::shared_ptr<Schema> schema_;
+  std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream_;
+};
+
+class FlightClient::FlightClientImpl {
+ public:
+  Status Connect(const std::string& host, int port) {
+    // TODO(wesm): Support other kinds of GRPC ChannelCredentials
+    std::stringstream ss;
+    ss << host << ":" << port;
+    std::string uri = ss.str();
+
+    stub_ = pb::FlightService::NewStub(
+        grpc::CreateChannel(uri, grpc::InsecureChannelCredentials()));
+    return Status::OK();
+  }
+
+  Status ListFlights(const Criteria& criteria, std::unique_ptr<FlightListing>* listing) {
+    // TODO(wesm): populate criteria
+    pb::Criteria pb_criteria;
+
+    ClientRpc rpc;
+    std::unique_ptr<grpc::ClientReader<pb::FlightGetInfo>> stream(
+        stub_->ListFlights(&rpc.context, pb_criteria));
+
+    std::vector<FlightInfo> flights;
+
+    pb::FlightGetInfo pb_info;
+    FlightInfo::Data info_data;
+    while (stream->Read(&pb_info)) {
+      RETURN_NOT_OK(internal::FromProto(pb_info, &info_data));
+      flights.emplace_back(FlightInfo(std::move(info_data)));
+    }
+
+    listing->reset(new SimpleFlightListing(flights));
+    return internal::FromGrpcStatus(stream->Finish());
+  }
+
+  Status DoAction(const Action& action, std::unique_ptr<ResultStream>* results) {
+    pb::Action pb_action;
+    RETURN_NOT_OK(internal::ToProto(action, &pb_action));
+
+    ClientRpc rpc;
+    std::unique_ptr<grpc::ClientReader<pb::Result>> stream(
+        stub_->DoAction(&rpc.context, pb_action));
+
+    pb::Result pb_result;
+
+    std::vector<Result> materialized_results;
+    while (stream->Read(&pb_result)) {
+      Result result;
+      RETURN_NOT_OK(internal::FromProto(pb_result, &result));
+      materialized_results.emplace_back(std::move(result));
+    }
+
+    *results = std::unique_ptr<ResultStream>(
+        new SimpleResultStream(std::move(materialized_results)));
+    return internal::FromGrpcStatus(stream->Finish());
+  }
+
+  Status ListActions(std::vector<ActionType>* types) {
+    pb::Empty empty;
+
+    ClientRpc rpc;
+    std::unique_ptr<grpc::ClientReader<pb::ActionType>> stream(
+        stub_->ListActions(&rpc.context, empty));
+
+    pb::ActionType pb_type;
+    ActionType type;
+    while (stream->Read(&pb_type)) {
+      RETURN_NOT_OK(internal::FromProto(pb_type, &type));
+      types->emplace_back(std::move(type));
+    }
+    return internal::FromGrpcStatus(stream->Finish());
+  }
+
+  Status GetFlightInfo(const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* info) {
+    pb::FlightDescriptor pb_descriptor;
+    pb::FlightGetInfo pb_response;
+
+    RETURN_NOT_OK(internal::ToProto(descriptor, &pb_descriptor));
+
+    ClientRpc rpc;
+    Status s = internal::FromGrpcStatus(
+        stub_->GetFlightInfo(&rpc.context, pb_descriptor, &pb_response));
+    RETURN_NOT_OK(s);
+
+    FlightInfo::Data info_data;
+    RETURN_NOT_OK(internal::FromProto(pb_response, &info_data));
+    info->reset(new FlightInfo(std::move(info_data)));
+    return Status::OK();
+  }
+
+  Status DoGet(const Ticket& ticket, const std::shared_ptr<Schema>& schema,
+               std::unique_ptr<RecordBatchReader>* out) {
+    pb::Ticket pb_ticket;
+    internal::ToProto(ticket, &pb_ticket);
+
+    // Heap-allocate the ClientRpc so that its lifetime follows the stream reader
+    std::unique_ptr<ClientRpc> rpc(new ClientRpc);
+    std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream(
+        stub_->DoGet(&rpc->context, pb_ticket));
+
+    *out = std::unique_ptr<RecordBatchReader>(
+        new FlightStreamReader(std::move(rpc), schema, std::move(stream)));
+    return Status::OK();
+  }
+
+  Status DoPut(std::unique_ptr<FlightPutWriter>* stream) {
+    return Status::NotImplemented("DoPut");
+  }
+
+ private:
+  std::unique_ptr<pb::FlightService::Stub> stub_;
+};
+
+FlightClient::FlightClient() { impl_.reset(new FlightClientImpl); }
+
+FlightClient::~FlightClient() {}
+
+Status FlightClient::Connect(const std::string& host, int port,
+                             std::unique_ptr<FlightClient>* client) {
+  client->reset(new FlightClient);
+  return (*client)->impl_->Connect(host, port);
+}
+
+Status FlightClient::DoAction(const Action& action,
+                              std::unique_ptr<ResultStream>* results) {
+  return impl_->DoAction(action, results);
+}
+
+Status FlightClient::ListActions(std::vector<ActionType>* actions) {
+  return impl_->ListActions(actions);
+}
+
+Status FlightClient::GetFlightInfo(const FlightDescriptor& descriptor,
+                                   std::unique_ptr<FlightInfo>* info) {
+  return impl_->GetFlightInfo(descriptor, info);
+}
+
+Status FlightClient::ListFlights(std::unique_ptr<FlightListing>* listing) {
+  return ListFlights({}, listing);
+}
+
+Status FlightClient::ListFlights(const Criteria& criteria,
+                                 std::unique_ptr<FlightListing>* listing) {
+  return impl_->ListFlights(criteria, listing);
+}
+
+Status FlightClient::DoGet(const Ticket& ticket, const std::shared_ptr<Schema>& schema,
+                           std::unique_ptr<RecordBatchReader>* stream) {
+  return impl_->DoGet(ticket, schema, stream);
+}
+
+Status FlightClient::DoPut(const Schema& schema,
+                           std::unique_ptr<FlightPutWriter>* stream) {
+  return Status::NotImplemented("DoPut");
+}
+
+}  // namespace flight
+}  // namespace arrow
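
Note on the wire format that the custom Deserialize above walks by hand: in the protobuf encoding, each field starts with a varint tag equal to (field_number << 3) | wire_type, and a bytes field (wire type 2) is followed by a varint length and then the payload. The sketch below is a dependency-free illustration of that layout and is not the code in this patch. ByteView, ReadVarint and ParseLengthDelimited are hypothetical names, and the sketch assumes every field is length-delimited.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>

// Non-owning view into the input buffer (hypothetical helper type).
struct ByteView {
  const uint8_t* data;
  size_t size;
};

// Decode a base-128 varint starting at *pos; advances *pos as it reads.
inline bool ReadVarint(const uint8_t* buf, size_t len, size_t* pos, uint64_t* out) {
  uint64_t result = 0;
  for (int shift = 0; shift < 64 && *pos < len; shift += 7) {
    const uint8_t byte = buf[(*pos)++];
    result |= static_cast<uint64_t>(byte & 0x7F) << shift;
    if ((byte & 0x80) == 0) {
      *out = result;
      return true;
    }
  }
  return false;  // truncated or over-long varint
}

// Walk a serialized message and record, per field number, a view of the
// length-delimited payload. No payload bytes are copied. Returns false on
// malformed input or on any wire type other than 2 (length-delimited).
inline bool ParseLengthDelimited(const uint8_t* buf, size_t len,
                                 std::map<uint32_t, ByteView>* fields) {
  size_t pos = 0;
  while (pos < len) {
    uint64_t tag = 0;
    if (!ReadVarint(buf, len, &pos, &tag)) return false;
    const uint32_t wire_type = static_cast<uint32_t>(tag & 0x7);
    const uint32_t field_number = static_cast<uint32_t>(tag >> 3);
    if (wire_type != 2 || field_number == 0) return false;

    uint64_t payload_len = 0;
    if (!ReadVarint(buf, len, &pos, &payload_len)) return false;
    if (payload_len > len - pos) return false;  // payload runs past the buffer

    (*fields)[field_number] = ByteView{buf + pos, static_cast<size_t>(payload_len)};
    pos += static_cast<size_t>(payload_len);
  }
  return true;
}
```

Each recorded view points into the caller's buffer, so the buffer has to outlive the views. That is the same constraint the patch handles by wrapping the gRPC buffer before slicing it.
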
diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h
new file mode 100644
index 0000000..be3d86a
--- /dev/null
+++ b/cpp/src/arrow/flight/client.h
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// \brief Implementation of Flight RPC client using gRPC. API should be
+/// considered experimental for now
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/flight/types.h"
+
+namespace arrow {
+
+class RecordBatch;
+class RecordBatchReader;
+class Schema;
+
+namespace flight {
+
+/// \brief Client class for Arrow Flight RPC services (gRPC-based).
+/// API experimental for now
+class ARROW_EXPORT FlightClient {
+ public:
+  ~FlightClient();
+
+  /// \brief Connect to an unauthenticated flight service
+  /// \param[in] host the hostname or IP address
+  /// \param[in] port the port on the host
+  /// \param[out] client the created FlightClient
+  /// \return Status. Note that an OK status does not guarantee that the
+  /// connection was established
+  static Status Connect(const std::string& host, int port,
+                        std::unique_ptr<FlightClient>* client);
+
+  /// \brief Perform the indicated action, returning an iterator to the stream
+  /// of results, if any
+  /// \param[in] action the action to be performed
+  /// \param[out] results an iterator object for reading the returned results
+  /// \return Status
+  Status DoAction(const Action& action, std::unique_ptr<ResultStream>* results);
+
+  /// \brief Retrieve a list of available Action types
+  /// \param[out] actions the available actions
+  /// \return Status
+  Status ListActions(std::vector<ActionType>* actions);
+
+  /// \brief Request access plan for a single flight, which may be an existing
+  /// dataset or a command to be executed
+  /// \param[in] descriptor the dataset request, whether a named dataset or
+  /// command
+  /// \param[out] info the FlightInfo describing where to access the dataset
+  /// \return Status
+  Status GetFlightInfo(const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* info);
+
+  /// \brief List all available flights known to the server
+  /// \param[out] listing an iterator that returns a FlightInfo for each flight
+  /// \return Status
+  Status ListFlights(std::unique_ptr<FlightListing>* listing);
+
+  /// \brief List available flights given indicated filter criteria
+  /// \param[in] criteria the filter criteria (opaque)
+  /// \param[out] listing an iterator that returns a FlightInfo for each flight
+  /// \return Status
+  Status ListFlights(const Criteria& criteria, std::unique_ptr<FlightListing>* listing);
+
+  /// \brief Given a flight ticket and schema, request to be sent the
+  /// stream. Returns record batch stream reader
+  /// \param[in] ticket the ticket identifying the stream to retrieve
+  /// \param[in] schema the arrow::Schema for the stream as computed by
+  /// GetFlightInfo
+  /// \param[out] stream the returned RecordBatchReader
+  /// \return Status
+  Status DoGet(const Ticket& ticket, const std::shared_ptr<Schema>& schema,
+               std::unique_ptr<RecordBatchReader>* stream);
+
+  /// \brief Initiate DoPut RPC and return a FlightPutWriter. Not yet implemented
+  /// \param[in] schema the schema of the record batches to be written
+  /// \param[out] stream the created stream to write record batches to
+  /// \return Status
+  Status DoPut(const Schema& schema, std::unique_ptr<FlightPutWriter>* stream);
+
+ private:
+  FlightClient();
+  class FlightClientImpl;
+  std::unique_ptr<FlightClientImpl> impl_;
+};
+
+}  // namespace flight
+}  // namespace arrow
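
Note on the header above: it keeps gRPC types out of the public API by forward-declaring FlightClientImpl, holding it through a std::unique_ptr, and funneling construction through the static Connect factory. A stripped-down sketch of that pattern follows. The Client class, its Endpoint accessor, the port range check, and the use of bool in place of Status are hypothetical simplifications and are not part of this patch.

```cpp
#include <memory>
#include <string>

// ---- what would live in the public header ----
class Client {
 public:
  ~Client();  // defined below, where Impl is a complete type

  // Factory in the style of FlightClient::Connect; returns false on bad input.
  static bool Connect(const std::string& host, int port,
                      std::unique_ptr<Client>* client);

  std::string Endpoint() const;

 private:
  Client();
  class Impl;  // forward declaration only; no transport headers needed here
  std::unique_ptr<Impl> impl_;
};

// ---- what would live in the .cc file ----
class Client::Impl {
 public:
  bool Connect(const std::string& host, int port) {
    // Hypothetical validation, only here to give the sketch a failure path
    if (host.empty() || port <= 0 || port > 65535) return false;
    endpoint_ = host + ":" + std::to_string(port);
    return true;
  }
  const std::string& endpoint() const { return endpoint_; }

 private:
  std::string endpoint_;
};

Client::Client() : impl_(new Impl) {}
Client::~Client() {}

bool Client::Connect(const std::string& host, int port,
                     std::unique_ptr<Client>* client) {
  client->reset(new Client);  // a static member may call the private ctor
  return (*client)->impl_->Connect(host, port);
}

std::string Client::Endpoint() const { return impl_->endpoint(); }
```

The destructor is declared in the class and defined after Impl is complete. Otherwise std::unique_ptr<Impl> would try to delete an incomplete type in every translation unit that includes the header.
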
diff --git a/cpp/src/arrow/flight/flight-benchmark.cc b/cpp/src/arrow/flight/flight-benchmark.cc
new file mode 100644
index 0000000..ac50ab0
--- /dev/null
+++ b/cpp/src/arrow/flight/flight-benchmark.cc
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <future>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+
+#include "arrow/api.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/api.h"
+#include "arrow/record_batch.h"
+#include "arrow/test-util.h"
+#include "arrow/util/stopwatch.h"
+#include "arrow/util/thread-pool.h"
+
+#include "arrow/flight/api.h"
+#include "arrow/flight/perf.pb.h"
+#include "arrow/flight/test-util.h"
+
+DEFINE_int32(num_servers, 1, "Number of performance servers to run");
+DEFINE_int32(num_streams, 4, "Number of streams for each server");
+DEFINE_int32(num_threads, 4, "Number of concurrent gets");
+DEFINE_int32(records_per_stream, 10000000, "Total records per stream");
+DEFINE_int32(records_per_batch, 4096, "Total records per batch within stream");
+
+namespace perf = arrow::flight::perf;
+
+using ThreadPool = ::arrow::internal::ThreadPool;
+
+namespace arrow {
+namespace flight {
+
+struct PerformanceStats {
+  PerformanceStats() : total_records(0), total_bytes(0) {}
+  std::mutex mutex;
+  int64_t total_records;
+  int64_t total_bytes;
+
+  void Update(const int64_t total_records, const int64_t total_bytes) {
+    std::lock_guard<std::mutex> lock(this->mutex);
+    this->total_records += total_records;
+    this->total_bytes += total_bytes;
+  }
+};
+
+Status RunPerformanceTest(const int port) {
+  // TODO(wesm): Multiple servers
+  // std::vector<std::unique_ptr<TestServer>> servers;
+
+  // schema not needed
+  perf::Perf perf;
+  perf.set_stream_count(FLAGS_num_streams);
+  perf.set_records_per_stream(FLAGS_records_per_stream);
+  perf.set_records_per_batch(FLAGS_records_per_batch);
+
+  // Construct client and plan the query
+  std::unique_ptr<FlightClient> client;
+  RETURN_NOT_OK(FlightClient::Connect("localhost", port, &client));
+
+  FlightDescriptor descriptor;
+  descriptor.type = FlightDescriptor::CMD;
+  perf.SerializeToString(&descriptor.cmd);
+
+  std::unique_ptr<FlightInfo> plan;
+  RETURN_NOT_OK(client->GetFlightInfo(descriptor, &plan));
+
+  // Read the streams in parallel
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(plan->GetSchema(&schema));
+
+  PerformanceStats stats;
+  auto ConsumeStream = [&stats, &schema, &port](const FlightEndpoint& endpoint) {
+    // TODO(wesm): Use location from endpoint, same host/port for now
+    std::unique_ptr<FlightClient> client;
+    RETURN_NOT_OK(FlightClient::Connect("localhost", port, &client));
+
+    perf::Token token;
+    token.ParseFromString(endpoint.ticket.ticket);
+
+    std::unique_ptr<RecordBatchReader> reader;
+    RETURN_NOT_OK(client->DoGet(endpoint.ticket, schema, &reader));
+
+    std::shared_ptr<RecordBatch> batch;
+
+    // This is hard-coded for right now, 4 columns each with int64
+    const int bytes_per_record = 32;
+
+    // This must also be set in perf-server.cc
+    const bool verify = false;
+
+    int64_t num_bytes = 0;
+    int64_t num_records = 0;
+    while (true) {
+      RETURN_NOT_OK(reader->ReadNext(&batch));
+      if (!batch) {
+        break;
+      }
+
+      if (verify) {
+        auto values =
+            reinterpret_cast<const int64_t*>(batch->column_data(0)->buffers[1]->data());
+        const int64_t start = token.start() + num_records;
+        for (int64_t i = 0; i < batch->num_rows(); ++i) {
+          if (values[i] != start + i) {
+            return Status::Invalid("verification failure");
+          }
+        }
+      }
+
+      num_records += batch->num_rows();
+
+      // Hard-coded
+      num_bytes += batch->num_rows() * bytes_per_record;
+    }
+    stats.Update(num_records, num_bytes);
+    return Status::OK();
+  };
+
+  StopWatch timer;
+  timer.Start();
+
+  // XXX(wesm): Serial version for debugging
+  // for (const auto& endpoint : plan->endpoints()) {
+  //   RETURN_NOT_OK(ConsumeStream(endpoint));
+  // }
+
+  std::shared_ptr<ThreadPool> pool;
+  RETURN_NOT_OK(ThreadPool::Make(FLAGS_num_threads, &pool));
+  std::vector<std::future<Status>> tasks;
+  for (const auto& endpoint : plan->endpoints()) {
+    tasks.emplace_back(pool->Submit(ConsumeStream, endpoint));
+  }
+
+  // Wait for tasks to finish
+  for (auto&& task : tasks) {
+    RETURN_NOT_OK(task.get());
+  }
+
+  // Elapsed time in seconds
+  uint64_t elapsed_nanos = timer.Stop();
+  double time_elapsed = elapsed_nanos / static_cast<double>(1000000000LL);
+
+  constexpr double kMegabyte = static_cast<double>(1 << 20);
+
+  // Check that number of rows read is as expected
+  if (stats.total_records != static_cast<int64_t>(plan->total_records())) {
+    return Status::Invalid("Did not consume expected number of records");
+  }
+
+  std::cout << "Bytes read: " << stats.total_bytes << std::endl;
+  std::cout << "Nanos: " << elapsed_nanos << std::endl;
+  std::cout << "Speed: " << (stats.total_bytes / time_elapsed / kMegabyte) << " MB/s"
+            << std::endl;
+  return Status::OK();
+}
+
+}  // namespace flight
+}  // namespace arrow
+
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  const int port = 31337;
+  arrow::flight::TestServer server("flight-perf-server", port);
+  server.Start();
+
+  arrow::Status s = arrow::flight::RunPerformanceTest(port);
+  server.Stop();
+
+  if (!s.ok()) {
+    std::cerr << "Failed with error: " << s.ToString() << std::endl;
+  }
+
+  return s.ok() ? 0 : 1;
+}
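
Note on the numbers the benchmark above prints: it assumes 32 bytes per record (4 int64 columns) and reports bytes / seconds / 2^20. The helpers below are a hypothetical standalone restatement of that arithmetic, not code from the patch. Assuming a single server and that every stream delivers its full records_per_stream, the default flags (4 streams of 10,000,000 records) give 4 x 10,000,000 x 32 = 1,280,000,000 bytes.

```cpp
#include <cstdint>

// Bytes transferred, assuming fixed-width records (hypothetical helper).
constexpr int64_t TotalBytes(int64_t num_streams, int64_t records_per_stream,
                             int64_t bytes_per_record) {
  return num_streams * records_per_stream * bytes_per_record;
}

// Throughput in MB/s, where 1 MB = 2^20 bytes, from a nanosecond duration.
inline double MegabytesPerSecond(int64_t total_bytes, uint64_t elapsed_nanos) {
  const double seconds = static_cast<double>(elapsed_nanos) / 1e9;
  return static_cast<double>(total_bytes) / seconds / static_cast<double>(1 << 20);
}
```

For instance, moving the 1,280,000,000 bytes above in 2 seconds works out to roughly 610 MB/s under this definition of a megabyte.
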
diff --git a/cpp/src/arrow/flight/flight-test.cc b/cpp/src/arrow/flight/flight-test.cc
new file mode 100644
index 0000000..2d1b2f8
--- /dev/null
+++ b/cpp/src/arrow/flight/flight-test.cc
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef _WIN32
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#endif
+
+#include <chrono>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <boost/process.hpp>
+
+#include "arrow/ipc/test-common.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+
+#include "arrow/flight/api.h"
+
+#ifdef GRPCPP_GRPCPP_H
+#error "gRPC headers should not be in public API"
+#endif
+
+#include "arrow/flight/Flight.pb.h"
+#include "arrow/flight/internal.h"
+#include "arrow/flight/test-util.h"
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+
+TEST(TestFlight, StartStopTestServer) {
+  TestServer server("flight-test-server", 92385);
+  server.Start();
+  ASSERT_TRUE(server.IsRunning());
+
+  sleep_for(0.2);
+
+  ASSERT_TRUE(server.IsRunning());
+  int exit_code = server.Stop();
+  ASSERT_EQ(0, exit_code);
+}
+
+// ----------------------------------------------------------------------
+// Client tests
+
+class TestFlightClient : public ::testing::Test {
+ public:
+  // Uncomment these when you want to run the server separately for
+  // debugging/valgrind/gdb
+
+  // void SetUp() {
+  //   port_ = 92358;
+  //   ASSERT_OK(ConnectClient());
+  // }
+  // void TearDown() {}
+
+  void SetUp() {
+    port_ = 92358;
+    server_.reset(new TestServer("flight-test-server", port_));
+    server_->Start();
+    ASSERT_OK(ConnectClient());
+  }
+
+  void TearDown() { server_->Stop(); }
+
+  Status ConnectClient() { return FlightClient::Connect("localhost", port_, &client_); }
+
+ protected:
+  int port_;
+  std::unique_ptr<FlightClient> client_;
+  std::unique_ptr<TestServer> server_;
+};
+
+// The server implementation is in test-server.cc; to make changes to the
+// expected results, make edits there
+void AssertEqual(const FlightDescriptor& expected, const FlightDescriptor& actual) {}
+
+void AssertEqual(const Ticket& expected, const Ticket& actual) {
+  ASSERT_EQ(expected.ticket, actual.ticket);
+}
+
+void AssertEqual(const Location& expected, const Location& actual) {
+  ASSERT_EQ(expected.host, actual.host);
+  ASSERT_EQ(expected.port, actual.port);
+}
+
+void AssertEqual(const std::vector<FlightEndpoint>& expected,
+                 const std::vector<FlightEndpoint>& actual) {
+  ASSERT_EQ(expected.size(), actual.size());
+  for (size_t i = 0; i < expected.size(); ++i) {
+    AssertEqual(expected[i].ticket, actual[i].ticket);
+
+    ASSERT_EQ(expected[i].locations.size(), actual[i].locations.size());
+    for (size_t j = 0; j < expected[i].locations.size(); ++j) {
+      AssertEqual(expected[i].locations[j], actual[i].locations[j]);
+    }
+  }
+}
+
+void AssertEqual(const FlightInfo& expected, const FlightInfo& actual) {
+  std::shared_ptr<Schema> ex_schema, actual_schema;
+  ASSERT_OK(expected.GetSchema(&ex_schema));
+  ASSERT_OK(actual.GetSchema(&actual_schema));
+
+  AssertSchemaEqual(*ex_schema, *actual_schema);
+  ASSERT_EQ(expected.total_records(), actual.total_records());
+  ASSERT_EQ(expected.total_bytes(), actual.total_bytes());
+
+  AssertEqual(expected.descriptor(), actual.descriptor());
+  AssertEqual(expected.endpoints(), actual.endpoints());
+}
+
+void AssertEqual(const ActionType& expected, const ActionType& actual) {
+  ASSERT_EQ(expected.type, actual.type);
+  ASSERT_EQ(expected.description, actual.description);
+}
+
+template <typename T>
+void AssertEqual(const std::vector<T>& expected, const std::vector<T>& actual) {
+  ASSERT_EQ(expected.size(), actual.size());
+  for (size_t i = 0; i < expected.size(); ++i) {
+    AssertEqual(expected[i], actual[i]);
+  }
+}
+
+TEST_F(TestFlightClient, ListFlights) {
+  std::unique_ptr<FlightListing> listing;
+  ASSERT_OK(client_->ListFlights(&listing));
+  ASSERT_TRUE(listing != nullptr);
+
+  std::vector<FlightInfo> flights = ExampleFlightInfo();
+  std::unique_ptr<FlightInfo> info;
+
+  for (const FlightInfo& flight : flights) {
+    ASSERT_OK(listing->Next(&info));
+    AssertEqual(flight, *info);
+  }
+  ASSERT_OK(listing->Next(&info));
+  ASSERT_TRUE(info == nullptr);
+
+  ASSERT_OK(listing->Next(&info));
+}
+
+TEST_F(TestFlightClient, GetFlightInfo) {
+  FlightDescriptor descr{FlightDescriptor::PATH, "", {"foo", "bar"}};
+  std::unique_ptr<FlightInfo> info;
+  ASSERT_OK(client_->GetFlightInfo(descr, &info));
+
+  ASSERT_TRUE(info != nullptr);
+
+  std::vector<FlightInfo> flights = ExampleFlightInfo();
+  AssertEqual(flights[0], *info);
+}
+
+TEST(TestFlightProtocol, FlightDescriptor) {
+  FlightDescriptor descr_test;
+  pb::FlightDescriptor pb_descr;
+
+  FlightDescriptor descr1{FlightDescriptor::PATH, "", {"foo", "bar"}};
+  ASSERT_OK(internal::ToProto(descr1, &pb_descr));
+  ASSERT_OK(internal::FromProto(pb_descr, &descr_test));
+  AssertEqual(descr1, descr_test);
+
+  FlightDescriptor descr2{FlightDescriptor::CMD, "command", {}};
+  ASSERT_OK(internal::ToProto(descr2, &pb_descr));
+  ASSERT_OK(internal::FromProto(pb_descr, &descr_test));
+  AssertEqual(descr2, descr_test);
+}
+
+TEST_F(TestFlightClient, DoGet) {
+  FlightDescriptor descr{FlightDescriptor::PATH, "", {"foo", "bar"}};
+  std::unique_ptr<FlightInfo> info;
+  ASSERT_OK(client_->GetFlightInfo(descr, &info));
+
+  // Two endpoints in the example FlightInfo
+  ASSERT_EQ(2, info->endpoints().size());
+
+  Ticket ticket = info->endpoints()[0].ticket;
+  AssertEqual(Ticket{"ticket-id-1"}, ticket);
+
+  std::shared_ptr<Schema> schema;
+  ASSERT_OK(info->GetSchema(&schema));
+
+  auto expected_schema = ExampleSchema1();
+  AssertSchemaEqual(*expected_schema, *schema);
+
+  std::unique_ptr<RecordBatchReader> stream;
+  ASSERT_OK(client_->DoGet(ticket, schema, &stream));
+
+  BatchVector expected_batches;
+  const int num_batches = 5;
+  ASSERT_OK(SimpleIntegerBatches(num_batches, &expected_batches));
+  std::shared_ptr<RecordBatch> chunk;
+  for (int i = 0; i < num_batches; ++i) {
+    ASSERT_OK(stream->ReadNext(&chunk));
+    ASSERT_BATCHES_EQUAL(*expected_batches[i], *chunk);
+  }
+
+  // Stream exhausted
+  ASSERT_OK(stream->ReadNext(&chunk));
+  ASSERT_EQ(nullptr, chunk);
+}
+
+TEST_F(TestFlightClient, ListActions) {
+  std::vector<ActionType> actions;
+  ASSERT_OK(client_->ListActions(&actions));
+
+  std::vector<ActionType> expected = ExampleActionTypes();
+  AssertEqual(expected, actions);
+}
+
+TEST_F(TestFlightClient, DoAction) {
+  Action action;
+  std::unique_ptr<ResultStream> stream;
+  std::unique_ptr<Result> result;
+
+  // Run action1
+  action.type = "action1";
+
+  const std::string action1_value = "action1-content";
+  ASSERT_OK(Buffer::FromString(action1_value, &action.body));
+  ASSERT_OK(client_->DoAction(action, &stream));
+
+  for (int i = 0; i < 3; ++i) {
+    ASSERT_OK(stream->Next(&result));
+    std::string expected = action1_value + "-part" + std::to_string(i);
+    ASSERT_EQ(expected, result->body->ToString());
+  }
+
+  // stream consumed
+  ASSERT_OK(stream->Next(&result));
+  ASSERT_EQ(nullptr, result);
+
+  // Run action2, no results
+  action.type = "action2";
+  ASSERT_OK(client_->DoAction(action, &stream));
+
+  ASSERT_OK(stream->Next(&result));
+  ASSERT_EQ(nullptr, result);
+}
+
+}  // namespace flight
+}  // namespace arrow
diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc
new file mode 100644
index 0000000..796e609
--- /dev/null
+++ b/cpp/src/arrow/flight/internal.cc
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/internal.h"
+
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include <grpcpp/grpcpp.h>
+
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace flight {
+namespace internal {
+
+Status FromGrpcStatus(const grpc::Status& grpc_status) {
+  if (grpc_status.ok()) {
+    return Status::OK();
+  }
+  std::stringstream ss;
+
+  if (grpc_status.error_code() == grpc::StatusCode::UNIMPLEMENTED) {
+    ss << "gRPC returned unimplemented error, with message: "
+       << grpc_status.error_message();
+    return Status::NotImplemented(ss.str());
+  } else {
+    ss << "gRPC failed with error code " << grpc_status.error_code()
+       << " and message: " << grpc_status.error_message();
+    return Status::IOError(ss.str());
+  }
+}
+
+grpc::Status ToGrpcStatus(const Status& arrow_status) {
+  if (arrow_status.ok()) {
+    return grpc::Status::OK;
+  } else {
+    grpc::StatusCode grpc_code = grpc::StatusCode::UNKNOWN;
+    if (arrow_status.IsNotImplemented()) {
+      grpc_code = grpc::StatusCode::UNIMPLEMENTED;
+    } else if (arrow_status.IsInvalid()) {
+      grpc_code = grpc::StatusCode::INVALID_ARGUMENT;
+    }
+    return grpc::Status(grpc_code, arrow_status.message());
+  }
+}
+
+// ActionType
+
+Status FromProto(const pb::ActionType& pb_type, ActionType* type) {
+  type->type = pb_type.type();
+  type->description = pb_type.description();
+  return Status::OK();
+}
+
+Status ToProto(const ActionType& type, pb::ActionType* pb_type) {
+  pb_type->set_type(type.type);
+  pb_type->set_description(type.description);
+  return Status::OK();
+}
+
+// Action
+
+Status FromProto(const pb::Action& pb_action, Action* action) {
+  action->type = pb_action.type();
+  return Buffer::FromString(pb_action.body(), &action->body);
+}
+
+Status ToProto(const Action& action, pb::Action* pb_action) {
+  pb_action->set_type(action.type);
+  pb_action->set_body(action.body->ToString());
+  return Status::OK();
+}
+
+// Result (of an Action)
+
+Status FromProto(const pb::Result& pb_result, Result* result) {
+  // ARROW-3250; can avoid copy. Can also write custom deserializer if it
+  // becomes an issue
+  return Buffer::FromString(pb_result.body(), &result->body);
+}
+
+Status ToProto(const Result& result, pb::Result* pb_result) {
+  pb_result->set_body(result.body->ToString());
+  return Status::OK();
+}
+
+// Criteria
+
+Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria) {
+  return Status::OK();
+}
+
+// Location
+
+Status FromProto(const pb::Location& pb_location, Location* location) {
+  location->host = pb_location.host();
+  location->port = pb_location.port();
+  return Status::OK();
+}
+
+void ToProto(const Location& location, pb::Location* pb_location) {
+  pb_location->set_host(location.host);
+  pb_location->set_port(location.port);
+}
+
+// Ticket
+
+Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket) {
+  ticket->ticket = pb_ticket.ticket();
+  return Status::OK();
+}
+
+void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket) {
+  pb_ticket->set_ticket(ticket.ticket);
+}
+
+// FlightEndpoint
+
+Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint) {
+  RETURN_NOT_OK(FromProto(pb_endpoint.ticket(), &endpoint->ticket));
+  endpoint->locations.resize(pb_endpoint.location_size());
+  for (int i = 0; i < pb_endpoint.location_size(); ++i) {
+    RETURN_NOT_OK(FromProto(pb_endpoint.location(i), &endpoint->locations[i]));
+  }
+  return Status::OK();
+}
+
+void ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint) {
+  ToProto(endpoint.ticket, pb_endpoint->mutable_ticket());
+  pb_endpoint->clear_location();
+  for (const Location& location : endpoint.locations) {
+    ToProto(location, pb_endpoint->add_location());
+  }
+}
+
+// FlightDescriptor
+
+Status FromProto(const pb::FlightDescriptor& pb_descriptor,
+                 FlightDescriptor* descriptor) {
+  if (pb_descriptor.type() == pb::FlightDescriptor::PATH) {
+    descriptor->type = FlightDescriptor::PATH;
+    descriptor->path.resize(pb_descriptor.path_size());
+    for (int i = 0; i < pb_descriptor.path_size(); ++i) {
+      descriptor->path[i] = pb_descriptor.path(i);
+    }
+  } else if (pb_descriptor.type() == pb::FlightDescriptor::CMD) {
+    descriptor->type = FlightDescriptor::CMD;
+    descriptor->cmd = pb_descriptor.cmd();
+  } else {
+    return Status::Invalid("Client sent UNKNOWN descriptor type");
+  }
+  return Status::OK();
+}
+
+Status ToProto(const FlightDescriptor& descriptor, pb::FlightDescriptor* pb_descriptor) {
+  if (descriptor.type == FlightDescriptor::PATH) {
+    pb_descriptor->set_type(pb::FlightDescriptor::PATH);
+    for (const std::string& path : descriptor.path) {
+      pb_descriptor->add_path(path);
+    }
+  } else {
+    pb_descriptor->set_type(pb::FlightDescriptor::CMD);
+    pb_descriptor->set_cmd(descriptor.cmd);
+  }
+  return Status::OK();
+}
+
+// FlightGetInfo
+
+Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info) {
+  RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &info->descriptor));
+
+  info->schema = pb_info.schema();
+
+  info->endpoints.resize(pb_info.endpoint_size());
+  for (int i = 0; i < pb_info.endpoint_size(); ++i) {
+    RETURN_NOT_OK(FromProto(pb_info.endpoint(i), &info->endpoints[i]));
+  }
+
+  info->total_records = pb_info.total_records();
+  info->total_bytes = pb_info.total_bytes();
+  return Status::OK();
+}
+
+Status SchemaToString(const Schema& schema, std::string* out) {
+  // TODO(wesm): Do we care about better memory efficiency here?
+  std::shared_ptr<Buffer> serialized_schema;
+  RETURN_NOT_OK(ipc::SerializeSchema(schema, default_memory_pool(), &serialized_schema));
+  *out = std::string(reinterpret_cast<const char*>(serialized_schema->data()),
+                     static_cast<size_t>(serialized_schema->size()));
+  return Status::OK();
+}
+
+Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info) {
+  // clear any repeated fields
+  pb_info->clear_endpoint();
+
+  pb_info->set_schema(info.serialized_schema());
+
+  // descriptor
+  RETURN_NOT_OK(ToProto(info.descriptor(), pb_info->mutable_flight_descriptor()));
+
+  // endpoints
+  for (const FlightEndpoint& endpoint : info.endpoints()) {
+    ToProto(endpoint, pb_info->add_endpoint());
+  }
+
+  pb_info->set_total_records(info.total_records());
+  pb_info->set_total_bytes(info.total_bytes());
+  return Status::OK();
+}
+
+}  // namespace internal
+}  // namespace flight
+}  // namespace arrow
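
Note on copying a repeated protobuf field into a std::vector, as the PATH branch of FromProto for FlightDescriptor does: resize(n) creates n default-constructed elements, so any emplace_back() that follows appends after them, while reserve(n) only allocates capacity. The standalone sketch below illustrates the difference with plain std::string values. CopyWithResize and CopyWithReserve are hypothetical names used only for illustration; they are not part of the patch.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: resize() first, then append. The result holds
// src.size() empty strings followed by the copied values.
std::vector<std::string> CopyWithResize(const std::vector<std::string>& src) {
  std::vector<std::string> out;
  out.resize(src.size());
  for (const std::string& s : src) {
    out.emplace_back(s);
  }
  return out;
}

// Hypothetical helper: reserve() only allocates, so the result holds
// exactly the copied values.
std::vector<std::string> CopyWithReserve(const std::vector<std::string>& src) {
  std::vector<std::string> out;
  out.reserve(src.size());
  for (const std::string& s : src) {
    out.emplace_back(s);
  }
  return out;
}
```

With a two-element input, the first helper returns four elements and the second returns two.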
diff --git a/cpp/src/arrow/flight/internal.h b/cpp/src/arrow/flight/internal.h
new file mode 100644
index 0000000..bae1eed
--- /dev/null
+++ b/cpp/src/arrow/flight/internal.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+
+#include "arrow/buffer.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/util/macros.h"
+
+#include "arrow/flight/Flight.grpc.pb.h"
+#include "arrow/flight/Flight.pb.h"
+#include "arrow/flight/types.h"
+
+namespace arrow {
+
+class Schema;
+class Status;
+
+namespace pb = arrow::flight::protocol;
+
+namespace flight {
+
+#define GRPC_RETURN_NOT_OK(s)                             \
+  do {                                                    \
+    ::arrow::Status _s = (s);                             \
+    if (ARROW_PREDICT_FALSE(!_s.ok())) {                  \
+      return ::arrow::flight::internal::ToGrpcStatus(_s); \
+    }                                                     \
+  } while (0)
+
+namespace internal {
+
+Status SchemaToString(const Schema& schema, std::string* out);
+
+Status FromProto(const pb::ActionType& pb_type, ActionType* type);
+Status FromProto(const pb::Action& pb_action, Action* action);
+Status FromProto(const pb::Result& pb_result, Result* result);
+Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria);
+Status FromProto(const pb::Location& pb_location, Location* location);
+Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket);
+Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr);
+Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint);
+Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info);
+
+Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr);
+Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info);
+Status ToProto(const ActionType& type, pb::ActionType* pb_type);
+Status ToProto(const Action& action, pb::Action* pb_action);
+Status ToProto(const Result& result, pb::Result* pb_result);
+void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket);
+
+Status FromGrpcStatus(const grpc::Status& grpc_status);
+
+grpc::Status ToGrpcStatus(const Status& arrow_status);
+
+}  // namespace internal
+}  // namespace flight
+}  // namespace arrow
diff --git a/cpp/src/arrow/flight/perf-server.cc b/cpp/src/arrow/flight/perf-server.cc
new file mode 100644
index 0000000..ce2ec7b
--- /dev/null
+++ b/cpp/src/arrow/flight/perf-server.cc
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Performance server for benchmarking purposes
+
+#include <signal.h>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <gflags/gflags.h>
+
+#include "arrow/array.h"
+#include "arrow/io/test-common.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
+
+#include "arrow/flight/perf.pb.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/test-util.h"
+
+DEFINE_int32(port, 31337, "Server port to listen on");
+
+namespace perf = arrow::flight::perf;
+namespace proto = arrow::flight::protocol;
+
+using IpcPayload = arrow::ipc::internal::IpcPayload;
+
+namespace arrow {
+namespace flight {
+
+#define CHECK_PARSE(EXPR)                              \
+  do {                                                 \
+    if (!(EXPR)) {                                     \
+      return Status::Invalid("cannot parse protobuf"); \
+    }                                                  \
+  } while (0)
+
+using ArrayVector = std::vector<std::shared_ptr<Array>>;
+
+// Create record batches with a unique "a" column so we can verify on the
+// client side that the results are correct
+class PerfDataStream : public FlightDataStream {
+ public:
+  PerfDataStream(bool verify, const int64_t start, const int64_t total_records,
+                 const std::shared_ptr<Schema>& schema, const ArrayVector& arrays)
+      : start_(start),
+        verify_(verify),
+        batch_length_(arrays[0]->length()),
+        total_records_(total_records),
+        records_sent_(0),
+        schema_(schema),
+        arrays_(arrays) {
+    batch_ = RecordBatch::Make(schema, batch_length_, arrays_);
+  }
+
+  Status Next(IpcPayload* payload) override {
+    if (records_sent_ >= total_records_) {
+      // Signal that iteration is over
+      payload->metadata = nullptr;
+      return Status::OK();
+    }
+
+    if (verify_) {
+      // mutate first array
+      auto data =
+          reinterpret_cast<int64_t*>(arrays_[0]->data()->buffers[1]->mutable_data());
+      for (int64_t i = 0; i < batch_length_; ++i) {
+        data[i] = start_ + records_sent_ + i;
+      }
+    }
+
+    auto batch = batch_;
+
+    // Last partial batch
+    if (records_sent_ + batch_length_ > total_records_) {
+      batch = batch_->Slice(0, total_records_ - records_sent_);
+      records_sent_ = total_records_;
+    } else {
+      records_sent_ += batch_length_;
+    }
+    return ipc::internal::GetRecordBatchPayload(*batch, default_memory_pool(), payload);
+  }
+
+ private:
+  const int64_t start_;
+  bool verify_;
+  const int64_t batch_length_;
+  const int64_t total_records_;
+  int64_t records_sent_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<RecordBatch> batch_;
+  ArrayVector arrays_;
+};
+
+Status GetPerfBatches(const perf::Token& token, const std::shared_ptr<Schema>& schema,
+                      bool use_verifier, std::unique_ptr<FlightDataStream>* data_stream) {
+  std::shared_ptr<ResizableBuffer> buffer;
+  std::vector<std::shared_ptr<Array>> arrays;
+
+  const int32_t length = token.definition().records_per_batch();
+  const int32_t ncolumns = 4;
+  for (int i = 0; i < ncolumns; ++i) {
+    RETURN_NOT_OK(MakeRandomBuffer<int64_t>(length, default_memory_pool(), &buffer));
+    arrays.push_back(std::make_shared<Int64Array>(length, buffer));
+  }
+
+  *data_stream = std::unique_ptr<FlightDataStream>(
+      new PerfDataStream(use_verifier, token.start(),
+                         token.definition().records_per_stream(), schema, arrays));
+  return Status::OK();
+}
+
+class FlightPerfServer : public FlightServerBase {
+ public:
+  FlightPerfServer() : location_(Location{"localhost", FLAGS_port}) {
+    perf_schema_ = schema({field("a", int64()), field("b", int64()), field("c", int64()),
+                           field("d", int64())});
+  }
+
+  Status GetFlightInfo(const FlightDescriptor& request,
+                       std::unique_ptr<FlightInfo>* info) override {
+    perf::Perf perf_request;
+    CHECK_PARSE(perf_request.ParseFromString(request.cmd));
+
+    perf::Token token;
+    token.mutable_definition()->CopyFrom(perf_request);
+
+    std::vector<FlightEndpoint> endpoints;
+    Ticket tmp_ticket;
+    for (int64_t i = 0; i < perf_request.stream_count(); ++i) {
+      token.set_start(i * perf_request.records_per_stream());
+      token.set_end((i + 1) * perf_request.records_per_stream());
+
+      (void)token.SerializeToString(&tmp_ticket.ticket);
+
+      // All endpoints same location for now
+      endpoints.push_back(FlightEndpoint{tmp_ticket, {location_}});
+    }
+
+    uint64_t total_records =
+        perf_request.stream_count() * perf_request.records_per_stream();
+
+    FlightInfo::Data data;
+    RETURN_NOT_OK(
+        MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, &data));
+    *info = std::unique_ptr<FlightInfo>(new FlightInfo(data));
+    return Status::OK();
+  }
+
+  Status DoGet(const Ticket& request,
+               std::unique_ptr<FlightDataStream>* data_stream) override {
+    perf::Token token;
+    CHECK_PARSE(token.ParseFromString(request.ticket));
+    return GetPerfBatches(token, perf_schema_, false, data_stream);
+  }
+
+ private:
+  Location location_;
+  std::shared_ptr<Schema> perf_schema_;
+};
+
+}  // namespace flight
+}  // namespace arrow
+
+std::unique_ptr<arrow::flight::FlightPerfServer> g_server;
+
+void Shutdown(int signal) {
+  if (g_server != nullptr) {
+    g_server->Shutdown();
+  }
+}
+
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  // SIGTERM shuts down the server
+  signal(SIGTERM, Shutdown);
+
+  g_server.reset(new arrow::flight::FlightPerfServer);
+
+  // TODO(wesm): How can we tell if the server failed to start for some reason?
+  g_server->Run(FLAGS_port);
+  return 0;
+}
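
Note on the batching in PerfDataStream::Next: each stream emits full batches of records_per_batch rows and, if the total is not a multiple of the batch length, one shorter final batch. A minimal standalone sketch of that arithmetic follows, assuming plain integers in place of record batches. BatchLengths is a hypothetical helper name, not something defined in the patch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: lengths of the batches sent for one stream.
// Full batches of `batch_length` rows come first; the last batch is
// shortened so the running total never exceeds `total_records`.
std::vector<int64_t> BatchLengths(int64_t total_records, int64_t batch_length) {
  assert(batch_length > 0);
  std::vector<int64_t> lengths;
  int64_t records_sent = 0;
  while (records_sent < total_records) {
    const int64_t n = std::min(batch_length, total_records - records_sent);
    lengths.push_back(n);
    records_sent += n;
  }
  return lengths;
}
```

For 10 records and a batch length of 4, this yields batches of 4, 4 and 2 rows.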
diff --git a/cpp/src/arrow/flight/perf.proto b/cpp/src/arrow/flight/perf.proto
new file mode 100644
index 0000000..9123baf
--- /dev/null
+++ b/cpp/src/arrow/flight/perf.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package arrow.flight.perf;
+
+message Perf {
+  bytes schema = 1;
+  int32 stream_count = 2;
+  int64 records_per_stream = 3;
+  int32 records_per_batch = 4;
+}
+
+/*
+ * Payload of ticket
+ */
+message Token {
+
+  // definition of entire flight.
+  Perf definition = 1;
+
+  // inclusive start
+  int64 start = 2;
+
+  // exclusive end
+  int64 end = 3;
+
+}
\ No newline at end of file
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
new file mode 100644
index 0000000..967a254
--- /dev/null
+++ b/cpp/src/arrow/flight/server.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/server.h"
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <string>
+
+#include "google/protobuf/io/coded_stream.h"
+#include "google/protobuf/io/zero_copy_stream.h"
+#include "google/protobuf/wire_format_lite.h"
+#include "grpcpp/grpcpp.h"
+
+#include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/flight/Flight.grpc.pb.h"
+#include "arrow/flight/Flight.pb.h"
+#include "arrow/flight/internal.h"
+#include "arrow/flight/types.h"
+
+using FlightService = arrow::flight::protocol::FlightService;
+using ServerContext = grpc::ServerContext;
+
+using arrow::ipc::internal::IpcPayload;
+
+template <typename T>
+using ServerWriter = grpc::ServerWriter<T>;
+
+namespace pb = arrow::flight::protocol;
+
+constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
+
+namespace grpc {
+
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::CodedOutputStream;
+
+// More efficient writing of FlightData to gRPC output buffer
+// Implementation of ZeroCopyOutputStream that writes to a fixed-size buffer
+class FixedSizeProtoWriter : public ::google::protobuf::io::ZeroCopyOutputStream {
+ public:
+  explicit FixedSizeProtoWriter(grpc_slice slice)
+      : slice_(slice),
+        bytes_written_(0),
+        total_size_(static_cast<int>(GRPC_SLICE_LENGTH(slice))) {}
+
+  bool Next(void** data, int* size) override {
+    // Consume the whole slice
+    *data = GRPC_SLICE_START_PTR(slice_) + bytes_written_;
+    *size = total_size_ - bytes_written_;
+    bytes_written_ = total_size_;
+    return true;
+  }
+
+  void BackUp(int count) override { bytes_written_ -= count; }
+
+  int64_t ByteCount() const override { return bytes_written_; }
+
+ private:
+  grpc_slice slice_;
+  int bytes_written_;
+  int total_size_;
+};
+
+// Write FlightData to a grpc::ByteBuffer without extra copying
+template <>
+class SerializationTraits<IpcPayload> {
+ public:
+  static grpc::Status Deserialize(ByteBuffer* buffer, IpcPayload* out) {
+    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
+                        "IpcPayload deserialization not implemented");
+  }
+
+  static grpc::Status Serialize(const IpcPayload& msg, ByteBuffer* out,
+                                bool* own_buffer) {
+    size_t total_size = 0;
+
+    DCHECK_LT(msg.metadata->size(), kInt32Max);
+    const int32_t metadata_size = static_cast<int32_t>(msg.metadata->size());
+
+    // 1 byte for metadata tag
+    total_size += 1 + WireFormatLite::LengthDelimitedSize(metadata_size);
+
+    int64_t body_size = 0;
+    for (const auto& buffer : msg.body_buffers) {
+      body_size += buffer->size();
+
+      const int64_t remainder = buffer->size() % 8;
+      if (remainder) {
+        body_size += 8 - remainder;
+      }
+    }
+
+    // 2 bytes for body tag
+    total_size += 2 + WireFormatLite::LengthDelimitedSize(static_cast<size_t>(body_size));
+
+    // TODO(wesm): messages over 2GB are unlikely to be supported yet
+    if (total_size > static_cast<size_t>(kInt32Max)) {
+      return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                          "Cannot send record batches exceeding 2GB yet");
+    }
+
+    // Allocate slice, assign to output buffer
+    grpc::Slice slice(total_size);
+
+    // XXX(wesm): for debugging
+    // std::cout << "Writing record batch with total size " << total_size << std::endl;
+
+    FixedSizeProtoWriter writer(*reinterpret_cast<grpc_slice*>(&slice));
+    CodedOutputStream pb_stream(&writer);
+
+    // Write header
+    WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
+                             WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream);
+    pb_stream.WriteVarint32(metadata_size);
+    pb_stream.WriteRawMaybeAliased(msg.metadata->data(),
+                                   static_cast<int>(msg.metadata->size()));
+
+    // Write body
+    WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
+                             WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream);
+    pb_stream.WriteVarint32(static_cast<uint32_t>(body_size));
+
+    constexpr uint8_t kPaddingBytes[8] = {0};
+
+    for (const auto& buffer : msg.body_buffers) {
+      pb_stream.WriteRawMaybeAliased(buffer->data(), static_cast<int>(buffer->size()));
+
+      // Write padding if not multiple of 8
+      const int remainder = static_cast<int>(buffer->size() % 8);
+      if (remainder) {
+        pb_stream.WriteRawMaybeAliased(kPaddingBytes, 8 - remainder);
+      }
+    }
+
+    DCHECK_EQ(static_cast<int>(total_size), pb_stream.ByteCount());
+
+    // Hand off the slice to the returned ByteBuffer
+    grpc::ByteBuffer tmp(&slice, 1);
+    out->Swap(&tmp);
+    *own_buffer = true;
+    return grpc::Status::OK;
+  }
+};
+
+}  // namespace grpc
+
+namespace arrow {
+namespace flight {
+
+#define CHECK_ARG_NOT_NULL(VAL, MESSAGE)                              \
+  if (VAL == nullptr) {                                               \
+    return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, MESSAGE); \
+  }
+
+// This class glues an implementation of FlightServerBase together with the
+// gRPC service definition, so the latter is not exposed in the public API
+class FlightServiceImpl : public FlightService::Service {
+ public:
+  explicit FlightServiceImpl(FlightServerBase* server) : server_(server) {}
+
+  template <typename UserType, typename Iterator, typename ProtoType>
+  grpc::Status WriteStream(Iterator* iterator, ServerWriter<ProtoType>* writer) {
+    // Write each value to the stream until the iterator is exhausted
+    ProtoType pb_value;
+    std::unique_ptr<UserType> value;
+    while (true) {
+      GRPC_RETURN_NOT_OK(iterator->Next(&value));
+      if (!value) {
+        break;
+      }
+      GRPC_RETURN_NOT_OK(internal::ToProto(*value, &pb_value));
+
+      // Blocking write
+      if (!writer->Write(pb_value)) {
+        // Write returns false if the stream is closed
+        break;
+      }
+    }
+    return grpc::Status::OK;
+  }
+
+  template <typename UserType, typename ProtoType>
+  grpc::Status WriteStream(const std::vector<UserType>& values,
+                           ServerWriter<ProtoType>* writer) {
+    // Write each value in the vector to the stream
+    ProtoType pb_value;
+    for (const UserType& value : values) {
+      GRPC_RETURN_NOT_OK(internal::ToProto(value, &pb_value));
+      // Blocking write
+      if (!writer->Write(pb_value)) {
+        // Write returns false if the stream is closed
+        break;
+      }
+    }
+    return grpc::Status::OK;
+  }
+
+  grpc::Status ListFlights(ServerContext* context, const pb::Criteria* request,
+                           ServerWriter<pb::FlightGetInfo>* writer) {
+    // Retrieve the listing from the implementation
+    std::unique_ptr<FlightListing> listing;
+
+    Criteria criteria;
+    if (request) {
+      GRPC_RETURN_NOT_OK(internal::FromProto(*request, &criteria));
+    }
+    GRPC_RETURN_NOT_OK(server_->ListFlights(&criteria, &listing));
+    return WriteStream<FlightInfo>(listing.get(), writer);
+  }
+
+  grpc::Status GetFlightInfo(ServerContext* context, const pb::FlightDescriptor* request,
+                             pb::FlightGetInfo* response) {
+    CHECK_ARG_NOT_NULL(request, "FlightDescriptor cannot be null");
+
+    FlightDescriptor descr;
+    GRPC_RETURN_NOT_OK(internal::FromProto(*request, &descr));
+
+    std::unique_ptr<FlightInfo> info;
+    GRPC_RETURN_NOT_OK(server_->GetFlightInfo(descr, &info));
+
+    GRPC_RETURN_NOT_OK(internal::ToProto(*info, response));
+    return grpc::Status::OK;
+  }
+
+  grpc::Status DoGet(ServerContext* context, const pb::Ticket* request,
+                     ServerWriter<pb::FlightData>* writer) {
+    CHECK_ARG_NOT_NULL(request, "ticket cannot be null");
+
+    Ticket ticket;
+    GRPC_RETURN_NOT_OK(internal::FromProto(*request, &ticket));
+
+    std::unique_ptr<FlightDataStream> data_stream;
+    GRPC_RETURN_NOT_OK(server_->DoGet(ticket, &data_stream));
+
+    // Requires ServerWriter customization in grpc_customizations.h
+    auto custom_writer = reinterpret_cast<ServerWriter<IpcPayload>*>(writer);
+
+    while (true) {
+      IpcPayload payload;
+      GRPC_RETURN_NOT_OK(data_stream->Next(&payload));
+      if (payload.metadata == nullptr ||
+          !custom_writer->Write(payload, grpc::WriteOptions())) {
+        // No more messages to write, or connection terminated for some other
+        // reason
+        break;
+      }
+    }
+    return grpc::Status::OK;
+  }
+
+  grpc::Status DoPut(ServerContext* context, grpc::ServerReader<pb::FlightData>* reader,
+                     pb::PutResult* response) {
+    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "");
+  }
+
+  grpc::Status ListActions(ServerContext* context, const pb::Empty* request,
+                           ServerWriter<pb::ActionType>* writer) {
+    // Retrieve the listing from the implementation
+    std::vector<ActionType> types;
+    GRPC_RETURN_NOT_OK(server_->ListActions(&types));
+    return WriteStream<ActionType>(types, writer);
+  }
+
+  grpc::Status DoAction(ServerContext* context, const pb::Action* request,
+                        ServerWriter<pb::Result>* writer) {
+    CHECK_ARG_NOT_NULL(request, "Action cannot be null");
+    Action action;
+    GRPC_RETURN_NOT_OK(internal::FromProto(*request, &action));
+
+    std::unique_ptr<ResultStream> results;
+    GRPC_RETURN_NOT_OK(server_->DoAction(action, &results));
+
+    std::unique_ptr<Result> result;
+    pb::Result pb_result;
+    while (true) {
+      GRPC_RETURN_NOT_OK(results->Next(&result));
+      if (!result) {
+        // No more results
+        break;
+      }
+      GRPC_RETURN_NOT_OK(internal::ToProto(*result, &pb_result));
+      if (!writer->Write(pb_result)) {
+        // Stream may be closed
+        break;
+      }
+    }
+    return grpc::Status::OK;
+  }
+
+ private:
+  FlightServerBase* server_;
+};
+
+struct FlightServerBase::FlightServerBaseImpl {
+  std::unique_ptr<grpc::Server> server;
+};
+
+FlightServerBase::FlightServerBase() { impl_.reset(new FlightServerBaseImpl); }
+
+FlightServerBase::~FlightServerBase() {}
+
+void FlightServerBase::Run(int port) {
+  std::string address = "localhost:" + std::to_string(port);
+
+  FlightServiceImpl service(this);
+  grpc::ServerBuilder builder;
+  builder.AddListeningPort(address, grpc::InsecureServerCredentials());
+  builder.RegisterService(&service);
+
+  impl_->server = builder.BuildAndStart();
+  std::cout << "Server listening on " << address << std::endl;
+  impl_->server->Wait();
+}
+
+void FlightServerBase::Shutdown() {
+  DCHECK(impl_->server);
+  impl_->server->Shutdown();
+}
+
+Status FlightServerBase::ListFlights(const Criteria* criteria,
+                                     std::unique_ptr<FlightListing>* listings) {
+  return Status::NotImplemented("NYI");
+}
+
+Status FlightServerBase::GetFlightInfo(const FlightDescriptor& request,
+                                       std::unique_ptr<FlightInfo>* info) {
+  std::cout << "GetFlightInfo" << std::endl;
+  return Status::NotImplemented("NYI");
+}
+
+Status FlightServerBase::DoGet(const Ticket& request,
+                               std::unique_ptr<FlightDataStream>* data_stream) {
+  return Status::NotImplemented("NYI");
+}
+
+Status FlightServerBase::DoAction(const Action& action,
+                                  std::unique_ptr<ResultStream>* result) {
+  return Status::NotImplemented("NYI");
+}
+
+Status FlightServerBase::ListActions(std::vector<ActionType>* actions) {
+  return Status::NotImplemented("NYI");
+}
+
+// ----------------------------------------------------------------------
+// Implement RecordBatchStream
+
+RecordBatchStream::RecordBatchStream(const std::shared_ptr<RecordBatchReader>& reader)
+    : pool_(default_memory_pool()), reader_(reader) {}
+
+Status RecordBatchStream::Next(IpcPayload* payload) {
+  std::shared_ptr<RecordBatch> batch;
+  RETURN_NOT_OK(reader_->ReadNext(&batch));
+
+  if (!batch) {
+    // Signal that iteration is over
+    payload->metadata = nullptr;
+    return Status::OK();
+  } else {
+    return ipc::internal::GetRecordBatchPayload(*batch, pool_, payload);
+  }
+}
+
+}  // namespace flight
+}  // namespace arrow
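
Note on the size computation in SerializationTraits<IpcPayload>::Serialize: the slice is sized up front as tag bytes plus a varint length prefix plus payload for each of the two fields, with every body buffer padded to a multiple of 8 bytes. The standalone sketch below reproduces that arithmetic without protobuf. VarintSize, PaddedSize, LengthDelimitedSize and FlightDataWireSize are hypothetical reimplementations written for illustration; the 1-byte and 2-byte tag sizes are taken from the comments in the patch rather than derived here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Number of bytes a base-128 varint needs to encode `value`.
inline size_t VarintSize(uint64_t value) {
  size_t n = 1;
  while (value >= 0x80) {
    value >>= 7;
    ++n;
  }
  return n;
}

// Round `size` up to the next multiple of 8.
inline int64_t PaddedSize(int64_t size) {
  assert(size >= 0);
  const int64_t remainder = size % 8;
  return remainder ? size + (8 - remainder) : size;
}

// Varint length prefix followed by the payload bytes.
inline size_t LengthDelimitedSize(size_t length) {
  return VarintSize(length) + length;
}

// Total bytes for one message: metadata field (1 tag byte, per the patch
// comment) plus body field (2 tag bytes, per the patch comment).
inline size_t FlightDataWireSize(int64_t metadata_size,
                                 const std::vector<int64_t>& body_buffer_sizes) {
  int64_t body_size = 0;
  for (int64_t s : body_buffer_sizes) {
    body_size += PaddedSize(s);
  }
  return 1 + LengthDelimitedSize(static_cast<size_t>(metadata_size)) + 2 +
         LengthDelimitedSize(static_cast<size_t>(body_size));
}
```

For example, 100 bytes of metadata with body buffers of 3 and 16 bytes give a padded body of 24 bytes and a total of 1 + 101 + 2 + 25 = 129 bytes.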
diff --git a/cpp/src/arrow/flight/server.h b/cpp/src/arrow/flight/server.h
new file mode 100644
index 0000000..89154ac
--- /dev/null
+++ b/cpp/src/arrow/flight/server.h
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Interfaces to use for defining Flight RPC servers. API should be considered
+// experimental for now
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/util/visibility.h"
+
+#include "arrow/flight/types.h"
+
+namespace arrow {
+
+class MemoryPool;
+class RecordBatchReader;
+class Status;
+
+namespace ipc {
+namespace internal {
+
+struct IpcPayload;
+
+}  // namespace internal
+}  // namespace ipc
+
+namespace io {
+
+class OutputStream;
+
+}  // namespace io
+
+namespace flight {
+
+/// \brief Interface that produces a sequence of IPC payloads to be sent in
+/// FlightData protobuf messages
+class ARROW_EXPORT FlightDataStream {
+ public:
+  virtual ~FlightDataStream() = default;
+
+  // When the stream is completed, the last payload written will have null
+  // metadata
+  virtual Status Next(ipc::internal::IpcPayload* payload) = 0;
+};
+
+/// \brief A basic implementation of FlightDataStream that will provide
+/// a sequence of FlightData messages to be written to a gRPC stream
+/// \param[in] reader produces a sequence of record batches
+class ARROW_EXPORT RecordBatchStream : public FlightDataStream {
+ public:
+  explicit RecordBatchStream(const std::shared_ptr<RecordBatchReader>& reader);
+
+  Status Next(ipc::internal::IpcPayload* payload) override;
+
+ private:
+  MemoryPool* pool_;
+  std::shared_ptr<RecordBatchReader> reader_;
+};
+
+/// \brief Skeleton RPC server implementation which can be used to create
+/// custom servers by implementing its abstract methods
+class ARROW_EXPORT FlightServerBase {
+ public:
+  FlightServerBase();
+  virtual ~FlightServerBase();
+
+  /// \brief Run an insecure server on localhost at the indicated port. Block
+  /// until server is shut down or otherwise terminates
+  /// \param[in] port the port to listen on
+  /// \note Returns nothing; startup failures are not yet reported
+  void Run(int port);
+
+  /// \brief Shut down the server. Can be called from signal handler or another
+  /// thread while Run blocks
+  ///
+  /// TODO(wesm): Shutdown with deadline
+  void Shutdown();
+
+  // Implement these methods to create your own server. The default
+  // implementations will return a not-implemented result to the client
+
+  /// \brief Retrieve a list of available flights given an optional opaque
+  /// criteria
+  /// \param[in] criteria may be null
+  /// \param[out] listings the returned listings iterator
+  /// \return Status
+  virtual Status ListFlights(const Criteria* criteria,
+                             std::unique_ptr<FlightListing>* listings);
+
+  /// \brief Retrieve the schema and an access plan for the indicated
+  /// descriptor
+  /// \param[in] request the descriptor of the flight to look up
+  /// \param[out] info the returned flight info provider
+  /// \return Status
+  virtual Status GetFlightInfo(const FlightDescriptor& request,
+                               std::unique_ptr<FlightInfo>* info);
+
+  /// \brief Get a stream of IPC payloads to put on the wire
+  /// \param[in] request an opaque ticket
+  /// \param[out] stream the returned stream provider
+  /// \return Status
+  virtual Status DoGet(const Ticket& request, std::unique_ptr<FlightDataStream>* stream);
+
+  // virtual Status DoPut(std::unique_ptr<FlightMessageReader>* reader) = 0;
+
+  /// \brief Execute an action, return stream of zero or more results
+  /// \param[in] action the action to execute, with type and body
+  /// \param[out] result the result iterator
+  /// \return Status
+  virtual Status DoAction(const Action& action, std::unique_ptr<ResultStream>* result);
+
+  /// \brief Retrieve the list of available actions
+  /// \param[out] actions a vector of available action types
+  /// \return Status
+  virtual Status ListActions(std::vector<ActionType>* actions);
+
+ private:
+  struct FlightServerBaseImpl;
+  std::unique_ptr<FlightServerBaseImpl> impl_;
+};
+
+}  // namespace flight
+}  // namespace arrow
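
Note on the FlightDataStream contract: Next() reports the end of the stream by leaving the payload's metadata null rather than by returning an error, and the DoGet loop in server.cc stops when it sees that. A minimal standalone sketch of the convention follows. FakePayload, FakeStream and Drain are hypothetical stand-ins that use strings instead of Arrow buffers and a bool instead of Status.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ipc::internal::IpcPayload: only the metadata
// pointer matters for the end-of-stream convention.
struct FakePayload {
  std::shared_ptr<std::string> metadata;
};

// Hypothetical stream that yields a fixed list of messages.
class FakeStream {
 public:
  explicit FakeStream(std::vector<std::string> messages)
      : messages_(std::move(messages)) {}

  // Returns true on success. Null metadata signals that iteration is over.
  bool Next(FakePayload* payload) {
    if (pos_ >= messages_.size()) {
      payload->metadata = nullptr;
      return true;
    }
    payload->metadata = std::make_shared<std::string>(messages_[pos_++]);
    return true;
  }

 private:
  std::vector<std::string> messages_;
  size_t pos_ = 0;
};

// Consumer loop: stop on failure or on the null-metadata sentinel.
std::vector<std::string> Drain(FakeStream* stream) {
  std::vector<std::string> out;
  while (true) {
    FakePayload payload;
    if (!stream->Next(&payload) || payload.metadata == nullptr) {
      break;
    }
    out.push_back(*payload.metadata);
  }
  return out;
}
```

Calling Drain on an exhausted stream returns an empty vector, since Next keeps reporting the sentinel.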
diff --git a/cpp/src/arrow/flight/test-server.cc b/cpp/src/arrow/flight/test-server.cc
new file mode 100644
index 0000000..14b03d9
--- /dev/null
+++ b/cpp/src/arrow/flight/test-server.cc
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Example server implementation to use for unit testing and benchmarking
+// purposes
+
+#include <signal.h>
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <gflags/gflags.h>
+
+#include "arrow/io/test-common.h"
+#include "arrow/record_batch.h"
+
+#include "arrow/flight/server.h"
+#include "arrow/flight/test-util.h"
+
+DEFINE_int32(port, 31337, "Server port to listen on");
+
+namespace arrow {
+namespace flight {
+
+Status GetBatchForFlight(const Ticket& ticket, std::shared_ptr<RecordBatchReader>* out) {
+  if (ticket.ticket == "ticket-id-1") {
+    BatchVector batches;
+    RETURN_NOT_OK(SimpleIntegerBatches(5, &batches));
+    *out = std::make_shared<BatchIterator>(batches[0]->schema(), batches);
+    return Status::OK();
+  } else {
+    return Status::NotImplemented("no stream implemented for this ticket");
+  }
+}
+
+class FlightTestServer : public FlightServerBase {
+  Status ListFlights(const Criteria* criteria,
+                     std::unique_ptr<FlightListing>* listings) override {
+    std::vector<FlightInfo> flights = ExampleFlightInfo();
+    *listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
+    return Status::OK();
+  }
+
+  Status GetFlightInfo(const FlightDescriptor& request,
+                       std::unique_ptr<FlightInfo>* info) override {
+    std::vector<FlightInfo> flights = ExampleFlightInfo();
+
+    const FlightInfo* value;
+
+    // We only have one kind of flight for each descriptor type
+    if (request.type == FlightDescriptor::PATH) {
+      value = &flights[0];
+    } else {
+      value = &flights[1];
+    }
+
+    *info = std::unique_ptr<FlightInfo>(new FlightInfo(*value));
+    return Status::OK();
+  }
+
+  Status DoGet(const Ticket& request,
+               std::unique_ptr<FlightDataStream>* data_stream) override {
+    std::shared_ptr<RecordBatchReader> batch_reader;
+    RETURN_NOT_OK(GetBatchForFlight(request, &batch_reader));
+
+    *data_stream = std::unique_ptr<FlightDataStream>(new RecordBatchStream(batch_reader));
+    return Status::OK();
+  }
+
+  Status RunAction1(const Action& action, std::unique_ptr<ResultStream>* out) {
+    std::vector<Result> results;
+    for (int i = 0; i < 3; ++i) {
+      Result result;
+      std::string value = action.body->ToString() + "-part" + std::to_string(i);
+      RETURN_NOT_OK(Buffer::FromString(value, &result.body));
+      results.push_back(result);
+    }
+    *out = std::unique_ptr<ResultStream>(new SimpleResultStream(std::move(results)));
+    return Status::OK();
+  }
+
+  Status RunAction2(std::unique_ptr<ResultStream>* out) {
+    // Empty
+    *out = std::unique_ptr<ResultStream>(new SimpleResultStream({}));
+    return Status::OK();
+  }
+
+  Status DoAction(const Action& action, std::unique_ptr<ResultStream>* out) override {
+    if (action.type == "action1") {
+      return RunAction1(action, out);
+    } else if (action.type == "action2") {
+      return RunAction2(out);
+    } else {
+      return Status::NotImplemented(action.type);
+    }
+  }
+
+  Status ListActions(std::vector<ActionType>* out) override {
+    std::vector<ActionType> actions = ExampleActionTypes();
+    *out = std::move(actions);
+    return Status::OK();
+  }
+};
+
+}  // namespace flight
+}  // namespace arrow
+
+std::unique_ptr<arrow::flight::FlightTestServer> g_server;
+
+void Shutdown(int signal) {
+  if (g_server != nullptr) {
+    g_server->Shutdown();
+  }
+}
+
+int main(int argc, char** argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  // SIGTERM shuts down the server
+  signal(SIGTERM, Shutdown);
+
+  g_server.reset(new arrow::flight::FlightTestServer);
+
+  // TODO(wesm): How can we tell if the server failed to start for some reason?
+  g_server->Run(FLAGS_port);
+  return 0;
+}
diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.h
new file mode 100644
index 0000000..4a12997
--- /dev/null
+++ b/cpp/src/arrow/flight/test-util.h
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <boost/process.hpp>
+
+#include "arrow/ipc/test-common.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+
+#include "arrow/flight/api.h"
+#include "arrow/flight/internal.h"
+
+namespace bp = boost::process;
+
+namespace arrow {
+namespace flight {
+
+// ----------------------------------------------------------------------
+// Fixture to use for running test servers
+
+struct TestServer {
+ public:
+  explicit TestServer(const std::string& executable_name, int port)
+      : executable_name_(executable_name), port_(port) {}
+
+  void Start() {
+    std::string str_port = std::to_string(port_);
+    server_process_.reset(
+        new bp::child(bp::search_path(executable_name_), "-port", str_port));
+    std::cout << "Server running with pid " << server_process_->id() << std::endl;
+  }
+
+  int Stop() {
+    kill(server_process_->id(), SIGTERM);
+    server_process_->wait();
+    return server_process_->exit_code();
+  }
+
+  bool IsRunning() { return server_process_->running(); }
+
+  int port() const { return port_; }
+
+ private:
+  std::string executable_name_;
+  int port_;
+  std::unique_ptr<bp::child> server_process_;
+};
+
+// ----------------------------------------------------------------------
+// A RecordBatchReader for serving a sequence of in-memory record batches
+
+class BatchIterator : public RecordBatchReader {
+ public:
+  BatchIterator(const std::shared_ptr<Schema>& schema,
+                const std::vector<std::shared_ptr<RecordBatch>>& batches)
+      : schema_(schema), batches_(batches), position_(0) {}
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    if (position_ >= batches_.size()) {
+      *out = nullptr;
+    } else {
+      *out = batches_[position_++];
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Schema> schema_;
+  std::vector<std::shared_ptr<RecordBatch>> batches_;
+  size_t position_;
+};
+
+// ----------------------------------------------------------------------
+// Example data for test-server and unit tests
+
+using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
+
+std::shared_ptr<Schema> ExampleSchema1() {
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", int32());
+  return ::arrow::schema({f0, f1});
+}
+
+std::shared_ptr<Schema> ExampleSchema2() {
+  auto f0 = field("f0", utf8());
+  auto f1 = field("f1", binary());
+  return ::arrow::schema({f0, f1});
+}
+
+Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
+                      const std::vector<FlightEndpoint>& endpoints,
+                      uint64_t total_records, uint64_t total_bytes,
+                      FlightInfo::Data* out) {
+  out->descriptor = descriptor;
+  out->endpoints = endpoints;
+  out->total_records = total_records;
+  out->total_bytes = total_bytes;
+  return internal::SchemaToString(schema, &out->schema);
+}
+
+std::vector<FlightInfo> ExampleFlightInfo() {
+  FlightEndpoint endpoint1({{"ticket-id-1"}, {{"foo1.bar.com", 92385}}});
+  FlightEndpoint endpoint2({{"ticket-id-2"}, {{"foo2.bar.com", 92385}}});
+  FlightEndpoint endpoint3({{"ticket-id-3"}, {{"foo3.bar.com", 92385}}});
+  FlightDescriptor descr1{FlightDescriptor::PATH, "", {"foo", "bar"}};
+  FlightDescriptor descr2{FlightDescriptor::CMD, "my_command", {}};
+
+  auto schema1 = ExampleSchema1();
+  auto schema2 = ExampleSchema2();
+
+  FlightInfo::Data flight1, flight2;
+  EXPECT_OK(
+      MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, &flight1));
+  EXPECT_OK(MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, &flight2));
+  return {FlightInfo(flight1), FlightInfo(flight2)};
+}
+
+Status SimpleIntegerBatches(const int num_batches, BatchVector* out) {
+  std::shared_ptr<RecordBatch> batch;
+  for (int i = 0; i < num_batches; ++i) {
+    // Make all different sizes, use different random seed
+    RETURN_NOT_OK(ipc::MakeIntBatchSized(10 + i, &batch, i));
+    out->push_back(batch);
+  }
+  return Status::OK();
+}
+
+std::vector<ActionType> ExampleActionTypes() {
+  return {{"drop", "drop a dataset"}, {"cache", "cache a dataset"}};
+}
+
+}  // namespace flight
+}  // namespace arrow
diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc
new file mode 100644
index 0000000..8c7588d
--- /dev/null
+++ b/cpp/src/arrow/flight/types.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/types.h"
+
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace flight {
+
+Status FlightInfo::GetSchema(std::shared_ptr<Schema>* out) const {
+  if (reconstructed_schema_) {
+    *out = schema_;
+    return Status::OK();
+  }
+  /// XXX(wesm): arrow::ipc::ReadSchema in its current form will not suffice
+  /// for reading schemas with dictionaries. See ARROW-3144
+  io::BufferReader schema_reader(reinterpret_cast<const uint8_t*>(data_.schema.c_str()),
+                                 static_cast<int64_t>(data_.schema.size()));
+  RETURN_NOT_OK(ipc::ReadSchema(&schema_reader, &schema_));
+  reconstructed_schema_ = true;
+  *out = schema_;
+  return Status::OK();
+}
+
+SimpleFlightListing::SimpleFlightListing(const std::vector<FlightInfo>& flights)
+    : position_(0), flights_(flights) {}
+
+SimpleFlightListing::SimpleFlightListing(std::vector<FlightInfo>&& flights)
+    : position_(0), flights_(std::move(flights)) {}
+
+Status SimpleFlightListing::Next(std::unique_ptr<FlightInfo>* info) {
+  if (position_ >= static_cast<int>(flights_.size())) {
+    *info = nullptr;
+    return Status::OK();
+  }
+  *info = std::unique_ptr<FlightInfo>(new FlightInfo(std::move(flights_[position_++])));
+  return Status::OK();
+}
+
+SimpleResultStream::SimpleResultStream(std::vector<Result>&& results)
+    : results_(std::move(results)), position_(0) {}
+
+Status SimpleResultStream::Next(std::unique_ptr<Result>* result) {
+  if (position_ >= results_.size()) {
+    *result = nullptr;
+    return Status::OK();
+  }
+  *result = std::unique_ptr<Result>(new Result(std::move(results_[position_++])));
+  return Status::OK();
+}
+
+}  // namespace flight
+}  // namespace arrow
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
new file mode 100644
index 0000000..0362105
--- /dev/null
+++ b/cpp/src/arrow/flight/types.h
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Data structures for Flight RPC. The API should be considered experimental for now
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Buffer;
+class Schema;
+class Status;
+
+namespace flight {
+
+/// \brief A type of action that can be performed with the DoAction RPC
+struct ActionType {
+  /// Name of action
+  std::string type;
+
+  /// Opaque action description
+  std::string description;
+};
+
+/// \brief Opaque selection criteria for ListFlights RPC
+struct Criteria {
+  /// Opaque criteria expression, dependent on server implementation
+  std::string expression;
+};
+
+/// \brief An action to perform with the DoAction RPC
+struct Action {
+  /// The action type
+  std::string type;
+
+  /// The action content as a Buffer
+  std::shared_ptr<Buffer> body;
+};
+
+/// \brief Opaque result returned after executing an action
+struct Result {
+  std::shared_ptr<Buffer> body;
+};
+
+/// \brief A message received after completing a DoPut stream
+struct PutResult {};
+
+/// \brief A request to retrieve or generate a dataset
+struct FlightDescriptor {
+  enum DescriptorType {
+    UNKNOWN = 0,  /// Unused
+    PATH = 1,     /// Named path identifying a dataset
+    CMD = 2       /// Opaque command to generate a dataset
+  };
+
+  /// The descriptor type
+  DescriptorType type;
+
+  /// Opaque value used to express a command. Should only be defined when type
+  /// is CMD
+  std::string cmd;
+
+  /// List of strings identifying a particular dataset. Should only be defined
+  /// when type is PATH
+  std::vector<std::string> path;
+};
+
+/// \brief Data structure providing an opaque identifier or credential to use
+/// when requesting a data stream with the DoGet RPC
+struct Ticket {
+  std::string ticket;
+};
+
+/// \brief A host location (hostname and port)
+struct Location {
+  std::string host;
+  int32_t port;
+};
+
+/// \brief A flight ticket and list of locations where the ticket can be
+/// redeemed
+struct FlightEndpoint {
+  /// Opaque ticket identifier; use with DoGet RPC
+  Ticket ticket;
+
+  /// List of locations where ticket can be redeemed. If the list is empty, the
+  /// ticket can only be redeemed on the current service where the ticket was
+  /// generated
+  std::vector<Location> locations;
+};
+
+/// \brief The access coordinates for retrieval of a dataset, returned by
+/// GetFlightInfo
+class FlightInfo {
+ public:
+  struct Data {
+    std::string schema;
+    FlightDescriptor descriptor;
+    std::vector<FlightEndpoint> endpoints;
+    uint64_t total_records;
+    uint64_t total_bytes;
+  };
+
+  explicit FlightInfo(const Data& data) : data_(data), reconstructed_schema_(false) {}
+  explicit FlightInfo(Data&& data)
+      : data_(std::move(data)), reconstructed_schema_(false) {}
+
+  /// Deserialize the Arrow schema of the dataset, to be passed to each call to
+  /// DoGet
+  Status GetSchema(std::shared_ptr<Schema>* out) const;
+
+  const std::string& serialized_schema() const { return data_.schema; }
+
+  /// The descriptor associated with this flight, may not be set
+  const FlightDescriptor& descriptor() const { return data_.descriptor; }
+
+  /// A list of endpoints associated with the flight (dataset). To consume the
+  /// whole flight, all endpoints must be consumed
+  const std::vector<FlightEndpoint>& endpoints() const { return data_.endpoints; }
+
+  /// The total number of records (rows) in the dataset. If unknown, set to -1
+  uint64_t total_records() const { return data_.total_records; }
+
+  /// The total number of bytes in the dataset. If unknown, set to -1
+  uint64_t total_bytes() const { return data_.total_bytes; }
+
+ private:
+  Data data_;
+  mutable std::shared_ptr<Schema> schema_;
+  mutable bool reconstructed_schema_;
+};
+
+// TODO(wesm): NYI
+class ARROW_EXPORT FlightPutWriter {
+ public:
+  virtual ~FlightPutWriter() = default;
+};
+
+/// \brief An iterator to FlightInfo instances returned by ListFlights
+class ARROW_EXPORT FlightListing {
+ public:
+  virtual ~FlightListing() = default;
+
+  /// \brief Retrieve the next FlightInfo from the iterator. Returns nullptr
+  /// when there are none left
+  /// \param[out] info a single FlightInfo
+  /// \return Status
+  virtual Status Next(std::unique_ptr<FlightInfo>* info) = 0;
+};
+
+/// \brief An iterator to Result instances returned by DoAction
+class ARROW_EXPORT ResultStream {
+ public:
+  virtual ~ResultStream() = default;
+
+  /// \brief Retrieve the next Result from the iterator. Returns nullptr
+  /// when there are none left
+  /// \param[out] info a single Result
+  /// \return Status
+  virtual Status Next(std::unique_ptr<Result>* info) = 0;
+};
+
+/// \brief Create a FlightListing from a vector of FlightInfo objects. This can
+/// be iterated once, then it is consumed
+class ARROW_EXPORT SimpleFlightListing : public FlightListing {
+ public:
+  explicit SimpleFlightListing(const std::vector<FlightInfo>& flights);
+  explicit SimpleFlightListing(std::vector<FlightInfo>&& flights);
+
+  Status Next(std::unique_ptr<FlightInfo>* info) override;
+
+ private:
+  int position_;
+  std::vector<FlightInfo> flights_;
+};
+
+class ARROW_EXPORT SimpleResultStream : public ResultStream {
+ public:
+  explicit SimpleResultStream(std::vector<Result>&& results);
+  Status Next(std::unique_ptr<Result>* result) override;
+
+ private:
+  std::vector<Result> results_;
+  size_t position_;
+};
+
+}  // namespace flight
+}  // namespace arrow
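The FlightListing and ResultStream interfaces in types.h above share one contract: each call to Next() hands back a single item, and a nullptr item signals that the stream is exhausted. The following is a minimal standalone sketch of that contract, not the Arrow implementation. ToyResult, ToyResultStream and DrainBodies are hypothetical names, and a bool return value stands in for arrow::Status.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for illustration only; the real Result holds a
// std::shared_ptr<Buffer> rather than a std::string.
struct ToyResult {
  std::string body;
};

// Mirrors the shape of SimpleResultStream: Next() moves out one element per
// call and yields nullptr once the vector is exhausted.
class ToyResultStream {
 public:
  explicit ToyResultStream(std::vector<ToyResult>&& results)
      : results_(std::move(results)), position_(0) {}

  // Returns true on success (a stand-in for Status::OK()).
  bool Next(std::unique_ptr<ToyResult>* out) {
    assert(out != nullptr);
    if (position_ >= results_.size()) {
      out->reset();  // nullptr marks the end of the stream
      return true;
    }
    out->reset(new ToyResult(std::move(results_[position_++])));
    return true;
  }

 private:
  std::vector<ToyResult> results_;
  size_t position_;
};

// Drains a stream the way a DoAction caller might: loop until nullptr.
inline std::vector<std::string> DrainBodies(ToyResultStream* stream) {
  std::vector<std::string> bodies;
  std::unique_ptr<ToyResult> item;
  while (stream->Next(&item) && item != nullptr) {
    bodies.push_back(std::move(item->body));
  }
  return bodies;
}
```

Because elements are moved out as they are returned, a second pass over the same stream yields nothing, which matches the "iterated once, then it is consumed" note on SimpleFlightListing.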
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 5eb5cd7..cb37545 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -340,7 +340,7 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) {
   io::BufferReader cpu_reader(host_buffer);
   ASSERT_OK(ipc::ReadRecordBatch(batch->schema(), &cpu_reader, &cpu_batch));
 
-  ipc::CompareBatch(*batch, *cpu_batch);
+  CompareBatch(*batch, *cpu_batch);
 }
 
 }  // namespace gpu
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 4ee0c34..d77248a 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -73,6 +73,8 @@ class Message::MessageImpl {
 
   const void* header() const { return message_->header(); }
 
+  int64_t body_length() const { return message_->bodyLength(); }
+
   std::shared_ptr<Buffer> body() const { return body_; }
 
   std::shared_ptr<Buffer> metadata() const { return metadata_; }
@@ -101,6 +103,8 @@ Message::~Message() {}
 
 std::shared_ptr<Buffer> Message::body() const { return impl_->body(); }
 
+int64_t Message::body_length() const { return impl_->body_length(); }
+
 std::shared_ptr<Buffer> Message::metadata() const { return impl_->metadata(); }
 
 Message::Type Message::type() const { return impl_->type(); }
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index d150eab..08176ab 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -125,6 +125,10 @@ class ARROW_EXPORT Message {
   /// \return buffer is null if no body
   std::shared_ptr<Buffer> body() const;
 
+  /// \brief The expected body length according to the metadata, for
+  /// verification purposes
+  int64_t body_length() const;
+
   /// \brief The Message type
   Type type() const;
 
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 530262d..0bf18f3 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -960,6 +960,8 @@ Status WriteMessage(const Buffer& message, io::OutputStream* file,
   int64_t start_offset;
   RETURN_NOT_OK(file->Tell(&start_offset));
 
+  // TODO(wesm): Should we depend on the position of the OutputStream? See
+  // ARROW-3212
   int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
   const int32_t remainder =
       (padded_message_length + static_cast<int32_t>(start_offset)) % 8;
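To illustrate the alignment arithmetic in the WriteMessage hunk above: the 4 extra bytes cover the int32 length prefix, and the remainder is taken relative to the current stream position. Below is a minimal standalone sketch. It assumes the part of the function not shown in this hunk rounds up to the next multiple of 8. PaddedMessageLength is a hypothetical helper name, not an Arrow API.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of Arrow): returns the number of bytes to
// write for a metadata message so that the stream position afterwards is a
// multiple of 8. The 4 extra bytes account for the int32 length prefix.
inline int32_t PaddedMessageLength(int32_t message_size, int64_t start_offset) {
  assert(message_size >= 0 && start_offset >= 0);
  int32_t padded_message_length = message_size + 4;
  // Same expression as in the hunk: alignment is computed relative to the
  // stream position, which is the dependency the TODO (ARROW-3212) questions.
  const int32_t remainder =
      (padded_message_length + static_cast<int32_t>(start_offset)) % 8;
  // Assumption: the unshown remainder of WriteMessage rounds up like this.
  if (remainder != 0) {
    padded_message_length += 8 - remainder;
  }
  return padded_message_length;
}
```

For a 10-byte message this gives 16 at offset 0 and 20 at offset 4, so the same message is padded differently depending on where the stream starts.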
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index 380f3c9..730a1a5 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -106,7 +106,7 @@ Status WriteMessage(const Buffer& message, io::OutputStream* file,
 // Serialize arrow::Schema as a Flatbuffer
 //
 // \param[in] schema a Schema instance
-// \param[inout] dictionary_memo class for tracking dictionaries and assigning
+// \param[in,out] dictionary_memo class for tracking dictionaries and assigning
 // dictionary ids
 // \param[out] out the serialized arrow::Buffer
 // \return Status outcome
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 5ffbe6f..92cf75b 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -578,6 +578,9 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
     std::unique_ptr<Message> message;
     RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message));
 
+    // TODO(wesm): this breaks integration tests, see ARROW-3256
+    // DCHECK_EQ(message->body_length(), block.body_length);
+
     io::BufferReader reader(message->body());
     return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &reader, batch);
   }
@@ -596,6 +599,9 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
       std::unique_ptr<Message> message;
       RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message));
 
+      // TODO(wesm): this breaks integration tests, see ARROW-3256
+      // DCHECK_EQ(message->body_length(), block.body_length);
+
       io::BufferReader reader(message->body());
 
       std::shared_ptr<Array> dictionary;
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 299f050..4f7de26 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -39,36 +39,6 @@
 namespace arrow {
 namespace ipc {
 
-static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) {
-  if (!lhs.Equals(rhs)) {
-    std::stringstream ss;
-    ss << "left schema: " << lhs.ToString() << std::endl
-       << "right schema: " << rhs.ToString() << std::endl;
-    FAIL() << ss.str();
-  }
-}
-
-static inline void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
-  if (!left.schema()->Equals(*right.schema())) {
-    FAIL() << "Left schema: " << left.schema()->ToString()
-           << "\nRight schema: " << right.schema()->ToString();
-  }
-  ASSERT_EQ(left.num_columns(), right.num_columns())
-      << left.schema()->ToString() << " result: " << right.schema()->ToString();
-  ASSERT_EQ(left.num_rows(), right.num_rows());
-  for (int i = 0; i < left.num_columns(); ++i) {
-    if (!left.column(i)->Equals(right.column(i))) {
-      std::stringstream ss;
-      ss << "Idx: " << i << " Name: " << left.column_name(i);
-      ss << std::endl << "Left: ";
-      ASSERT_OK(PrettyPrint(*left.column(i), 0, &ss));
-      ss << std::endl << "Right: ";
-      ASSERT_OK(PrettyPrint(*right.column(i), 0, &ss));
-      FAIL() << ss.str();
-    }
-  }
-}
-
 static inline void CompareArraysDetailed(int index, const Array& result,
                                          const Array& expected) {
   if (!expected.Equals(result)) {
@@ -96,9 +66,9 @@ const auto kListInt32 = list(int32());
 const auto kListListInt32 = list(kListInt32);
 
 Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool,
-                            std::shared_ptr<Array>* out) {
+                            std::shared_ptr<Array>* out, uint32_t seed = 0) {
   std::shared_ptr<ResizableBuffer> data;
-  RETURN_NOT_OK(MakeRandomInt32Buffer(length, pool, &data));
+  RETURN_NOT_OK(MakeRandomBuffer<int32_t>(length, pool, &data, seed));
   Int32Builder builder(int32(), pool);
   RETURN_NOT_OK(builder.Resize(length));
   if (include_nulls) {
@@ -195,7 +165,8 @@ Status MakeBooleanBatch(std::shared_ptr<RecordBatch>* out) {
   return MakeBooleanBatchSized(1000, out);
 }
 
-Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out) {
+Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out,
+                         uint32_t seed = 0) {
   // Make the schema
   auto f0 = field("f0", int32());
   auto f1 = field("f1", int32());
@@ -204,8 +175,8 @@ Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out) {
   // Example data
   std::shared_ptr<Array> a0, a1;
   MemoryPool* pool = default_memory_pool();
-  RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
-  RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0, seed));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1, seed + 1));
   *out = RecordBatch::Make(schema, length, {a0, a1});
   return Status::OK();
 }
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index ec6c5d0..60ca34e 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -96,11 +96,14 @@ static inline bool NeedTruncate(int64_t offset, const Buffer* buffer,
   return offset != 0 || min_length < buffer->size();
 }
 
+namespace internal {
+
 class RecordBatchSerializer : public ArrayVisitor {
  public:
   RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset,
-                        int max_recursion_depth, bool allow_64bit)
-      : pool_(pool),
+                        int max_recursion_depth, bool allow_64bit, IpcPayload* out)
+      : out_(out),
+        pool_(pool),
         max_recursion_depth_(max_recursion_depth),
         buffer_start_offset_(buffer_start_offset),
         allow_64bit_(allow_64bit) {
@@ -125,19 +128,25 @@ class RecordBatchSerializer : public ArrayVisitor {
       std::shared_ptr<Buffer> bitmap;
       RETURN_NOT_OK(GetTruncatedBitmap(arr.offset(), arr.length(), arr.null_bitmap(),
                                        pool_, &bitmap));
-      buffers_.push_back(bitmap);
+      out_->body_buffers.emplace_back(bitmap);
     } else {
       // Push a dummy zero-length buffer, not to be copied
-      buffers_.push_back(std::make_shared<Buffer>(nullptr, 0));
+      out_->body_buffers.emplace_back(std::make_shared<Buffer>(nullptr, 0));
     }
     return arr.Accept(this);
   }
 
-  Status Assemble(const RecordBatch& batch, int64_t* body_length) {
+  // Override this for writing dictionary metadata
+  virtual Status SerializeMetadata(int64_t num_rows) {
+    return WriteRecordBatchMessage(num_rows, out_->body_length, field_nodes_,
+                                   buffer_meta_, &out_->metadata);
+  }
+
+  Status Assemble(const RecordBatch& batch) {
     if (field_nodes_.size() > 0) {
       field_nodes_.clear();
       buffer_meta_.clear();
-      buffers_.clear();
+      out_->body_buffers.clear();
     }
 
     // Perform depth-first traversal of the row-batch
@@ -149,11 +158,11 @@ class RecordBatchSerializer : public ArrayVisitor {
     // reference. May be 0 or some other position in an address space
     int64_t offset = buffer_start_offset_;
 
-    buffer_meta_.reserve(buffers_.size());
+    buffer_meta_.reserve(out_->body_buffers.size());
 
     // Construct the buffer metadata for the record batch header
-    for (size_t i = 0; i < buffers_.size(); ++i) {
-      const Buffer* buffer = buffers_[i].get();
+    for (size_t i = 0; i < out_->body_buffers.size(); ++i) {
+      const Buffer* buffer = out_->body_buffers[i].get();
       int64_t size = 0;
       int64_t padding = 0;
 
@@ -167,69 +176,15 @@ class RecordBatchSerializer : public ArrayVisitor {
       offset += size + padding;
     }
 
-    *body_length = offset - buffer_start_offset_;
-    DCHECK(BitUtil::IsMultipleOf8(*body_length));
-
-    return Status::OK();
-  }
-
-  // Override this for writing dictionary metadata
-  virtual Status WriteMetadataMessage(int64_t num_rows, int64_t body_length,
-                                      std::shared_ptr<Buffer>* out) {
-    return WriteRecordBatchMessage(num_rows, body_length, field_nodes_, buffer_meta_,
-                                   out);
-  }
-
-  Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length,
-               int64_t* body_length) {
-    RETURN_NOT_OK(Assemble(batch, body_length));
-
-#ifndef NDEBUG
-    int64_t start_position, current_position;
-    RETURN_NOT_OK(dst->Tell(&start_position));
-#endif
+    out_->body_length = offset - buffer_start_offset_;
+    DCHECK(BitUtil::IsMultipleOf8(out_->body_length));
 
     // Now that we have computed the locations of all of the buffers in shared
     // memory, the data header can be converted to a flatbuffer and written out
     //
     // Note: The memory written here is prefixed by the size of the flatbuffer
     // itself as an int32_t.
-    std::shared_ptr<Buffer> metadata_fb;
-    RETURN_NOT_OK(WriteMetadataMessage(batch.num_rows(), *body_length, &metadata_fb));
-    RETURN_NOT_OK(internal::WriteMessage(*metadata_fb, dst, metadata_length));
-
-#ifndef NDEBUG
-    RETURN_NOT_OK(dst->Tell(&current_position));
-    DCHECK(BitUtil::IsMultipleOf8(current_position));
-#endif
-
-    // Now write the buffers
-    for (size_t i = 0; i < buffers_.size(); ++i) {
-      const Buffer* buffer = buffers_[i].get();
-      int64_t size = 0;
-      int64_t padding = 0;
-
-      // The buffer might be null if we are handling zero row lengths.
-      if (buffer) {
-        size = buffer->size();
-        padding = BitUtil::RoundUpToMultipleOf8(size) - size;
-      }
-
-      if (size > 0) {
-        RETURN_NOT_OK(dst->Write(buffer->data(), size));
-      }
-
-      if (padding > 0) {
-        RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
-      }
-    }
-
-#ifndef NDEBUG
-    RETURN_NOT_OK(dst->Tell(&current_position));
-    DCHECK(BitUtil::IsMultipleOf8(current_position));
-#endif
-
-    return Status::OK();
+    return SerializeMetadata(batch.num_rows());
   }
 
  protected:
@@ -251,7 +206,7 @@ class RecordBatchSerializer : public ArrayVisitor {
                    data->size() - byte_offset);
       data = SliceBuffer(data, byte_offset, buffer_length);
     }
-    buffers_.push_back(data);
+    out_->body_buffers.emplace_back(data);
     return Status::OK();
   }
 
@@ -303,8 +258,8 @@ class RecordBatchSerializer : public ArrayVisitor {
       data = SliceBuffer(data, start_offset, slice_length);
     }
 
-    buffers_.push_back(value_offsets);
-    buffers_.push_back(data);
+    out_->body_buffers.emplace_back(value_offsets);
+    out_->body_buffers.emplace_back(data);
     return Status::OK();
   }
 
@@ -312,12 +267,12 @@ class RecordBatchSerializer : public ArrayVisitor {
     std::shared_ptr<Buffer> data;
     RETURN_NOT_OK(
         GetTruncatedBitmap(array.offset(), array.length(), array.values(), pool_, &data));
-    buffers_.push_back(data);
+    out_->body_buffers.emplace_back(data);
     return Status::OK();
   }
 
   Status Visit(const NullArray& array) override {
-    buffers_.push_back(nullptr);
+    out_->body_buffers.emplace_back(nullptr);
     return Status::OK();
   }
 
@@ -352,7 +307,7 @@ class RecordBatchSerializer : public ArrayVisitor {
   Status Visit(const ListArray& array) override {
     std::shared_ptr<Buffer> value_offsets;
     RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
-    buffers_.push_back(value_offsets);
+    out_->body_buffers.emplace_back(value_offsets);
 
     --max_recursion_depth_;
     std::shared_ptr<Array> values = array.values();
@@ -390,7 +345,7 @@ class RecordBatchSerializer : public ArrayVisitor {
     std::shared_ptr<Buffer> type_ids;
     RETURN_NOT_OK(GetTruncatedBuffer<UnionArray::type_id_t>(
         offset, length, array.type_ids(), pool_, &type_ids));
-    buffers_.push_back(type_ids);
+    out_->body_buffers.emplace_back(type_ids);
 
     --max_recursion_depth_;
     if (array.mode() == UnionMode::DENSE) {
@@ -449,7 +404,7 @@ class RecordBatchSerializer : public ArrayVisitor {
 
         value_offsets = shifted_offsets_buffer;
       }
-      buffers_.push_back(value_offsets);
+      out_->body_buffers.emplace_back(value_offsets);
 
       // Visit children and slice accordingly
       for (int i = 0; i < type.num_children(); ++i) {
@@ -487,12 +442,14 @@ class RecordBatchSerializer : public ArrayVisitor {
     return array.indices()->Accept(this);
   }
 
+  // Destination for output buffers
+  IpcPayload* out_;
+
   // In some cases, intermediate buffers may need to be allocated (with sliced arrays)
   MemoryPool* pool_;
 
   std::vector<internal::FieldMetadata> field_nodes_;
   std::vector<internal::BufferMetadata> buffer_meta_;
-  std::vector<std::shared_ptr<Buffer>> buffers_;
 
   int64_t max_recursion_depth_;
   int64_t buffer_start_offset_;
@@ -501,29 +458,79 @@ class RecordBatchSerializer : public ArrayVisitor {
 
 class DictionaryWriter : public RecordBatchSerializer {
  public:
-  using RecordBatchSerializer::RecordBatchSerializer;
-
-  Status WriteMetadataMessage(int64_t num_rows, int64_t body_length,
-                              std::shared_ptr<Buffer>* out) override {
-    return WriteDictionaryMessage(dictionary_id_, num_rows, body_length, field_nodes_,
-                                  buffer_meta_, out);
+  DictionaryWriter(int64_t dictionary_id, MemoryPool* pool, int64_t buffer_start_offset,
+                   int max_recursion_depth, bool allow_64bit, IpcPayload* out)
+      : RecordBatchSerializer(pool, buffer_start_offset, max_recursion_depth, allow_64bit,
+                              out),
+        dictionary_id_(dictionary_id) {}
+
+  Status SerializeMetadata(int64_t num_rows) override {
+    return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length,
+                                  field_nodes_, buffer_meta_, &out_->metadata);
   }
 
-  Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
-               io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
-    dictionary_id_ = dictionary_id;
-
+  Status Assemble(const std::shared_ptr<Array>& dictionary) {
     // Make a dummy record batch. A bit tedious as we have to make a schema
     auto schema = arrow::schema({arrow::field("dictionary", dictionary->type())});
     auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
-    return RecordBatchSerializer::Write(*batch, dst, metadata_length, body_length);
+    return RecordBatchSerializer::Assemble(*batch);
   }
 
  private:
-  // TODO(wesm): Setting this in Write is a bit unclean, but it works
   int64_t dictionary_id_;
 };
 
+Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
+                       int32_t* metadata_length) {
+#ifndef NDEBUG
+  int64_t start_position, current_position;
+  RETURN_NOT_OK(dst->Tell(&start_position));
+#endif
+
+  RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, dst, metadata_length));
+
+#ifndef NDEBUG
+  RETURN_NOT_OK(dst->Tell(&current_position));
+  DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+  // Now write the buffers
+  for (size_t i = 0; i < payload.body_buffers.size(); ++i) {
+    const Buffer* buffer = payload.body_buffers[i].get();
+    int64_t size = 0;
+    int64_t padding = 0;
+
+    // The buffer might be null if we are handling zero row lengths.
+    if (buffer) {
+      size = buffer->size();
+      padding = BitUtil::RoundUpToMultipleOf8(size) - size;
+    }
+
+    if (size > 0) {
+      RETURN_NOT_OK(dst->Write(buffer->data(), size));
+    }
+
+    if (padding > 0) {
+      RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
+    }
+  }
+
+#ifndef NDEBUG
+  RETURN_NOT_OK(dst->Tell(&current_position));
+  DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+  return Status::OK();
+}
+
+Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool,
+                             IpcPayload* out) {
+  RecordBatchSerializer writer(pool, 0, kMaxNestingDepth, true, out);
+  return writer.Assemble(batch);
+}
+
+}  // namespace internal
+
 // Adds padding bytes if necessary to ensure all memory blocks are written on
 // 64-byte boundaries.
 Status AlignStreamPosition(io::OutputStream* stream) {
@@ -540,9 +547,18 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
                         io::OutputStream* dst, int32_t* metadata_length,
                         int64_t* body_length, MemoryPool* pool, int max_recursion_depth,
                         bool allow_64bit) {
-  RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth,
-                               allow_64bit);
-  return writer.Write(batch, dst, metadata_length, body_length);
+  internal::IpcPayload payload;
+  internal::RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth,
+                                         allow_64bit, &payload);
+  RETURN_NOT_OK(writer.Assemble(batch));
+
+  // TODO(wesm): it's a rough edge that the metadata and body length here are
+  // computed separately
+
+  // The body size is computed in the payload
+  *body_length = payload.body_length;
+
+  return internal::WriteIpcPayload(payload, dst, metadata_length);
 }
 
 Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
@@ -675,8 +691,14 @@ Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
 Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
                        int64_t buffer_start_offset, io::OutputStream* dst,
                        int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
-  DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth, false);
-  return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length);
+  internal::IpcPayload payload;
+  internal::DictionaryWriter writer(dictionary_id, pool, buffer_start_offset,
+                                    kMaxNestingDepth, true, &payload);
+  RETURN_NOT_OK(writer.Assemble(dictionary));
+
+  // The body size is computed in the payload
+  *body_length = payload.body_length;
+  return internal::WriteIpcPayload(payload, dst, metadata_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
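
Note on the writer.cc change above: WriteIpcPayload writes each body buffer followed by enough padding to reach the next multiple of 8 bytes, and skips null or empty buffers. The following is a minimal, self-contained sketch of that padding arithmetic only. It does not use the Arrow API: WritePaddedBuffers and the local RoundUpToMultipleOf8 are hypothetical names, a std::string stands in for the output stream, and the padding bytes are assumed to be zeros.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Round n up to the next multiple of 8 (n is assumed to be non-negative).
inline int64_t RoundUpToMultipleOf8(int64_t n) { return (n + 7) & ~int64_t{7}; }

// Hypothetical stand-in for the buffer-writing loop: appends every buffer to
// `dst`, followed by zero bytes so that the next buffer starts on an 8-byte
// boundary. Empty strings play the role of null or zero-length buffers and
// contribute nothing. Returns the total number of bytes appended.
inline int64_t WritePaddedBuffers(const std::vector<std::string>& buffers,
                                  std::string* dst) {
  assert(dst != nullptr);
  int64_t written = 0;
  for (const std::string& buffer : buffers) {
    const int64_t size = static_cast<int64_t>(buffer.size());
    const int64_t padding = RoundUpToMultipleOf8(size) - size;
    dst->append(buffer);
    dst->append(static_cast<size_t>(padding), '\0');
    written += size + padding;
  }
  return written;
}
```

With this layout the returned byte count is always a multiple of 8, which is the property the DCHECK calls in the patch assert for the real stream position.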
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 6dbf29d..bcf09aa 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -269,6 +269,36 @@ ARROW_EXPORT
 Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
                    int64_t* body_length);
 
+namespace internal {
+
+// These internal APIs may change without warning or deprecation
+
+// Intermediate data structure with metadata header plus zero or more buffers
+// for the message body. This data can either be written out directly as an
+// encapsulated IPC message or used with Flight RPCs
+struct IpcPayload {
+  Message::Type type;
+  std::shared_ptr<Buffer> metadata;
+  std::vector<std::shared_ptr<Buffer>> body_buffers;
+  int64_t body_length;
+};
+
+/// \brief Extract IPC payloads from given schema for purposes of wire
+/// transport, separate from using the *StreamWriter classes
+ARROW_EXPORT
+Status GetDictionaryPayloads(const Schema& schema,
+                             std::vector<std::unique_ptr<IpcPayload>>* out);
+
+/// \brief Compute IpcPayload for the given record batch
+/// \param[in] batch the RecordBatch that is being serialized
+/// \param[in,out] pool for any required temporary memory allocations
+/// \param[out] out the returned IpcPayload
+/// \return Status
+ARROW_EXPORT
+Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool, IpcPayload* out);
+
+}  // namespace internal
+
 }  // namespace ipc
 }  // namespace arrow
 
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 1a50a07..b7179a3 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -18,7 +18,14 @@
 #ifndef ARROW_TEST_UTIL_H_
 #define ARROW_TEST_UTIL_H_
 
+#ifndef _WIN32
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#endif
+
 #include <algorithm>
+#include <chrono>
 #include <cstdint>
 #include <cstdlib>
 #include <iostream>
@@ -27,6 +34,7 @@
 #include <random>
 #include <sstream>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include <gtest/gtest.h>
@@ -44,6 +52,11 @@
 #include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
 
+static inline void sleep_for(double seconds) {
+  std::this_thread::sleep_for(
+      std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+}
+
 #define STRINGIFY(x) #x
 
 #define ASSERT_RAISES(ENUM, expr)                                         \
@@ -272,6 +285,17 @@ void rand_uniform_int(int64_t n, uint32_t seed, T min_value, T max_value, U* out
   std::generate(out, out + n, [&d, &gen] { return static_cast<U>(d(gen)); });
 }
 
+template <typename T, typename Enable = void>
+struct GenerateRandom {};
+
+template <typename T>
+struct GenerateRandom<T, typename std::enable_if<std::is_integral<T>::value>::type> {
+  static void Gen(int64_t length, uint32_t seed, void* out) {
+    rand_uniform_int(length, seed, std::numeric_limits<T>::min(),
+                     std::numeric_limits<T>::max(), reinterpret_cast<T*>(out));
+  }
+};
+
 static inline void random_ascii(int64_t n, uint32_t seed, uint8_t* out) {
   rand_uniform_int(n, seed, static_cast<int32_t>('A'), static_cast<int32_t>('z'), out);
 }
@@ -280,13 +304,13 @@ static inline int64_t CountNulls(const std::vector<uint8_t>& valid_bytes) {
   return static_cast<int64_t>(std::count(valid_bytes.cbegin(), valid_bytes.cend(), '\0'));
 }
 
-Status MakeRandomInt32Buffer(int64_t length, MemoryPool* pool,
-                             std::shared_ptr<ResizableBuffer>* out, uint32_t seed = 0) {
+template <typename T>
+Status MakeRandomBuffer(int64_t length, MemoryPool* pool,
+                        std::shared_ptr<ResizableBuffer>* out, uint32_t seed = 0) {
   DCHECK(pool);
   std::shared_ptr<ResizableBuffer> result;
-  RETURN_NOT_OK(AllocateResizableBuffer(pool, sizeof(int32_t) * length, &result));
-  rand_uniform_int(length, seed, 0, std::numeric_limits<int32_t>::max(),
-                   reinterpret_cast<int32_t*>(result->mutable_data()));
+  RETURN_NOT_OK(AllocateResizableBuffer(pool, sizeof(T) * length, &result));
+  GenerateRandom<T>::Gen(length, seed, result->mutable_data());
   *out = result;
   return Status::OK();
 }
@@ -344,6 +368,15 @@ void AssertBufferEqual(const Buffer& buffer, const Buffer& expected) {
   ASSERT_TRUE(buffer.Equals(expected));
 }
 
+static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) {
+  if (!lhs.Equals(rhs)) {
+    std::stringstream ss;
+    ss << "left schema: " << lhs.ToString() << std::endl
+       << "right schema: " << rhs.ToString() << std::endl;
+    FAIL() << ss.str();
+  }
+}
+
 void PrintColumn(const Column& col, std::stringstream* ss) {
   const ChunkedArray& carr = *col.data();
   for (int i = 0; i < carr.num_chunks(); ++i) {
@@ -456,6 +489,53 @@ Status MakeArray(const std::vector<uint8_t>& valid_bytes, const std::vector<T>&
     }                                        \
   } while (false)
 
+static inline void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
+  if (!left.schema()->Equals(*right.schema())) {
+    FAIL() << "Left schema: " << left.schema()->ToString()
+           << "\nRight schema: " << right.schema()->ToString();
+  }
+  ASSERT_EQ(left.num_columns(), right.num_columns())
+      << left.schema()->ToString() << " result: " << right.schema()->ToString();
+  ASSERT_EQ(left.num_rows(), right.num_rows());
+  for (int i = 0; i < left.num_columns(); ++i) {
+    if (!left.column(i)->Equals(right.column(i))) {
+      std::stringstream ss;
+      ss << "Idx: " << i << " Name: " << left.column_name(i);
+      ss << std::endl << "Left: ";
+      ASSERT_OK(PrettyPrint(*left.column(i), 0, &ss));
+      ss << std::endl << "Right: ";
+      ASSERT_OK(PrettyPrint(*right.column(i), 0, &ss));
+      FAIL() << ss.str();
+    }
+  }
+}
+
+// ----------------------------------------------------------------------
+// A RecordBatchReader for serving a sequence of in-memory record batches
+
+class BatchIterator : public RecordBatchReader {
+ public:
+  BatchIterator(const std::shared_ptr<Schema>& schema,
+                const std::vector<std::shared_ptr<RecordBatch>>& batches)
+      : schema_(schema), batches_(batches), position_(0) {}
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    if (position_ >= batches_.size()) {
+      *out = nullptr;
+    } else {
+      *out = batches_[position_++];
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Schema> schema_;
+  std::vector<std::shared_ptr<RecordBatch>> batches_;
+  size_t position_;
+};
+
 }  // namespace arrow
 
 #endif  // ARROW_TEST_UTIL_H_
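
Note on the BatchIterator helper added above: its ReadNext signals end of stream by setting the output pointer to nullptr while still returning an OK status. The sketch below shows how a consumer might loop against that convention. It is illustrative only: VectorReader, Item, and SumAll are hypothetical names, and a shared_ptr<int> stands in for a record batch.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a record batch.
using Item = std::shared_ptr<int>;

class VectorReader {
 public:
  explicit VectorReader(std::vector<Item> items)
      : items_(std::move(items)), position_(0) {}

  // Yields the next item, or nullptr once the sequence is exhausted.
  void ReadNext(Item* out) {
    assert(out != nullptr);
    *out = position_ < items_.size() ? items_[position_++] : nullptr;
  }

 private:
  std::vector<Item> items_;
  size_t position_;
};

// Drains a reader by looping until the nullptr end-of-stream marker.
inline int SumAll(VectorReader* reader) {
  int total = 0;
  Item item;
  for (reader->ReadNext(&item); item != nullptr; reader->ReadNext(&item)) {
    total += *item;
  }
  return total;
}
```

Reading past the end keeps yielding nullptr, so a consumer does not need to track the number of batches separately.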
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h
index 56fadca..121f301 100644
--- a/cpp/src/arrow/util/memory.h
+++ b/cpp/src/arrow/util/memory.h
@@ -59,8 +59,8 @@ void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
   std::vector<std::future<void*>> futures;
 
   for (int i = 0; i < num_threads; i++) {
-    futures.push_back(pool->Submit(memcpy, dst + prefix + i * chunk_size,
-                                   left + i * chunk_size, chunk_size));
+    futures.emplace_back(pool->Submit(memcpy, dst + prefix + i * chunk_size,
+                                      left + i * chunk_size, chunk_size));
   }
   memcpy(dst, src, prefix);
   memcpy(dst + prefix + num_threads * chunk_size, right, suffix);
diff --git a/cpp/src/parquet/util/stopwatch.h b/cpp/src/arrow/util/stopwatch.h
similarity index 71%
rename from cpp/src/parquet/util/stopwatch.h
rename to cpp/src/arrow/util/stopwatch.h
index 68cf792..f16c2ec 100644
--- a/cpp/src/parquet/util/stopwatch.h
+++ b/cpp/src/arrow/util/stopwatch.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef PARQUET_UTIL_STOPWATCH_H
-#define PARQUET_UTIL_STOPWATCH_H
+#pragma once
 
 #include <stdio.h>
 #ifndef _MSC_VER
@@ -26,27 +25,25 @@
 #include <ctime>
 #include <iostream>
 
-namespace parquet {
+namespace arrow {
+
+uint64_t CurrentTime() {
+  timespec time;
+  clock_gettime(CLOCK_MONOTONIC, &time);
+  return 1000000000L * time.tv_sec + time.tv_nsec;
+}
 
 class StopWatch {
  public:
   StopWatch() {}
 
-  void Start() { gettimeofday(&start_time, 0); }
+  void Start() { start_ = CurrentTime(); }
 
   // Returns time in nanoseconds.
-  uint64_t Stop() {
-    struct timeval t_time;
-    gettimeofday(&t_time, 0);
-
-    return (1000L * 1000L * 1000L * (t_time.tv_sec - start_time.tv_sec) +
-            (t_time.tv_usec - start_time.tv_usec));
-  }
+  uint64_t Stop() { return CurrentTime() - start_; }
 
  private:
-  struct timeval start_time;
+  uint64_t start_;
 };
 
-}  // namespace parquet
-
-#endif
+}  // namespace arrow
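
Note on the stopwatch.h change above: the new CurrentTime relies on clock_gettime(CLOCK_MONOTONIC), which is a POSIX call. The sketch below shows one possible portable equivalent built on std::chrono::steady_clock. It is not what the patch does; MonotonicNanos and PortableStopWatch are hypothetical names. The free function is marked inline, which avoids multiple-definition link errors when a header defining it is included from several translation units.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Nanoseconds on a monotonic clock. The epoch is unspecified, so only the
// difference between two readings is meaningful.
inline uint64_t MonotonicNanos() {
  const auto now = std::chrono::steady_clock::now().time_since_epoch();
  return static_cast<uint64_t>(
      std::chrono::duration_cast<std::chrono::nanoseconds>(now).count());
}

class PortableStopWatch {
 public:
  void Start() { start_ = MonotonicNanos(); }

  // Returns the nanoseconds elapsed since the last call to Start().
  uint64_t Stop() const {
    const uint64_t now = MonotonicNanos();
    assert(now >= start_);  // steady_clock never goes backwards
    return now - start_;
  }

 private:
  uint64_t start_ = 0;
};
```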
diff --git a/cpp/src/parquet/util/CMakeLists.txt b/cpp/src/parquet/util/CMakeLists.txt
index debf4b3..72d4ca2 100644
--- a/cpp/src/parquet/util/CMakeLists.txt
+++ b/cpp/src/parquet/util/CMakeLists.txt
@@ -20,7 +20,6 @@ install(FILES
   comparison.h
   macros.h
   memory.h
-  stopwatch.h
   visibility.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet/util")
 
diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt
index cdea520..381282b 100644
--- a/cpp/thirdparty/versions.txt
+++ b/cpp/thirdparty/versions.txt
@@ -29,8 +29,8 @@ BROTLI_VERSION=v0.6.0
 LZ4_VERSION=v1.7.5
 ZLIB_VERSION=1.2.8
 ZSTD_VERSION=v1.2.0
-PROTOBUF_VERSION=v2.6.0
-GRPC_VERSION=v1.12.1
+PROTOBUF_VERSION=v3.6.1
+GRPC_VERSION=1.14.1
 ORC_VERSION=1.5.1
 THRIFT_VERSION=0.11.0
 GLOG_VERSION=v0.3.5
diff --git a/format/Flight.proto b/format/Flight.proto
new file mode 100644
index 0000000..ab1d620
--- /dev/null
+++ b/format/Flight.proto
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_package = "org.apache.arrow.flight.impl";
+package arrow.flight.protocol;
+
+/*
+ * A flight service is an endpoint for retrieving or storing Arrow data. A
+ * flight service can expose one or more predefined endpoints that can be
+ * accessed using the Arrow Flight Protocol. Additionally, a flight service
+ * can expose a set of actions that are available.
+ */
+service FlightService {
+
+  /*
+   * Handshake between client and server. Depending on the server, the
+   * handshake may be required to determine the token that should be used for
+   * future operations. Both request and response are streams to allow multiple
+   * roundtrips depending on auth mechanism.
+   */
+  rpc Handshake(stream HandshakeRequest) returns (stream HandshakeResponse) {}
+
+  /*
+   * Get a list of available streams given a particular criteria. Most flight
+   * services will expose one or more streams that are readily available for
+   * retrieval. This api allows listing the streams available for
+   * consumption. A user can also provide a criteria. The criteria can limit
+   * the subset of streams that can be listed via this interface. Each flight
+   * service allows its own definition of how to consume criteria.
+   */
+  rpc ListFlights(Criteria) returns (stream FlightGetInfo) {}
+
+  /*
+   * For a given FlightDescriptor, get information about how the flight can be
+   * consumed. This is a useful interface if the consumer of the interface
+   * already can identify the specific flight to consume. This interface can
+   * also allow a consumer to generate a flight stream through a specified
+   * descriptor. For example, a flight descriptor might be something that
+   * includes a SQL statement or a Pickled Python operation that will be
+   * executed. In those cases, the descriptor will not be previously available
+   * within the list of available streams provided by ListFlights but will be
+   * available for consumption for the duration defined by the specific flight
+   * service.
+   */
+  rpc GetFlightInfo(FlightDescriptor) returns (FlightGetInfo) {}
+
+  /*
+   * Retrieve a single stream associated with a particular descriptor
+   * associated with the referenced ticket. A Flight can be composed of one or
+   * more streams where each stream can be retrieved using a separate opaque
+   * ticket that the flight service uses for managing a collection of streams.
+   */
+  rpc DoGet(Ticket) returns (stream FlightData) {}
+
+  /*
+   * Push a stream to the flight service associated with a particular
+   * flight stream. This allows a client of a flight service to upload a stream
+   * of data. Depending on the particular flight service, a client consumer
+   * could be allowed to upload a single stream per descriptor or an unlimited
+   * number. (In the latter case, the service might implement a 'seal' action
+   * that can be applied to a descriptor once all streams are uploaded.)
+   */
+  rpc DoPut(stream FlightData) returns (PutResult) {}
+
+  /*
+   * Flight services can support an arbitrary number of simple actions in
+   * addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
+   * operations that are potentially available. DoAction allows a flight client
+   * to do a specific action against a flight service. An action includes
+   * opaque request and response objects that are specific to the type of
+   * action being undertaken.
+   */
+  rpc DoAction(Action) returns (stream Result) {}
+
+  /*
+   * A flight service exposes all of the available action types that it has
+   * along with descriptions.  This allows different flight consumers to
+   * understand the capabilities of the flight service.
+   */
+  rpc ListActions(Empty) returns (stream ActionType) {}
+
+}
+
+/*
+ * The request that a client provides to a server on handshake.
+ */
+message HandshakeRequest {
+
+  /*
+   * A defined protocol version
+   */
+  uint64 protocol_version = 1;
+
+  /*
+   * Arbitrary auth/handshake info.
+   */
+  bytes payload = 2;
+}
+
+message HandshakeResponse {
+
+  /*
+   * A defined protocol version
+   */
+  uint64 protocol_version = 1;
+
+  /*
+   * Arbitrary auth/handshake info.
+   */
+  bytes payload = 2;
+}
+
+/*
+ * A message for doing simple auth.
+ */
+message BasicAuth {
+  string username = 2;
+  string password = 3;
+}
+
+message Empty {}
+
+/*
+ * Describes an available action, including both the name used for execution
+ * along with a short description of the purpose of the action.
+ */
+message ActionType {
+  string type = 1;
+  string description = 2;
+}
+
+/*
+ * A service-specific expression that can be used to return a limited set
+ * of available Arrow Flight streams.
+ */
+message Criteria {
+  bytes expression = 1;
+}
+
+/*
+ * An opaque action specific for the service.
+ */
+message Action {
+  string type = 1;
+  bytes body = 2;
+}
+
+/*
+ * An opaque result returned after executing an action.
+ */
+message Result {
+  bytes body = 1;
+}
+
+/*
+ * The name or tag for a Flight. May be used as a way to retrieve or generate
+ * a flight or be used to expose a set of previously defined flights.
+ */
+message FlightDescriptor {
+
+  /*
+   * Describes what type of descriptor is defined.
+   */
+  enum DescriptorType {
+
+    // Protobuf pattern, not used.
+    UNKNOWN = 0;
+
+    /*
+     * A named path that identifies a dataset. A path is composed of a string
+     * or list of strings describing a particular dataset. This is conceptually
+     *  similar to a path inside a filesystem.
+     */
+    PATH = 1;
+
+    /*
+     * An opaque command to generate a dataset.
+     */
+    CMD = 2;
+  }
+
+  DescriptorType type = 1;
+
+  /*
+   * Opaque value used to express a command. Should only be defined when
+   * type = CMD.
+   */
+  bytes cmd = 2;
+
+  /*
+   * List of strings identifying a particular dataset. Should only be defined
+   * when type = PATH.
+   */
+  repeated string path = 3;
+}
+
+/*
+ * The access coordinates for retrieval of a dataset. With a FlightGetInfo, a
+ * consumer is able to determine how to retrieve a dataset.
+ */
+message FlightGetInfo {
+  // schema of the dataset as described in Schema.fbs::Schema
+  bytes schema = 1;
+
+  /*
+   * The descriptor associated with this info.
+   */
+  FlightDescriptor flight_descriptor = 2;
+
+  /*
+   * A list of endpoints associated with the flight. To consume the whole
+   * flight, all endpoints must be consumed.
+   */
+  repeated FlightEndpoint endpoint = 3;
+
+  uint64 total_records = 4;
+  uint64 total_bytes = 5;
+}
+
+/*
+ * A particular stream or split associated with a flight.
+ */
+message FlightEndpoint {
+
+  /*
+   * Token used to retrieve this stream.
+   */
+  Ticket ticket = 1;
+
+  /*
+   * A list of locations where this ticket can be redeemed. If the list is
+   * empty, the expectation is that the ticket can only be redeemed on the
+   * current service where the ticket was generated.
+   */
+  repeated Location location = 2;
+}
+
+/*
+ * A location where a flight service will accept retrieval of a particular
+ *  stream given a ticket.
+ */
+message Location {
+  string host = 1;
+  int32 port = 2;
+}
+
+/*
+ * An opaque identifier that the service can use to retrieve a particular
+ * portion of a stream.
+ */
+message Ticket {
+  bytes ticket = 1;
+}
+
+/*
+ * A batch of Arrow data as part of a stream of batches.
+ */
+message FlightData {
+
+  /*
+   * The descriptor of the data. This is only relevant when a client is
+   * starting a new DoPut stream
+   */
+  FlightDescriptor flight_descriptor = 1;
+
+  /*
+   * Header for message data as described in Message.fbs::Message
+   */
+  bytes data_header = 2;
+
+  /*
+   * The actual batch of Arrow data. Preferably handled with minimal copies.
+   * Comes last in the definition to help with sidecar patterns.
+   */
+  bytes data_body = 1000;
+}
+
+/**
+ * The response message (currently empty) associated with the submission of a
+ * DoPut.
+ */
+message PutResult {}
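
Note on Flight.proto above: the comments state that every endpoint of a flight must be consumed to read the whole flight, and that an endpoint with an empty location list can only be redeemed on the service that issued the ticket. The sketch below models that rule with plain structs. These are illustrative types, not the classes protoc would generate, and PlanGets and PlannedGet are hypothetical names. Picking the first listed location is an arbitrary policy chosen for the example; the proto file does not prescribe one.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Plain structs mirroring a few Flight.proto messages (illustrative only).
struct Location { std::string host; int32_t port; };
struct Ticket { std::string ticket; };
struct FlightEndpoint { Ticket ticket; std::vector<Location> locations; };

// One planned DoGet call: where to connect and which ticket to present.
struct PlannedGet { Location location; Ticket ticket; };

// Plans one DoGet per endpoint. An endpoint with no locations falls back to
// the service that produced the flight info; otherwise the first listed
// location is used (an assumption of this sketch).
inline std::vector<PlannedGet> PlanGets(
    const std::vector<FlightEndpoint>& endpoints,
    const Location& current_service) {
  assert(!current_service.host.empty());
  std::vector<PlannedGet> plan;
  for (const FlightEndpoint& endpoint : endpoints) {
    const Location& where = endpoint.locations.empty()
                                ? current_service
                                : endpoint.locations.front();
    plan.push_back(PlannedGet{where, endpoint.ticket});
  }
  return plan;
}
```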
diff --git a/integration/README.md b/integration/README.md
index 0a428e6..760a175 100644
--- a/integration/README.md
+++ b/integration/README.md
@@ -70,7 +70,7 @@ Java `arrow-tool` JAR and the build path for the C++ executables:
 JAVA_DIR=$ARROW_HOME/java
 CPP_BUILD_DIR=$ARROW_HOME/cpp/build
 
-VERSION=0.1.1-SNAPSHOT
+VERSION=0.11.0-SNAPSHOT
 export ARROW_JAVA_INTEGRATION_JAR=$JAVA_DIR/tools/target/arrow-tools-$VERSION-jar-with-dependencies.jar
 export ARROW_CPP_EXE_PATH=$CPP_BUILD_DIR/debug
 ```
diff --git a/python/.gitignore b/python/.gitignore
index fac4e99..3346aa6 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -42,5 +42,4 @@ pyarrow/_table_api.h
 manylinux1/arrow
 
 # plasma store
-pyarrow/plasma_store
 pyarrow/plasma_store_server
diff --git a/python/README.md b/python/README.md
index c732e3b..e905862 100644
--- a/python/README.md
+++ b/python/README.md
@@ -48,8 +48,8 @@ The code must pass `flake8` (available from pip or conda) or it will fail the
 build. Check for style errors before submitting your pull request with:
 
 ```
-flake8 pyarrow
-flake8 --config=.flake8.cython pyarrow
+flake8 .
+flake8 --config=.flake8.cython .
 ```
 
 ### Building from Source