You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/02/12 22:43:33 UTC

[arrow] branch master updated: ARROW-3292: [C++] Test Flight RPC in Travis CI

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new af60c2e  ARROW-3292: [C++] Test Flight RPC in Travis CI
af60c2e is described below

commit af60c2e730236cfbbd207e1c0bd655bbb230bae1
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Feb 12 16:43:24 2019 -0600

    ARROW-3292: [C++] Test Flight RPC in Travis CI
    
    Enabled on Ubuntu 16.04 and macOS.
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #3626 from pitrou/ARROW-3292-flight-travis-ci and squashes the following commits:
    
    afd1fd31 <Antoine Pitrou> ARROW-3292:  Test Flight RPC in Travis-CI
---
 .travis.yml                                        |   5 +-
 ci/conda_env_cpp.yml                               |   2 +
 ci/travis_before_script_cpp.sh                     |   4 +
 cpp/cmake_modules/Findc-ares.cmake                 |   7 +-
 cpp/src/arrow/flight/CMakeLists.txt                |  51 ++++---
 cpp/src/arrow/flight/client.cc                     |   5 +-
 cpp/src/arrow/flight/client.h                      |   2 +-
 cpp/src/arrow/flight/flight-test.cc                |   3 +-
 cpp/src/arrow/flight/perf-server.cc                |   4 +-
 cpp/src/arrow/flight/serialization-internal.h      |  33 +----
 cpp/src/arrow/flight/test-integration-client.cc    |   4 +-
 cpp/src/arrow/flight/test-integration-server.cc    |   2 +
 cpp/src/arrow/flight/{test-util.h => test-util.cc} | 151 +++++++++++----------
 cpp/src/arrow/flight/test-util.h                   |  86 ++++--------
 cpp/src/arrow/gpu/cuda-benchmark.cc                |   1 +
 cpp/src/arrow/util/stopwatch.h                     |  29 ++--
 16 files changed, 184 insertions(+), 205 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 2873dc0..60c4446 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -62,6 +62,7 @@ matrix:
     env:
     - ARROW_TRAVIS_VALGRIND=1
     - ARROW_TRAVIS_USE_TOOLCHAIN=1
+    - ARROW_TRAVIS_FLIGHT=1
     - ARROW_TRAVIS_PLASMA=1
     - ARROW_TRAVIS_ORC=1
     - ARROW_TRAVIS_PARQUET=1
@@ -86,10 +87,11 @@ matrix:
     os: linux
     jdk: openjdk8
     env:
+    - ARROW_TRAVIS_COVERAGE=1
     - ARROW_TRAVIS_USE_TOOLCHAIN=1
+    - ARROW_TRAVIS_FLIGHT=1
     - ARROW_TRAVIS_PLASMA=1
     - ARROW_TRAVIS_ORC=1
-    - ARROW_TRAVIS_COVERAGE=1
     - ARROW_TRAVIS_PARQUET=1
     - ARROW_TRAVIS_GANDIVA=1
     - ARROW_TRAVIS_GANDIVA_JAVA=1
@@ -181,6 +183,7 @@ matrix:
     env:
     - ARROW_TRAVIS_USE_TOOLCHAIN=1
     - ARROW_TRAVIS_PLASMA=1
+    - ARROW_TRAVIS_FLIGHT=1
     - ARROW_TRAVIS_ORC=1
     - ARROW_TRAVIS_PARQUET=1
     - ARROW_TRAVIS_GANDIVA=1
diff --git a/ci/conda_env_cpp.yml b/ci/conda_env_cpp.yml
index 3d50b21..fbe4576 100644
--- a/ci/conda_env_cpp.yml
+++ b/ci/conda_env_cpp.yml
@@ -20,12 +20,14 @@
 boost-cpp=1.68.0
 brotli
 bzip2
+c-ares
 cmake
 double-conversion
 flatbuffers
 gflags
 glog
 gmock
+grpc-cpp
 gtest
 libprotobuf
 lz4-c
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 6c65cd6..a1e407f 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -86,6 +86,10 @@ if [ "$ARROW_TRAVIS_USE_TOOLCHAIN" == "1" ]; then
   fi
 fi
 
