You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/02/12 22:43:33 UTC
[arrow] branch master updated: ARROW-3292: [C++] Test Flight RPC in
Travis CI
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new af60c2e ARROW-3292: [C++] Test Flight RPC in Travis CI
af60c2e is described below
commit af60c2e730236cfbbd207e1c0bd655bbb230bae1
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Feb 12 16:43:24 2019 -0600
ARROW-3292: [C++] Test Flight RPC in Travis CI
Enabled on Ubuntu 16.04 and macOS.
Author: Antoine Pitrou <an...@python.org>
Closes #3626 from pitrou/ARROW-3292-flight-travis-ci and squashes the following commits:
afd1fd31 <Antoine Pitrou> ARROW-3292: Test Flight RPC in Travis-CI
---
.travis.yml | 5 +-
ci/conda_env_cpp.yml | 2 +
ci/travis_before_script_cpp.sh | 4 +
cpp/cmake_modules/Findc-ares.cmake | 7 +-
cpp/src/arrow/flight/CMakeLists.txt | 51 ++++---
cpp/src/arrow/flight/client.cc | 5 +-
cpp/src/arrow/flight/client.h | 2 +-
cpp/src/arrow/flight/flight-test.cc | 3 +-
cpp/src/arrow/flight/perf-server.cc | 4 +-
cpp/src/arrow/flight/serialization-internal.h | 33 +----
cpp/src/arrow/flight/test-integration-client.cc | 4 +-
cpp/src/arrow/flight/test-integration-server.cc | 2 +
cpp/src/arrow/flight/{test-util.h => test-util.cc} | 151 +++++++++++----------
cpp/src/arrow/flight/test-util.h | 86 ++++--------
cpp/src/arrow/gpu/cuda-benchmark.cc | 1 +
cpp/src/arrow/util/stopwatch.h | 29 ++--
16 files changed, 184 insertions(+), 205 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 2873dc0..60c4446 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -62,6 +62,7 @@ matrix:
env:
- ARROW_TRAVIS_VALGRIND=1
- ARROW_TRAVIS_USE_TOOLCHAIN=1
+ - ARROW_TRAVIS_FLIGHT=1
- ARROW_TRAVIS_PLASMA=1
- ARROW_TRAVIS_ORC=1
- ARROW_TRAVIS_PARQUET=1
@@ -86,10 +87,11 @@ matrix:
os: linux
jdk: openjdk8
env:
+ - ARROW_TRAVIS_COVERAGE=1
- ARROW_TRAVIS_USE_TOOLCHAIN=1
+ - ARROW_TRAVIS_FLIGHT=1
- ARROW_TRAVIS_PLASMA=1
- ARROW_TRAVIS_ORC=1
- - ARROW_TRAVIS_COVERAGE=1
- ARROW_TRAVIS_PARQUET=1
- ARROW_TRAVIS_GANDIVA=1
- ARROW_TRAVIS_GANDIVA_JAVA=1
@@ -181,6 +183,7 @@ matrix:
env:
- ARROW_TRAVIS_USE_TOOLCHAIN=1
- ARROW_TRAVIS_PLASMA=1
+ - ARROW_TRAVIS_FLIGHT=1
- ARROW_TRAVIS_ORC=1
- ARROW_TRAVIS_PARQUET=1
- ARROW_TRAVIS_GANDIVA=1
diff --git a/ci/conda_env_cpp.yml b/ci/conda_env_cpp.yml
index 3d50b21..fbe4576 100644
--- a/ci/conda_env_cpp.yml
+++ b/ci/conda_env_cpp.yml
@@ -20,12 +20,14 @@
boost-cpp=1.68.0
brotli
bzip2
+c-ares
cmake
double-conversion
flatbuffers
gflags
glog
gmock
+grpc-cpp
gtest
libprotobuf
lz4-c
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 6c65cd6..a1e407f 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -86,6 +86,10 @@ if [ "$ARROW_TRAVIS_USE_TOOLCHAIN" == "1" ]; then
fi
fi
+if [ "$ARROW_TRAVIS_FLIGHT" == "1" ]; then
+ CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_FLIGHT=ON"
+fi
+
if [ "$ARROW_TRAVIS_PLASMA" == "1" ]; then
CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_PLASMA=ON"
fi
diff --git a/cpp/cmake_modules/Findc-ares.cmake b/cpp/cmake_modules/Findc-ares.cmake
index 1366ce3..2b7b589 100644
--- a/cpp/cmake_modules/Findc-ares.cmake
+++ b/cpp/cmake_modules/Findc-ares.cmake
@@ -42,8 +42,9 @@ if (MSVC)
else ()
set(CARES_LIB_NAME
${CMAKE_SHARED_LIBRARY_PREFIX}cares${CMAKE_SHARED_LIBRARY_SUFFIX})
- set(CARES_STATIC_LIB_NAME
- ${CMAKE_STATIC_LIBRARY_PREFIX}cares${CMAKE_STATIC_LIBRARY_SUFFIX})
+ set(CARES_STATIC_LIB_NAMES
+ ${CMAKE_STATIC_LIBRARY_PREFIX}cares${CMAKE_STATIC_LIBRARY_SUFFIX}
+ ${CMAKE_STATIC_LIBRARY_PREFIX}cares_static${CMAKE_STATIC_LIBRARY_SUFFIX})
endif ()
# Try the parameterized roots, if they exist
@@ -56,7 +57,7 @@ if (_cares_roots)
PATHS ${_cares_roots} NO_DEFAULT_PATH
PATH_SUFFIXES "lib")
find_library(CARES_STATIC_LIB
- NAMES ${CARES_STATIC_LIB_NAME}
+ NAMES ${CARES_STATIC_LIB_NAMES}
PATHS ${_cares_roots} NO_DEFAULT_PATH
PATH_SUFFIXES "lib")
else ()
diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt
index 3e0323f..a32a5fa 100644
--- a/cpp/src/arrow/flight/CMakeLists.txt
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -28,12 +28,11 @@ set(ARROW_FLIGHT_STATIC_LINK_LIBS
grpc_address_sorting_static
cares_static)
-set(ARROW_FLIGHT_TEST_STATIC_LINK_LIBS
- arrow_static
- arrow_flight_static
- arrow_testing_static
- ${ARROW_FLIGHT_STATIC_LINK_LIBS}
- ${PROTOBUF_LIBRARY})
+set(ARROW_FLIGHT_TEST_LINK_LIBS
+ arrow_flight_shared
+ arrow_flight_testing_shared
+ ${ARROW_TEST_SHARED_LINK_LIBS}
+ ${ARROW_FLIGHT_STATIC_LINK_LIBS})
# TODO(wesm): Protobuf shared vs static linking
@@ -78,25 +77,43 @@ add_arrow_lib(arrow_flight
arrow_static
${ARROW_FLIGHT_STATIC_LINK_LIBS})
+# Define arrow_flight_testing library
+if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
+ add_arrow_lib(arrow_flight_testing
+ SOURCES
+ test-util.cc
+ DEPENDENCIES
+ ${GTEST_LIBRARY}
+ SHARED_LINK_LIBS
+ arrow_shared
+ arrow_flight_shared
+ ${BOOST_FILESYSTEM_LIBRARY}
+ ${BOOST_SYSTEM_LIBRARY}
+ ${GTEST_LIBRARY}
+ STATIC_LINK_LIBS
+ arrow_static
+ arrow_flight_static)
+endif()
+
add_arrow_test(flight-test
EXTRA_LINK_LIBS
- ${ARROW_FLIGHT_TEST_STATIC_LINK_LIBS}
+ ${ARROW_FLIGHT_TEST_LINK_LIBS}
LABELS
"arrow_flight")
# Build test server for unit tests or benchmarks
if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
add_executable(flight-test-server test-server.cc)
- target_link_libraries(flight-test-server ${ARROW_FLIGHT_TEST_STATIC_LINK_LIBS}
- gflags_static ${GTEST_LIBRARY})
+ target_link_libraries(flight-test-server ${ARROW_FLIGHT_TEST_LINK_LIBS} gflags_static
+ ${GTEST_LIBRARY})
add_executable(flight-test-integration-server test-integration-server.cc)
- target_link_libraries(flight-test-integration-server
- ${ARROW_FLIGHT_TEST_STATIC_LINK_LIBS} gflags_static gtest_static)
+ target_link_libraries(flight-test-integration-server ${ARROW_FLIGHT_TEST_LINK_LIBS}
+ gflags_static gtest_static)
add_executable(flight-test-integration-client test-integration-client.cc)
- target_link_libraries(flight-test-integration-client
- ${ARROW_FLIGHT_TEST_STATIC_LINK_LIBS} gflags_static gtest_static)
+ target_link_libraries(flight-test-integration-client ${ARROW_FLIGHT_TEST_LINK_LIBS}
+ gflags_static gtest_static)
# This is needed for the unit tests
if(ARROW_BUILD_TESTS)
@@ -116,9 +133,9 @@ if(ARROW_BUILD_BENCHMARKS)
add_executable(flight-perf-server perf-server.cc perf.pb.cc)
target_link_libraries(flight-perf-server
- arrow_flight_static
- arrow_testing_static
- ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+ arrow_flight_shared
+ arrow_flight_testing_shared
+ ${ARROW_FLIGHT_TEST_LINK_LIBS}
gflags_static
${GTEST_LIBRARY})
@@ -126,7 +143,7 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(flight-benchmark
arrow_flight_static
arrow_testing_static
- ${ARROW_FLIGHT_STATIC_LINK_LIBS}
+ ${ARROW_FLIGHT_TEST_LINK_LIBS}
gflags_static
${GTEST_LIBRARY})
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index a58c2b5..fd13f79 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -196,8 +196,11 @@ class FlightClient::FlightClientImpl {
ss << host << ":" << port;
std::string uri = ss.str();
+ grpc::ChannelArguments args;
+ // Try to reconnect quickly at first, in case the server is still starting up
+ args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
stub_ = pb::FlightService::NewStub(
- grpc::CreateChannel(ss.str(), grpc::InsecureChannelCredentials()));
+ grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), args));
return Status::OK();
}
diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h
index ef96041..334158d 100644
--- a/cpp/src/arrow/flight/client.h
+++ b/cpp/src/arrow/flight/client.h
@@ -116,7 +116,7 @@ class ARROW_EXPORT FlightClient {
/// \brief An interface to upload record batches to a Flight server
class ARROW_EXPORT FlightPutWriter : public ipc::RecordBatchWriter {
public:
- ~FlightPutWriter();
+ ~FlightPutWriter() override;
Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
Status Close() override;
diff --git a/cpp/src/arrow/flight/flight-test.cc b/cpp/src/arrow/flight/flight-test.cc
index 12fe503..9268aec 100644
--- a/cpp/src/arrow/flight/flight-test.cc
+++ b/cpp/src/arrow/flight/flight-test.cc
@@ -28,10 +28,10 @@
#include <iostream>
#include <memory>
#include <string>
+#include <thread>
#include <vector>
#include <gtest/gtest.h>
-#include <boost/process.hpp>
#include "arrow/ipc/test-common.h"
#include "arrow/status.h"
@@ -62,6 +62,7 @@ TEST(TestFlight, StartStopTestServer) {
ASSERT_TRUE(server.IsRunning());
int exit_code = server.Stop();
ASSERT_EQ(0, exit_code);
+ ASSERT_FALSE(server.IsRunning());
}
// ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/flight/perf-server.cc b/cpp/src/arrow/flight/perf-server.cc
index add5442..b470283 100644
--- a/cpp/src/arrow/flight/perf-server.cc
+++ b/cpp/src/arrow/flight/perf-server.cc
@@ -29,9 +29,11 @@
#include "arrow/io/test-common.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
+#include "arrow/testing/util.h"
+#include "arrow/flight/api.h"
+#include "arrow/flight/internal.h"
#include "arrow/flight/perf.pb.h"
-#include "arrow/flight/server.h"
#include "arrow/flight/test-util.h"
DEFINE_int32(port, 31337, "Server port to listen on");
diff --git a/cpp/src/arrow/flight/serialization-internal.h b/cpp/src/arrow/flight/serialization-internal.h
index d4254d6..06cdcdf 100644
--- a/cpp/src/arrow/flight/serialization-internal.h
+++ b/cpp/src/arrow/flight/serialization-internal.h
@@ -25,6 +25,7 @@
#include "google/protobuf/io/coded_stream.h"
#include "google/protobuf/io/zero_copy_stream.h"
+#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
#include "google/protobuf/wire_format_lite.h"
#include "grpc/byte_buffer_reader.h"
#include "grpcpp/grpcpp.h"
@@ -66,33 +67,6 @@ namespace internal {
using google::protobuf::io::CodedInputStream;
using google::protobuf::io::CodedOutputStream;
-// More efficient writing of FlightData to gRPC output buffer
-// Implementation of ZeroCopyOutputStream that writes to a fixed-size buffer
-class FixedSizeProtoWriter : public ::google::protobuf::io::ZeroCopyOutputStream {
- public:
- explicit FixedSizeProtoWriter(grpc_slice slice)
- : slice_(slice),
- bytes_written_(0),
- total_size_(static_cast<int>(GRPC_SLICE_LENGTH(slice))) {}
-
- bool Next(void** data, int* size) override {
- // Consume the whole slice
- *data = GRPC_SLICE_START_PTR(slice_) + bytes_written_;
- *size = total_size_ - bytes_written_;
- bytes_written_ = total_size_;
- return true;
- }
-
- void BackUp(int count) override { bytes_written_ -= count; }
-
- int64_t ByteCount() const override { return bytes_written_; }
-
- private:
- grpc_slice slice_;
- int bytes_written_;
- int total_size_;
-};
-
bool ReadBytesZeroCopy(const std::shared_ptr<arrow::Buffer>& source_data,
CodedInputStream* input, std::shared_ptr<arrow::Buffer>* out);
@@ -159,11 +133,11 @@ class GrpcBuffer : public arrow::MutableBuffer {
namespace grpc {
using arrow::flight::FlightData;
-using arrow::flight::internal::FixedSizeProtoWriter;
using arrow::flight::internal::GrpcBuffer;
using arrow::flight::internal::ReadBytesZeroCopy;
using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::ArrayOutputStream;
using google::protobuf::io::CodedInputStream;
using google::protobuf::io::CodedOutputStream;
@@ -298,7 +272,8 @@ class SerializationTraits<IpcPayload> {
// XXX(wesm): for debugging
// std::cout << "Writing record batch with total size " << total_size << std::endl;
- FixedSizeProtoWriter writer(*reinterpret_cast<grpc_slice*>(&slice));
+ ArrayOutputStream writer(const_cast<uint8_t*>(slice.begin()),
+ static_cast<int>(slice.size()));
CodedOutputStream pb_stream(&writer);
// Write header
diff --git a/cpp/src/arrow/flight/test-integration-client.cc b/cpp/src/arrow/flight/test-integration-client.cc
index 6252283..89ae88c 100644
--- a/cpp/src/arrow/flight/test-integration-client.cc
+++ b/cpp/src/arrow/flight/test-integration-client.cc
@@ -29,10 +29,12 @@
#include "arrow/io/test-common.h"
#include "arrow/ipc/json.h"
+#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
+#include "arrow/util/logging.h"
-#include "arrow/flight/server.h"
+#include "arrow/flight/api.h"
#include "arrow/flight/test-util.h"
DEFINE_string(host, "localhost", "Server port to connect to");
diff --git a/cpp/src/arrow/flight/test-integration-server.cc b/cpp/src/arrow/flight/test-integration-server.cc
index 7e201a0..0381e90 100644
--- a/cpp/src/arrow/flight/test-integration-server.cc
+++ b/cpp/src/arrow/flight/test-integration-server.cc
@@ -28,7 +28,9 @@
#include "arrow/ipc/json.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
+#include "arrow/util/logging.h"
+#include "arrow/flight/internal.h"
#include "arrow/flight/server.h"
#include "arrow/flight/test-util.h"
diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.cc
similarity index 55%
copy from cpp/src/arrow/flight/test-util.h
copy to cpp/src/arrow/flight/test-util.cc
index 5dea008..7b8a7f3 100644
--- a/cpp/src/arrow/flight/test-util.h
+++ b/cpp/src/arrow/flight/test-util.cc
@@ -15,102 +15,109 @@
// specific language governing permissions and limitations
// under the License.
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <iostream>
-#include <memory>
-#include <string>
-#include <vector>
+#ifdef __APPLE__
+#include <limits.h>
+#include <mach-o/dyld.h>
+#endif
+#include <sstream>
+
+#include <boost/filesystem.hpp>
#include <boost/process.hpp>
+#include <gtest/gtest.h>
+
#include "arrow/ipc/test-common.h"
-#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/util/logging.h"
#include "arrow/flight/api.h"
#include "arrow/flight/internal.h"
-
-namespace bp = boost::process;
+#include "arrow/flight/test-util.h"
namespace arrow {
namespace flight {
-// ----------------------------------------------------------------------
-// Fixture to use for running test servers
-
-struct TestServer {
- public:
- explicit TestServer(const std::string& executable_name, int port)
- : executable_name_(executable_name), port_(port) {}
-
- void Start() {
- std::string str_port = std::to_string(port_);
- server_process_.reset(
- new bp::child(bp::search_path(executable_name_), "-port", str_port));
- std::cout << "Server running with pid " << server_process_->id() << std::endl;
- }
-
- int Stop() {
- kill(server_process_->id(), SIGTERM);
- server_process_->wait();
- return server_process_->exit_code();
- }
-
- bool IsRunning() { return server_process_->running(); }
-
- int port() const { return port_; }
-
- private:
- std::string executable_name_;
- int port_;
- std::unique_ptr<bp::child> server_process_;
-};
+namespace bp = boost::process;
+namespace fs = boost::filesystem;
-// ----------------------------------------------------------------------
-// A RecordBatchReader for serving a sequence of in-memory record batches
+namespace {
-class BatchIterator : public RecordBatchReader {
- public:
- BatchIterator(const std::shared_ptr<Schema>& schema,
- const std::vector<std::shared_ptr<RecordBatch>>& batches)
- : schema_(schema), batches_(batches), position_(0) {}
+Status ResolveCurrentExecutable(fs::path* out) {
+ // See https://stackoverflow.com/a/1024937/10194 for various
+ // platform-specific recipes.
- std::shared_ptr<Schema> schema() const override { return schema_; }
+ boost::system::error_code ec;
- Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
- if (position_ >= batches_.size()) {
- *out = nullptr;
- } else {
- *out = batches_[position_++];
- }
+#if defined(__linux__)
+ *out = fs::canonical("/proc/self/exe", ec);
+#elif defined(__APPLE__)
+ char buf[PATH_MAX + 1];
+ uint32_t bufsize = sizeof(buf);
+ if (_NSGetExecutablePath(buf, &bufsize) < 0) {
+ return Status::Invalid("Can't resolve current exe: path too large");
+ }
+ *out = fs::canonical(buf, ec);
+#else
+ ARROW_UNUSED(ec);
+ return Status::NotImplemented("Not available on this system");
+#endif
+ if (ec) {
+ // XXX fold this into the Status class?
+ return Status::IOError("Can't resolve current exe: ", ec.message());
+ } else {
return Status::OK();
}
+}
- private:
- std::shared_ptr<Schema> schema_;
- std::vector<std::shared_ptr<RecordBatch>> batches_;
- size_t position_;
-};
-
-// ----------------------------------------------------------------------
-// Example data for test-server and unit tests
-
-using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
+} // namespace
+
+void TestServer::Start() {
+ namespace fs = boost::filesystem;
+
+ std::string str_port = std::to_string(port_);
+ std::vector<fs::path> search_path = ::boost::this_process::path();
+ // If possible, prepend current executable directory to search path,
+ // since it's likely that the test server executable is located in
+ // the same directory as the running unit test.
+ fs::path current_exe;
+ Status st = ResolveCurrentExecutable(&current_exe);
+ if (st.ok()) {
+ search_path.insert(search_path.begin(), current_exe.parent_path());
+ } else if (st.IsNotImplemented()) {
+ ARROW_CHECK(st.IsNotImplemented()) << st.ToString();
+ }
-std::shared_ptr<Schema> ExampleSchema1() {
- auto f0 = field("f0", int32());
- auto f1 = field("f1", int32());
- return ::arrow::schema({f0, f1});
+ try {
+ server_process_ = std::make_shared<bp::child>(
+ bp::search_path(executable_name_, search_path), "-port", str_port);
+ } catch (...) {
+ std::stringstream ss;
+ ss << "Failed to launch test server '" << executable_name_ << "', looked in ";
+ for (const auto& path : search_path) {
+ ss << path << " : ";
+ }
+ ARROW_LOG(FATAL) << ss.str();
+ throw;
+ }
+ std::cout << "Server running with pid " << server_process_->id() << std::endl;
}
-std::shared_ptr<Schema> ExampleSchema2() {
- auto f0 = field("f0", utf8());
- auto f1 = field("f1", binary());
- return ::arrow::schema({f0, f1});
+int TestServer::Stop() {
+ if (server_process_ && server_process_->valid()) {
+ kill(server_process_->id(), SIGTERM);
+ server_process_->wait();
+ return server_process_->exit_code();
+ } else {
+ // Presumably the server wasn't able to start
+ return -1;
+ }
}
+bool TestServer::IsRunning() { return server_process_->running(); }
+
+int TestServer::port() const { return port_; }
+
Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
uint64_t total_records, uint64_t total_bytes,
diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.h
index 5dea008..f955e3d 100644
--- a/cpp/src/arrow/flight/test-util.h
+++ b/cpp/src/arrow/flight/test-util.h
@@ -16,23 +16,21 @@
// under the License.
#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <iostream>
#include <memory>
#include <string>
#include <vector>
-#include <boost/process.hpp>
-
-#include "arrow/ipc/test-common.h"
#include "arrow/status.h"
-#include "arrow/testing/gtest_util.h"
-#include "arrow/flight/api.h"
-#include "arrow/flight/internal.h"
+#include "arrow/flight/types.h"
+
+namespace boost {
+namespace process {
+
+class child;
-namespace bp = boost::process;
+} // namespace process
+} // namespace boost
namespace arrow {
namespace flight {
@@ -40,32 +38,23 @@ namespace flight {
// ----------------------------------------------------------------------
// Fixture to use for running test servers
-struct TestServer {
+class ARROW_EXPORT TestServer {
public:
explicit TestServer(const std::string& executable_name, int port)
: executable_name_(executable_name), port_(port) {}
- void Start() {
- std::string str_port = std::to_string(port_);
- server_process_.reset(
- new bp::child(bp::search_path(executable_name_), "-port", str_port));
- std::cout << "Server running with pid " << server_process_->id() << std::endl;
- }
+ void Start();
- int Stop() {
- kill(server_process_->id(), SIGTERM);
- server_process_->wait();
- return server_process_->exit_code();
- }
+ int Stop();
- bool IsRunning() { return server_process_->running(); }
+ bool IsRunning();
- int port() const { return port_; }
+ int port() const;
private:
std::string executable_name_;
int port_;
- std::unique_ptr<bp::child> server_process_;
+ std::shared_ptr<::boost::process::child> server_process_;
};
// ----------------------------------------------------------------------
@@ -99,59 +88,32 @@ class BatchIterator : public RecordBatchReader {
using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
-std::shared_ptr<Schema> ExampleSchema1() {
+inline std::shared_ptr<Schema> ExampleSchema1() {
auto f0 = field("f0", int32());
auto f1 = field("f1", int32());
return ::arrow::schema({f0, f1});
}
-std::shared_ptr<Schema> ExampleSchema2() {
+inline std::shared_ptr<Schema> ExampleSchema2() {
auto f0 = field("f0", utf8());
auto f1 = field("f1", binary());
return ::arrow::schema({f0, f1});
}
+ARROW_EXPORT
Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
uint64_t total_records, uint64_t total_bytes,
- FlightInfo::Data* out) {
- out->descriptor = descriptor;
- out->endpoints = endpoints;
- out->total_records = total_records;
- out->total_bytes = total_bytes;
- return internal::SchemaToString(schema, &out->schema);
-}
+ FlightInfo::Data* out);
-std::vector<FlightInfo> ExampleFlightInfo() {
- FlightEndpoint endpoint1({{"ticket-id-1"}, {{"foo1.bar.com", 92385}}});
- FlightEndpoint endpoint2({{"ticket-id-2"}, {{"foo2.bar.com", 92385}}});
- FlightEndpoint endpoint3({{"ticket-id-3"}, {{"foo3.bar.com", 92385}}});
- FlightDescriptor descr1{FlightDescriptor::PATH, "", {"foo", "bar"}};
- FlightDescriptor descr2{FlightDescriptor::CMD, "my_command", {}};
-
- auto schema1 = ExampleSchema1();
- auto schema2 = ExampleSchema2();
-
- FlightInfo::Data flight1, flight2;
- EXPECT_OK(
- MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, &flight1));
- EXPECT_OK(MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, &flight2));
- return {FlightInfo(flight1), FlightInfo(flight2)};
-}
+ARROW_EXPORT
+std::vector<FlightInfo> ExampleFlightInfo();
-Status SimpleIntegerBatches(const int num_batches, BatchVector* out) {
- std::shared_ptr<RecordBatch> batch;
- for (int i = 0; i < num_batches; ++i) {
- // Make all different sizes, use different random seed
- RETURN_NOT_OK(ipc::MakeIntBatchSized(10 + i, &batch, i));
- out->push_back(batch);
- }
- return Status::OK();
-}
+ARROW_EXPORT
+Status SimpleIntegerBatches(const int num_batches, BatchVector* out);
-std::vector<ActionType> ExampleActionTypes() {
- return {{"drop", "drop a dataset"}, {"cache", "cache a dataset"}};
-}
+ARROW_EXPORT
+std::vector<ActionType> ExampleActionTypes();
} // namespace flight
} // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda-benchmark.cc b/cpp/src/arrow/gpu/cuda-benchmark.cc
index 32b5f1f..a61eb92 100644
--- a/cpp/src/arrow/gpu/cuda-benchmark.cc
+++ b/cpp/src/arrow/gpu/cuda-benchmark.cc
@@ -24,6 +24,7 @@
#include "arrow/array.h"
#include "arrow/memory_pool.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
#include "arrow/gpu/cuda_api.h"
diff --git a/cpp/src/arrow/util/stopwatch.h b/cpp/src/arrow/util/stopwatch.h
index e90d0ba..db4e67f 100644
--- a/cpp/src/arrow/util/stopwatch.h
+++ b/cpp/src/arrow/util/stopwatch.h
@@ -17,34 +17,31 @@
#pragma once
-#include <stdio.h>
-#ifndef _MSC_VER
-#include <sys/time.h>
-#endif
-
-#include <ctime>
-#include <iostream>
+#include <cassert>
+#include <chrono>
namespace arrow {
namespace internal {
-uint64_t CurrentTime() {
- timespec time;
- clock_gettime(CLOCK_MONOTONIC, &time);
- return 1000000000L * time.tv_sec + time.tv_nsec;
-}
-
class StopWatch {
+ // This clock should give us wall clock time
+ using ClockType = std::chrono::steady_clock;
+
public:
StopWatch() {}
- void Start() { start_ = CurrentTime(); }
+ void Start() { start_ = ClockType::now(); }
// Returns time in nanoseconds.
- uint64_t Stop() { return CurrentTime() - start_; }
+ uint64_t Stop() {
+ auto stop = ClockType::now();
+ std::chrono::nanoseconds d = stop - start_;
+ assert(d.count() >= 0);
+ return static_cast<uint64_t>(d.count());
+ }
private:
- uint64_t start_;
+ std::chrono::time_point<ClockType> start_;
};
} // namespace internal