+if [ "$ARROW_TRAVIS_FLIGHT" == "1" ]; then
+  CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_FLIGHT=ON"
+fi
+
 if [ "$ARROW_TRAVIS_PLASMA" == "1" ]; then
   CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_PLASMA=ON"
 fi
diff --git a/cpp/cmake_modules/Findc-ares.cmake b/cpp/cmake_modules/Findc-ares.cmake
index 1366ce3..2b7b589 100644
--- a/cpp/cmake_modules/Findc-ares.cmake
+++ b/cpp/cmake_modules/Findc-ares.cmake
@@ -42,8 +42,9 @@ if (MSVC)
 else ()
   set(CARES_LIB_NAME
     ${CMAKE_SHARED_LIBRARY_PREFIX}cares${CMAKE_SHARED_LIBRARY_SUFFIX})
-  set(CARES_STATIC_LIB_NAME
-    ${CMAKE_STATIC_LIBRARY_PREFIX}cares${CMAKE_STATIC_LIBRARY_SUFFIX})
+  set(CARES_STATIC_LIB_NAMES
+    ${CMAKE_STATIC_LIBRARY_PREFIX}cares${CMAKE_STATIC_LIBRARY_SUFFIX}
+    ${CMAKE_STATIC_LIBRARY_PREFIX}cares_static${CMAKE_STATIC_LIBRARY_SUFFIX})
 endif ()
 
 # Try the parameterized roots, if they exist
@@ -56,7 +57,7 @@ if (_cares_roots)
     PATHS ${_cares_roots} NO_DEFAULT_PATH
     PATH_SUFFIXES "lib")
   find_library(CARES_STATIC_LIB
-    NAMES ${CARES_STATIC_LIB_NAME}
+    NAMES ${CARES_STATIC_LIB_NAMES}
     PATHS ${_cares_roots} NO_DEFAULT_PATH
     PATH_SUFFIXES "lib")
 else ()
diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt
index 3e0323f..a32a5fa 100644
--- a/cpp/src/arrow/flight/CMakeLists.txt
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -28,12 +28,11 @@ set(ARROW_FLIGHT_STATIC_LINK_LIBS
     grpc_address_sorting_static
     cares_static)
 
-set(ARROW_FLIGHT_TEST_STATIC_LINK_LIBS
-    arrow_static
-    arrow_flight_static
-    arrow_testing_static
-    ${ARROW_FLIGHT_STATIC_LINK_LIBS}
-    ${PROTOBUF_LIBRARY})
+set(ARROW_FLIGHT_TEST_LINK_LIBS
+    arrow_flight_shared
+    arrow_flight_testing_shared
+    ${ARROW_TEST_SHARED_LINK_LIBS}
+    ${ARROW_FLIGHT_STATIC_LINK_LIBS})
 
 # TODO(wesm): Protobuf shared vs static linking
 
@@ -78,25 +77,43 @@ add_arrow_lib(arrow_flight
               arrow_static
               ${ARROW_FLIGHT_STATIC_LINK_LIBS})
 
+# Define arrow_flight_testing library
+if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
+  add_arrow_lib(arrow_flight_testing
+                SOURCES
+                test-util.cc
+                DEPENDENCIES
+                ${GTEST_LIBRARY}
+                SHARED_LINK_LIBS
+                arrow_shared
+                arrow_flight_shared
+                ${BOOST_FILESYSTEM_LIBRARY}
+                ${BOOST_SYSTEM_LIBRARY}
+                ${GTEST_LIBRARY}
+                STATIC_LINK_LIBS
+                arrow_static
+                arrow_flight_static)
+endif()
+
 add_arrow_test(flight-test
                EXTRA_LINK_LIBS
-               ${ARROW_FLIGHT_TEST_STATIC_LINK_LIBS}
+               ${ARROW_FLIGHT_TEST_LINK_LIBS}
                LABELS
                "arrow_flight")
 
 # Build test server for unit tests or benchmarks
 if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
   add_executable(flight-test-server test-server.cc)
-  target_link_libraries(flight-test-server ${ARROW_FLIGHT_TEST_STATIC_LINK_LIBS}
-                        gflags_static ${GTEST_LIBRARY})
+  target_link_libraries(flight-test-server ${ARROW_FLIGHT_TEST_LINK_LIBS} gflags_static
+                        ${GTEST_LIBRARY})
 
   add_executable(flight-test-integration-server test-integration-server.cc)
-  target_link_libraries(flight-test-integration-server
-                        ${ARROW_FLIGHT_TEST_STATIC_LINK_LIBS} gflags_static gtest_static)
+  target_link_libraries(flight-test-integration-server ${ARROW_FLIGHT_TEST_LINK_LIBS}
+                        gflags_static gtest_static)
 
   add_executable(flight-test-integration-client test-integration-client.cc)
-  target_link_libraries(flight-test-integration-client
-                        ${ARROW_FLIGHT_TEST_STATIC_LINK_LIBS} gflags_static gtest_static)
+  target_link_libraries(flight-test-integration-client ${ARROW_FLIGHT_TEST_LINK_LIBS}
+                        gflags_static gtest_static)
 
   # This is needed for the unit tests
   if(ARROW_BUILD_TESTS)
@@ -116,9 +133,9 @@ if(ARROW_BUILD_BENCHMARKS)
 
   add_executable(flight-perf-server perf-server.cc perf.pb.cc)
   target_link_libraries(flight-perf-server
-                        arrow_flight_static
-                        arrow_testing_static
-                        ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+                        arrow_flight_shared
+                        arrow_flight_testing_shared
+                        ${ARROW_FLIGHT_TEST_LINK_LIBS}
                         gflags_static
                         ${GTEST_LIBRARY})
 
@@ -126,7 +143,7 @@ if(ARROW_BUILD_BENCHMARKS)
   target_link_libraries(flight-benchmark
                         arrow_flight_static
                         arrow_testing_static
-                        ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+                        ${ARROW_FLIGHT_TEST_LINK_LIBS}
                         gflags_static
                         ${GTEST_LIBRARY})
 
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index a58c2b5..fd13f79 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -196,8 +196,11 @@ class FlightClient::FlightClientImpl {
     ss << host << ":" << port;
     std::string uri = ss.str();
 
+    grpc::ChannelArguments args;
+    // Try to reconnect quickly at first, in case the server is still starting up
+    args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
     stub_ = pb::FlightService::NewStub(
-        grpc::CreateChannel(ss.str(), grpc::InsecureChannelCredentials()));
+        grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), args));
     return Status::OK();
   }
 
diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h
index ef96041..334158d 100644
--- a/cpp/src/arrow/flight/client.h
+++ b/cpp/src/arrow/flight/client.h
@@ -116,7 +116,7 @@ class ARROW_EXPORT FlightClient {
 /// \brief An interface to upload record batches to a Flight server
 class ARROW_EXPORT FlightPutWriter : public ipc::RecordBatchWriter {
  public:
-  ~FlightPutWriter();
+  ~FlightPutWriter() override;
 
   Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
   Status Close() override;
diff --git a/cpp/src/arrow/flight/flight-test.cc b/cpp/src/arrow/flight/flight-test.cc
index 12fe503..9268aec 100644
--- a/cpp/src/arrow/flight/flight-test.cc
+++ b/cpp/src/arrow/flight/flight-test.cc
@@ -28,10 +28,10 @@
 #include <iostream>
 #include <memory>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include <gtest/gtest.h>
-#include <boost/process.hpp>
 
 #include "arrow/ipc/test-common.h"
 #include "arrow/status.h"
@@ -62,6 +62,7 @@ TEST(TestFlight, StartStopTestServer) {
   ASSERT_TRUE(server.IsRunning());
   int exit_code = server.Stop();
   ASSERT_EQ(0, exit_code);
+  ASSERT_FALSE(server.IsRunning());
 }
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/flight/perf-server.cc b/cpp/src/arrow/flight/perf-server.cc
index add5442..b470283 100644
--- a/cpp/src/arrow/flight/perf-server.cc
+++ b/cpp/src/arrow/flight/perf-server.cc
@@ -29,9 +29,11 @@
 #include "arrow/io/test-common.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/record_batch.h"
+#include "arrow/testing/util.h"
 
+#include "arrow/flight/api.h"
+#include "arrow/flight/internal.h"
 #include "arrow/flight/perf.pb.h"
-#include "arrow/flight/server.h"
 #include "arrow/flight/test-util.h"
 
 DEFINE_int32(port, 31337, "Server port to listen on");
diff --git a/cpp/src/arrow/flight/serialization-internal.h b/cpp/src/arrow/flight/serialization-internal.h
index d4254d6..06cdcdf 100644
--- a/cpp/src/arrow/flight/serialization-internal.h
+++ b/cpp/src/arrow/flight/serialization-internal.h
@@ -25,6 +25,7 @@
 
 #include "google/protobuf/io/coded_stream.h"
 #include "google/protobuf/io/zero_copy_stream.h"
+#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
 #include "google/protobuf/wire_format_lite.h"
 #include "grpc/byte_buffer_reader.h"
 #include "grpcpp/grpcpp.h"
@@ -66,33 +67,6 @@ namespace internal {
 using google::protobuf::io::CodedInputStream;
 using google::protobuf::io::CodedOutputStream;
 
-// More efficient writing of FlightData to gRPC output buffer
-// Implementation of ZeroCopyOutputStream that writes to a fixed-size buffer
-class FixedSizeProtoWriter : public ::google::protobuf::io::ZeroCopyOutputStream {
- public:
-  explicit FixedSizeProtoWriter(grpc_slice slice)
-      : slice_(slice),
-        bytes_written_(0),
-        total_size_(static_cast<int>(GRPC_SLICE_LENGTH(slice))) {}
-
-  bool Next(void** data, int* size) override {
-    // Consume the whole slice
-    *data = GRPC_SLICE_START_PTR(slice_) + bytes_written_;
-    *size = total_size_ - bytes_written_;
-    bytes_written_ = total_size_;
-    return true;
-  }
-
-  void BackUp(int count) override { bytes_written_ -= count; }
-
-  int64_t ByteCount() const override { return bytes_written_; }
-
- private:
-  grpc_slice slice_;
-  int bytes_written_;
-  int total_size_;
-};
-
 bool ReadBytesZeroCopy(const std::shared_ptr<arrow::Buffer>& source_data,
                        CodedInputStream* input, std::shared_ptr<arrow::Buffer>* out);
 
@@ -159,11 +133,11 @@ class GrpcBuffer : public arrow::MutableBuffer {
 namespace grpc {
 
 using arrow::flight::FlightData;
-using arrow::flight::internal::FixedSizeProtoWriter;
 using arrow::flight::internal::GrpcBuffer;
 using arrow::flight::internal::ReadBytesZeroCopy;
 
 using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::ArrayOutputStream;
 using google::protobuf::io::CodedInputStream;
 using google::protobuf::io::CodedOutputStream;
 
@@ -298,7 +272,8 @@ class SerializationTraits<IpcPayload> {
     // XXX(wesm): for debugging
     // std::cout << "Writing record batch with total size " << total_size << std::endl;
 
-    FixedSizeProtoWriter writer(*reinterpret_cast<grpc_slice*>(&slice));
+    ArrayOutputStream writer(const_cast<uint8_t*>(slice.begin()),
+                             static_cast<int>(slice.size()));
     CodedOutputStream pb_stream(&writer);
 
     // Write header
diff --git a/cpp/src/arrow/flight/test-integration-client.cc b/cpp/src/arrow/flight/test-integration-client.cc
index 6252283..89ae88c 100644
--- a/cpp/src/arrow/flight/test-integration-client.cc
+++ b/cpp/src/arrow/flight/test-integration-client.cc
@@ -29,10 +29,12 @@
 
 #include "arrow/io/test-common.h"
 #include "arrow/ipc/json.h"
+#include "arrow/ipc/writer.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
+#include "arrow/util/logging.h"
 
-#include "arrow/flight/server.h"
+#include "arrow/flight/api.h"
 #include "arrow/flight/test-util.h"
 
 DEFINE_string(host, "localhost", "Server port to connect to");
diff --git a/cpp/src/arrow/flight/test-integration-server.cc b/cpp/src/arrow/flight/test-integration-server.cc
index 7e201a0..0381e90 100644
--- a/cpp/src/arrow/flight/test-integration-server.cc
+++ b/cpp/src/arrow/flight/test-integration-server.cc
@@ -28,7 +28,9 @@
 #include "arrow/ipc/json.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
+#include "arrow/util/logging.h"
 
+#include "arrow/flight/internal.h"
 #include "arrow/flight/server.h"
 #include "arrow/flight/test-util.h"
 
diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.cc
similarity index 55%
copy from cpp/src/arrow/flight/test-util.h
copy to cpp/src/arrow/flight/test-util.cc
index 5dea008..7b8a7f3 100644
--- a/cpp/src/arrow/flight/test-util.h
+++ b/cpp/src/arrow/flight/test-util.cc
@@ -15,102 +15,109 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <iostream>
-#include <memory>
-#include <string>
-#include <vector>
+#ifdef __APPLE__
+#include <limits.h>
+#include <mach-o/dyld.h>
+#endif
 
+#include <sstream>
+
+#include <boost/filesystem.hpp>
 #include <boost/process.hpp>
 
+#include <gtest/gtest.h>
+
 #include "arrow/ipc/test-common.h"
-#include "arrow/status.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/util/logging.h"
 
 #include "arrow/flight/api.h"
 #include "arrow/flight/internal.h"
-
-namespace bp = boost::process;
+#include "arrow/flight/test-util.h"
 
 namespace arrow {
 namespace flight {
 
-// ----------------------------------------------------------------------
-// Fixture to use for running test servers
-
-struct TestServer {
- public:
-  explicit TestServer(const std::string& executable_name, int port)
-      : executable_name_(executable_name), port_(port) {}
-
-  void Start() {
-    std::string str_port = std::to_string(port_);
-    server_process_.reset(
-        new bp::child(bp::search_path(executable_name_), "-port", str_port));
-    std::cout << "Server running with pid " << server_process_->id() << std::endl;
-  }
-
-  int Stop() {
-    kill(server_process_->id(), SIGTERM);
-    server_process_->wait();
-    return server_process_->exit_code();
-  }
-
-  bool IsRunning() { return server_process_->running(); }
-
-  int port() const { return port_; }
-
- private:
-  std::string executable_name_;
-  int port_;
-  std::unique_ptr<bp::child> server_process_;
-};
+namespace bp = boost::process;
+namespace fs = boost::filesystem;
 
-// ----------------------------------------------------------------------
-// A RecordBatchReader for serving a sequence of in-memory record batches
+namespace {
 
-class BatchIterator : public RecordBatchReader {
- public:
-  BatchIterator(const std::shared_ptr<Schema>& schema,
-                const std::vector<std::shared_ptr<RecordBatch>>& batches)
-      : schema_(schema), batches_(batches), position_(0) {}
+Status ResolveCurrentExecutable(fs::path* out) {
+  // See https://stackoverflow.com/a/1024937/10194 for various
+  // platform-specific recipes.
 
-  std::shared_ptr<Schema> schema() const override { return schema_; }
+  boost::system::error_code ec;
 
-  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
-    if (position_ >= batches_.size()) {
-      *out = nullptr;
-    } else {
-      *out = batches_[position_++];
-    }
+#if defined(__linux__)
+  *out = fs::canonical("/proc/self/exe", ec);
+#elif defined(__APPLE__)
+  char buf[PATH_MAX + 1];
+  uint32_t bufsize = sizeof(buf);
+  if (_NSGetExecutablePath(buf, &bufsize) < 0) {
+    return Status::Invalid("Can't resolve current exe: path too large");
+  }
+  *out = fs::canonical(buf, ec);
+#else
+  ARROW_UNUSED(ec);
+  return Status::NotImplemented("Not available on this system");
+#endif
+  if (ec) {
+    // XXX fold this into the Status class?
+    return Status::IOError("Can't resolve current exe: ", ec.message());
+  } else {
     return Status::OK();
   }
+}
 
- private:
-  std::shared_ptr<Schema> schema_;
-  std::vector<std::shared_ptr<RecordBatch>> batches_;
-  size_t position_;
-};
-
-// ----------------------------------------------------------------------
-// Example data for test-server and unit tests
-
-using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
+}  // namespace
+
+void TestServer::Start() {
+  namespace fs = boost::filesystem;
+
+  std::string str_port = std::to_string(port_);
+  std::vector<fs::path> search_path = ::boost::this_process::path();
+  // If possible, prepend current executable directory to search path,
+  // since it's likely that the test server executable is located in
+  // the same directory as the running unit test.
+  fs::path current_exe;
+  Status st = ResolveCurrentExecutable(&current_exe);
+  if (st.ok()) {
+    search_path.insert(search_path.begin(), current_exe.parent_path());
+  } else if (st.IsNotImplemented()) {
+    ARROW_CHECK(st.IsNotImplemented()) << st.ToString();
+  }
 
-std::shared_ptr<Schema> ExampleSchema1() {
-  auto f0 = field("f0", int32());
-  auto f1 = field("f1", int32());
-  return ::arrow::schema({f0, f1});
+  try {
+    server_process_ = std::make_shared<bp::child>(
+        bp::search_path(executable_name_, search_path), "-port", str_port);
+  } catch (...) {
+    std::stringstream ss;
+    ss << "Failed to launch test server '" << executable_name_ << "', looked in ";
+    for (const auto& path : search_path) {
+      ss << path << " : ";
+    }
+    ARROW_LOG(FATAL) << ss.str();
+    throw;
+  }
+  std::cout << "Server running with pid " << server_process_->id() << std::endl;
 }
 
-std::shared_ptr<Schema> ExampleSchema2() {
-  auto f0 = field("f0", utf8());
-  auto f1 = field("f1", binary());
-  return ::arrow::schema({f0, f1});
+int TestServer::Stop() {
+  if (server_process_ && server_process_->valid()) {
+    kill(server_process_->id(), SIGTERM);
+    server_process_->wait();
+    return server_process_->exit_code();
+  } else {
+    // Presumably the server wasn't able to start
+    return -1;
+  }
 }
 
+bool TestServer::IsRunning() { return server_process_->running(); }
+
+int TestServer::port() const { return port_; }
+
 Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
                       const std::vector<FlightEndpoint>& endpoints,
                       uint64_t total_records, uint64_t total_bytes,
diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.h
index 5dea008..f955e3d 100644
--- a/cpp/src/arrow/flight/test-util.h
+++ b/cpp/src/arrow/flight/test-util.h
@@ -16,23 +16,21 @@
 // under the License.
 
 #include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <iostream>
 #include <memory>
 #include <string>
 #include <vector>
 
-#include <boost/process.hpp>
-
-#include "arrow/ipc/test-common.h"
 #include "arrow/status.h"
-#include "arrow/testing/gtest_util.h"
 
-#include "arrow/flight/api.h"
-#include "arrow/flight/internal.h"
+#include "arrow/flight/types.h"
+
+namespace boost {
+namespace process {
+
+class child;
 
-namespace bp = boost::process;
+}  // namespace process
+}  // namespace boost
 
 namespace arrow {
 namespace flight {
@@ -40,32 +38,23 @@ namespace flight {
 // ----------------------------------------------------------------------
 // Fixture to use for running test servers
 
-struct TestServer {
+class ARROW_EXPORT TestServer {
  public:
   explicit TestServer(const std::string& executable_name, int port)
       : executable_name_(executable_name), port_(port) {}
 
-  void Start() {
-    std::string str_port = std::to_string(port_);
-    server_process_.reset(
-        new bp::child(bp::search_path(executable_name_), "-port", str_port));
-    std::cout << "Server running with pid " << server_process_->id() << std::endl;
-  }
+  void Start();
 
-  int Stop() {
-    kill(server_process_->id(), SIGTERM);
-    server_process_->wait();
-    return server_process_->exit_code();
-  }
+  int Stop();
 
-  bool IsRunning() { return server_process_->running(); }
+  bool IsRunning();
 
-  int port() const { return port_; }
+  int port() const;
 
  private:
   std::string executable_name_;
   int port_;
-  std::unique_ptr<bp::child> server_process_;
+  std::shared_ptr<::boost::process::child> server_process_;
 };
 
 // ----------------------------------------------------------------------
@@ -99,59 +88,32 @@ class BatchIterator : public RecordBatchReader {
 
 using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
 
-std::shared_ptr<Schema> ExampleSchema1() {
+inline std::shared_ptr<Schema> ExampleSchema1() {
   auto f0 = field("f0", int32());
   auto f1 = field("f1", int32());
   return ::arrow::schema({f0, f1});
 }
 
-std::shared_ptr<Schema> ExampleSchema2() {
+inline std::shared_ptr<Schema> ExampleSchema2() {
   auto f0 = field("f0", utf8());
   auto f1 = field("f1", binary());
   return ::arrow::schema({f0, f1});
 }
 
+ARROW_EXPORT
 Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
                       const std::vector<FlightEndpoint>& endpoints,
                       uint64_t total_records, uint64_t total_bytes,
-                      FlightInfo::Data* out) {
-  out->descriptor = descriptor;
-  out->endpoints = endpoints;
-  out->total_records = total_records;
-  out->total_bytes = total_bytes;
-  return internal::SchemaToString(schema, &out->schema);
-}
+                      FlightInfo::Data* out);
 
-std::vector<FlightInfo> ExampleFlightInfo() {
-  FlightEndpoint endpoint1({{"ticket-id-1"}, {{"foo1.bar.com", 92385}}});
-  FlightEndpoint endpoint2({{"ticket-id-2"}, {{"foo2.bar.com", 92385}}});
-  FlightEndpoint endpoint3({{"ticket-id-3"}, {{"foo3.bar.com", 92385}}});
-  FlightDescriptor descr1{FlightDescriptor::PATH, "", {"foo", "bar"}};
-  FlightDescriptor descr2{FlightDescriptor::CMD, "my_command", {}};
-
-  auto schema1 = ExampleSchema1();
-  auto schema2 = ExampleSchema2();
-
-  FlightInfo::Data flight1, flight2;
-  EXPECT_OK(
-      MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, &flight1));
-  EXPECT_OK(MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, &flight2));
-  return {FlightInfo(flight1), FlightInfo(flight2)};
-}
+ARROW_EXPORT
+std::vector<FlightInfo> ExampleFlightInfo();
 
-Status SimpleIntegerBatches(const int num_batches, BatchVector* out) {
-  std::shared_ptr<RecordBatch> batch;
-  for (int i = 0; i < num_batches; ++i) {
-    // Make all different sizes, use different random seed
-    RETURN_NOT_OK(ipc::MakeIntBatchSized(10 + i, &batch, i));
-    out->push_back(batch);
-  }
-  return Status::OK();
-}
+ARROW_EXPORT
+Status SimpleIntegerBatches(const int num_batches, BatchVector* out);
 
-std::vector<ActionType> ExampleActionTypes() {
-  return {{"drop", "drop a dataset"}, {"cache", "cache a dataset"}};
-}
+ARROW_EXPORT
+std::vector<ActionType> ExampleActionTypes();
 
 }  // namespace flight
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda-benchmark.cc b/cpp/src/arrow/gpu/cuda-benchmark.cc
index 32b5f1f..a61eb92 100644
--- a/cpp/src/arrow/gpu/cuda-benchmark.cc
+++ b/cpp/src/arrow/gpu/cuda-benchmark.cc
@@ -24,6 +24,7 @@
 #include "arrow/array.h"
 #include "arrow/memory_pool.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
 
 #include "arrow/gpu/cuda_api.h"
 
diff --git a/cpp/src/arrow/util/stopwatch.h b/cpp/src/arrow/util/stopwatch.h
index e90d0ba..db4e67f 100644
--- a/cpp/src/arrow/util/stopwatch.h
+++ b/cpp/src/arrow/util/stopwatch.h
@@ -17,34 +17,31 @@
 
 #pragma once
 
-#include <stdio.h>
-#ifndef _MSC_VER
-#include <sys/time.h>
-#endif
-
-#include <ctime>
-#include <iostream>
+#include <cassert>
+#include <chrono>
 
 namespace arrow {
 namespace internal {
 
-uint64_t CurrentTime() {
-  timespec time;
-  clock_gettime(CLOCK_MONOTONIC, &time);
-  return 1000000000L * time.tv_sec + time.tv_nsec;
-}
-
 class StopWatch {
+  // This clock should give us wall clock time
+  using ClockType = std::chrono::steady_clock;
+
  public:
   StopWatch() {}
 
-  void Start() { start_ = CurrentTime(); }
+  void Start() { start_ = ClockType::now(); }
 
   // Returns time in nanoseconds.
-  uint64_t Stop() { return CurrentTime() - start_; }
+  uint64_t Stop() {
+    auto stop = ClockType::now();
+    std::chrono::nanoseconds d = stop - start_;
+    assert(d.count() >= 0);
+    return static_cast<uint64_t>(d.count());
+  }
 
  private:
-  uint64_t start_;
+  std::chrono::time_point<ClockType> start_;
 };
 
 }  // namespace internal