You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/05/31 15:29:25 UTC
[arrow] branch master updated: ARROW-3294: [C++][Flight] Support
Flight on Windows
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new dbeab70 ARROW-3294: [C++][Flight] Support Flight on Windows
dbeab70 is described below
commit dbeab70863c4d0cd3da800f18f7e474da624bb5f
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Fri May 31 10:29:18 2019 -0500
ARROW-3294: [C++][Flight] Support Flight on Windows
Author: Antoine Pitrou <an...@python.org>
Closes #4410 from pitrou/ARROW-3294-flight-windows and squashes the following commits:
bd4979b33 <Antoine Pitrou> ARROW-3294: Support Flight on Windows
---
appveyor.yml | 1 +
ci/appveyor-cpp-setup.bat | 9 +++
ci/cpp-msvc-build-main.bat | 9 ++-
cpp/cmake_modules/ThirdpartyToolchain.cmake | 9 +++
cpp/src/arrow/flight/CMakeLists.txt | 20 +++++-
cpp/src/arrow/flight/client.cc | 4 +-
cpp/src/arrow/flight/client.h | 8 +--
cpp/src/arrow/flight/client_auth.h | 8 +--
cpp/src/arrow/flight/customize_protobuf.h | 12 ++++
cpp/src/arrow/flight/flight-test.cc | 32 ++++++---
cpp/src/arrow/flight/internal.cc | 2 +-
cpp/src/arrow/flight/internal.h | 13 ++--
.../windows_compatibility.h => flight/platform.h} | 23 +++----
cpp/src/arrow/flight/protocol-internal.h | 3 +
cpp/src/arrow/flight/serialization-internal.cc | 3 +-
cpp/src/arrow/flight/server.cc | 68 +++++++++++--------
cpp/src/arrow/flight/server.h | 30 ++++++---
cpp/src/arrow/flight/server_auth.h | 10 +--
cpp/src/arrow/flight/test-util.cc | 17 +++++
cpp/src/arrow/flight/test-util.h | 49 ++++++++++----
cpp/src/arrow/flight/types.h | 32 ++++-----
.../visibility.h} | 37 +++++++----
cpp/src/arrow/io/mman.h | 7 --
cpp/src/arrow/python/flight.cc | 11 ++--
cpp/src/arrow/util/io-util.cc | 76 ++++++++++++++++++++++
cpp/src/arrow/util/io-util.h | 39 +++++++++++
cpp/src/arrow/util/windows_compatibility.h | 11 +++-
python/CMakeLists.txt | 2 +-
python/pyarrow/tests/test_flight.py | 7 +-
29 files changed, 411 insertions(+), 141 deletions(-)
diff --git a/appveyor.yml b/appveyor.yml
index 74c68df..3e0e645 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -65,6 +65,7 @@ environment:
- JOB: "Toolchain"
GENERATOR: Visual Studio 14 2015 Win64
CONFIGURATION: "Release"
+ ARROW_BUILD_FLIGHT: "ON"
ARROW_BUILD_GANDIVA: "ON"
# NOTE: Since ARROW-5403 we have disabled the static CRT build
# - JOB: "Static_Crt_Build"
diff --git a/ci/appveyor-cpp-setup.bat b/ci/appveyor-cpp-setup.bat
index 0f5868a..aa29498 100644
--- a/ci/appveyor-cpp-setup.bat
+++ b/ci/appveyor-cpp-setup.bat
@@ -17,6 +17,15 @@
@echo on
+@rem Avoid picking up AppVeyor-installed OpenSSL (linker errors with gRPC)
+@rem XXX Perhaps there is a smarter way of solving this issue?
+rd /s /q C:\OpenSSL-Win32
+rd /s /q C:\OpenSSL-Win64
+rd /s /q C:\OpenSSL-v11-Win32
+rd /s /q C:\OpenSSL-v11-Win64
+rd /s /q C:\OpenSSL-v111-Win32
+rd /s /q C:\OpenSSL-v111-Win64
+
conda update -y -q conda
conda config --set auto_update_conda false
conda info -a
diff --git a/ci/cpp-msvc-build-main.bat b/ci/cpp-msvc-build-main.bat
index b3ac9bb..80a4cca 100644
--- a/ci/cpp-msvc-build-main.bat
+++ b/ci/cpp-msvc-build-main.bat
@@ -61,6 +61,7 @@ cmake -G "%GENERATOR%" %CMAKE_ARGS% ^
-DARROW_VERBOSE_THIRDPARTY_BUILD=ON ^
-DARROW_CXXFLAGS="%ARROW_CXXFLAGS%" ^
-DCMAKE_CXX_FLAGS_RELEASE="/MD %CMAKE_CXX_FLAGS_RELEASE%" ^
+ -DARROW_FLIGHT=%ARROW_BUILD_FLIGHT% ^
-DARROW_GANDIVA=%ARROW_BUILD_GANDIVA% ^
-DARROW_PARQUET=ON ^
-DPARQUET_BUILD_EXECUTABLES=ON ^
@@ -87,10 +88,16 @@ pip install -r requirements.txt pickle5
set PYARROW_CXXFLAGS=%ARROW_CXXFLAGS%
set PYARROW_CMAKE_GENERATOR=%GENERATOR%
-set PYARROW_BUNDLE_ARROW_CPP=ON
+if "%ARROW_BUILD_FLIGHT%" == "ON" (
+ @rem ARROW-5441: bundling Arrow Flight libraries not implemented
+ set PYARROW_BUNDLE_ARROW_CPP=OFF
+) else (
+ set PYARROW_BUNDLE_ARROW_CPP=ON
+)
set PYARROW_BUNDLE_BOOST=OFF
set PYARROW_WITH_STATIC_BOOST=ON
set PYARROW_WITH_PARQUET=ON
+set PYARROW_WITH_FLIGHT=%ARROW_BUILD_FLIGHT%
set PYARROW_WITH_GANDIVA=%ARROW_BUILD_GANDIVA%
set PYARROW_PARALLEL=2
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index 33405c5..5f249c3 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -1075,6 +1075,10 @@ endmacro()
if(ARROW_WITH_PROTOBUF)
resolve_dependency(Protobuf)
+ if(ARROW_PROTOBUF_USE_SHARED AND MSVC)
+ add_definitions(-DPROTOBUF_USE_DLLS)
+ endif()
+
# TODO: Don't use global includes but rather target_include_directories
include_directories(SYSTEM ${PROTOBUF_INCLUDE_DIR})
@@ -1791,6 +1795,8 @@ macro(build_cares)
CMAKE_ARGS ${CARES_CMAKE_ARGS}
BUILD_BYPRODUCTS "${CARES_STATIC_LIB}")
+ file(MAKE_DIRECTORY ${CARES_INCLUDE_DIR})
+
add_dependencies(toolchain cares_ep)
add_library(c-ares::cares STATIC IMPORTED)
set_target_properties(c-ares::cares
@@ -1918,6 +1924,9 @@ macro(build_grpc)
CMAKE_ARGS ${GRPC_CMAKE_ARGS} ${EP_LOG_OPTIONS}
DEPENDS ${grpc_dependencies})
+ # Work around https://gitlab.kitware.com/cmake/cmake/issues/15052
+ file(MAKE_DIRECTORY ${GRPC_INCLUDE_DIR})
+
add_library(gRPC::gpr STATIC IMPORTED)
set_target_properties(gRPC::gpr
PROPERTIES IMPORTED_LOCATION "${GRPC_STATIC_LIBRARY_GPR}"
diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt
index 6ec5d5d..a46e776 100644
--- a/cpp/src/arrow/flight/CMakeLists.txt
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -27,6 +27,10 @@ set(ARROW_FLIGHT_STATIC_LINK_LIBS
gRPC::gpr
c-ares::cares)
+if(WIN32)
+ list(APPEND ARROW_FLIGHT_STATIC_LINK_LIBS Ws2_32.lib)
+endif()
+
if(GRPC_HAS_ADDRESS_SORTING)
list(APPEND ARROW_FLIGHT_STATIC_LINK_LIBS gRPC::address_sorting)
endif()
@@ -81,6 +85,8 @@ set(ARROW_FLIGHT_SRCS
types.cc)
add_arrow_lib(arrow_flight
+ OUTPUTS
+ ARROW_FLIGHT_LIBRARIES
SOURCES
${ARROW_FLIGHT_SRCS}
DEPENDENCIES
@@ -95,9 +101,15 @@ add_arrow_lib(arrow_flight
arrow_static
${ARROW_FLIGHT_STATIC_LINK_LIBS})
+foreach(LIB_TARGET ${ARROW_FLIGHT_LIBRARIES})
+ target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_FLIGHT_EXPORTING)
+endforeach()
+
# Define arrow_flight_testing library
if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
add_arrow_lib(arrow_flight_testing
+ OUTPUTS
+ ARROW_FLIGHT_TESTING_LIBRARIES
SOURCES
test-util.cc
DEPENDENCIES
@@ -108,14 +120,20 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
SHARED_LINK_LIBS
arrow_shared
arrow_flight_shared
+ arrow_testing_shared
${BOOST_FILESYSTEM_LIBRARY}
${BOOST_SYSTEM_LIBRARY}
GTest::GTest
STATIC_LINK_LIBS
arrow_static
- arrow_flight_static)
+ arrow_flight_static
+ arrow_testing_static)
endif()
+foreach(LIB_TARGET ${ARROW_FLIGHT_TESTING_LIBRARIES})
+ target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_FLIGHT_EXPORTING)
+endforeach()
+
add_arrow_test(flight-test
EXTRA_LINK_LIBS
${ARROW_FLIGHT_TEST_LINK_LIBS}
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index d831647..1c927da 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -17,12 +17,14 @@
#include "arrow/flight/client.h"
+// Platform-specific defines
+#include "arrow/flight/platform.h"
+
#include <memory>
#include <sstream>
#include <string>
#include <utility>
-#include "arrow/util/config.h"
#ifdef GRPCPP_PP_INCLUDE
#include <grpcpp/grpcpp.h>
#else
diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h
index 276ffc7..689c9f8 100644
--- a/cpp/src/arrow/flight/client.h
+++ b/cpp/src/arrow/flight/client.h
@@ -27,9 +27,9 @@
#include "arrow/ipc/writer.h"
#include "arrow/status.h"
-#include "arrow/util/visibility.h"
#include "arrow/flight/types.h" // IWYU pragma: keep
+#include "arrow/flight/visibility.h"
namespace arrow {
@@ -46,7 +46,7 @@ class ClientAuthHandler;
typedef std::chrono::duration<double, std::chrono::seconds::period> TimeoutDuration;
/// \brief Hints to the underlying RPC layer for Arrow Flight calls.
-class ARROW_EXPORT FlightCallOptions {
+class ARROW_FLIGHT_EXPORT FlightCallOptions {
public:
/// Create a default set of call options.
FlightCallOptions();
@@ -57,14 +57,14 @@ class ARROW_EXPORT FlightCallOptions {
TimeoutDuration timeout;
};
-class ARROW_EXPORT FlightClientOptions {
+class ARROW_FLIGHT_EXPORT FlightClientOptions {
public:
std::string tls_root_certs;
};
/// \brief Client class for Arrow Flight RPC services (gRPC-based).
/// API experimental for now
-class ARROW_EXPORT FlightClient {
+class ARROW_FLIGHT_EXPORT FlightClient {
public:
~FlightClient();
diff --git a/cpp/src/arrow/flight/client_auth.h b/cpp/src/arrow/flight/client_auth.h
index cc7ed10..9dad36a 100644
--- a/cpp/src/arrow/flight/client_auth.h
+++ b/cpp/src/arrow/flight/client_auth.h
@@ -19,8 +19,8 @@
#include <string>
+#include "arrow/flight/visibility.h"
#include "arrow/status.h"
-#include "arrow/util/visibility.h"
namespace arrow {
@@ -28,7 +28,7 @@ namespace flight {
/// \brief A reader for messages from the server during an
/// authentication handshake.
-class ARROW_EXPORT ClientAuthReader {
+class ARROW_FLIGHT_EXPORT ClientAuthReader {
public:
virtual ~ClientAuthReader() = default;
virtual Status Read(std::string* response) = 0;
@@ -36,7 +36,7 @@ class ARROW_EXPORT ClientAuthReader {
/// \brief A writer for messages to the server during an
/// authentication handshake.
-class ARROW_EXPORT ClientAuthSender {
+class ARROW_FLIGHT_EXPORT ClientAuthSender {
public:
virtual ~ClientAuthSender() = default;
virtual Status Write(const std::string& token) = 0;
@@ -46,7 +46,7 @@ class ARROW_EXPORT ClientAuthSender {
/// Authentication includes both an initial negotiation and a per-call
/// token validation. Implementations may choose to use either or both
/// mechanisms.
-class ARROW_EXPORT ClientAuthHandler {
+class ARROW_FLIGHT_EXPORT ClientAuthHandler {
public:
virtual ~ClientAuthHandler() = default;
/// \brief Authenticate the client on initial connection. The client
diff --git a/cpp/src/arrow/flight/customize_protobuf.h b/cpp/src/arrow/flight/customize_protobuf.h
index 1d67480..f27ab0b 100644
--- a/cpp/src/arrow/flight/customize_protobuf.h
+++ b/cpp/src/arrow/flight/customize_protobuf.h
@@ -20,7 +20,15 @@
#include <limits>
#include <memory>
+#include "arrow/flight/platform.h"
#include "arrow/util/config.h"
+
+// Silence protobuf warnings
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4244)
+#endif
+
#ifdef GRPCPP_PP_INCLUDE
#include <grpcpp/impl/codegen/config_protobuf.h>
#else
@@ -40,6 +48,10 @@
#include <grpc++/impl/codegen/proto_utils.h>
#endif
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
namespace grpc {
class ByteBuffer;
diff --git a/cpp/src/arrow/flight/flight-test.cc b/cpp/src/arrow/flight/flight-test.cc
index e3e01df..cb7e57c 100644
--- a/cpp/src/arrow/flight/flight-test.cc
+++ b/cpp/src/arrow/flight/flight-test.cc
@@ -21,6 +21,7 @@
#include <cstring>
#include <iostream>
#include <memory>
+#include <sstream>
#include <string>
#include <thread>
#include <vector>
@@ -38,7 +39,6 @@
#error "gRPC headers should not be in public API"
#endif
-#include "arrow/flight/Flight.pb.h"
#include "arrow/flight/internal.h"
#include "arrow/flight/test-util.h"
@@ -117,6 +117,8 @@ TEST(TestFlightDescriptor, Basics) {
ASSERT_TRUE(e.Equals(f));
}
+// This tests the internal protobuf types which don't get exported in the Flight DLL.
+#ifndef _WIN32
TEST(TestFlightDescriptor, ToFromProto) {
FlightDescriptor descr_test;
pb::FlightDescriptor pb_descr;
@@ -131,9 +133,10 @@ TEST(TestFlightDescriptor, ToFromProto) {
ASSERT_OK(internal::FromProto(pb_descr, &descr_test));
AssertEqual(descr2, descr_test);
}
+#endif
TEST(TestFlight, StartStopTestServer) {
- TestServer server("flight-test-server", 30000);
+ TestServer server("flight-test-server");
server.Start();
ASSERT_TRUE(server.IsRunning());
@@ -141,20 +144,29 @@ TEST(TestFlight, StartStopTestServer) {
ASSERT_TRUE(server.IsRunning());
int exit_code = server.Stop();
+#ifdef _WIN32
+ // We do a hard kill on Windows
+ ASSERT_EQ(259, exit_code);
+#else
ASSERT_EQ(0, exit_code);
+#endif
ASSERT_FALSE(server.IsRunning());
}
TEST(TestFlight, ConnectUri) {
- TestServer server("flight-test-server", 30000);
+ TestServer server("flight-test-server");
server.Start();
ASSERT_TRUE(server.IsRunning());
+ std::stringstream ss;
+ ss << "grpc://localhost:" << server.port();
+ std::string uri = ss.str();
+
std::unique_ptr<FlightClient> client;
Location location1;
Location location2;
- ASSERT_OK(Location::Parse("grpc://localhost:30000", &location1));
- ASSERT_OK(Location::Parse("grpc://localhost:30000", &location2));
+ ASSERT_OK(Location::Parse(uri, &location1));
+ ASSERT_OK(Location::Parse(uri, &location2));
ASSERT_OK(FlightClient::Connect(location1, &client));
ASSERT_OK(FlightClient::Connect(location2, &client));
}
@@ -174,9 +186,9 @@ class TestFlightClient : public ::testing::Test {
// void TearDown() {}
void SetUp() {
- port_ = 30000;
- server_.reset(new TestServer("flight-test-server", port_));
+ server_.reset(new TestServer("flight-test-server"));
server_->Start();
+ port_ = server_->port();
ASSERT_OK(ConnectClient());
}
@@ -258,7 +270,7 @@ class TestAuthHandler : public ::testing::Test {
Location location;
std::unique_ptr<FlightServerBase> server(new AuthTestServer);
- ASSERT_OK(Location::ForGrpcTcp("localhost", 30000, &location));
+ ASSERT_OK(Location::ForGrpcTcp("localhost", GetListenPort(), &location));
FlightServerOptions options(location);
options.auth_handler =
std::unique_ptr<ServerAuthHandler>(new TestServerAuthHandler("user", "p4ssw0rd"));
@@ -282,7 +294,7 @@ class TestDoPut : public ::testing::Test {
public:
void SetUp() {
Location location;
- ASSERT_OK(Location::ForGrpcTcp("localhost", 30000, &location));
+ ASSERT_OK(Location::ForGrpcTcp("localhost", GetListenPort(), &location));
do_put_server_ = new DoPutTestServer();
server_.reset(new InProcessTestServer(
@@ -464,7 +476,7 @@ TEST_F(TestFlightClient, TimeoutFires) {
TEST_F(TestFlightClient, NoTimeout) {
// Call should complete quickly, so timeout should not fire
FlightCallOptions options;
- options.timeout = TimeoutDuration{0.5};
+ options.timeout = TimeoutDuration{5.0}; // account for slow server process startup
std::unique_ptr<FlightInfo> info;
auto start = std::chrono::system_clock::now();
auto descriptor = FlightDescriptor::Path({"examples", "ints"});
diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc
index c60e585..5582149 100644
--- a/cpp/src/arrow/flight/internal.cc
+++ b/cpp/src/arrow/flight/internal.cc
@@ -16,6 +16,7 @@
// under the License.
#include "arrow/flight/internal.h"
+#include "arrow/flight/platform.h"
#include "arrow/flight/protocol-internal.h"
#include <cstddef>
@@ -23,7 +24,6 @@
#include <string>
#include <utility>
-#include "arrow/util/config.h"
#ifdef GRPCPP_PP_INCLUDE
#include <grpcpp/grpcpp.h>
#else
diff --git a/cpp/src/arrow/flight/internal.h b/cpp/src/arrow/flight/internal.h
index db95eb3..784e8eb 100644
--- a/cpp/src/arrow/flight/internal.h
+++ b/cpp/src/arrow/flight/internal.h
@@ -65,8 +65,17 @@ namespace internal {
static const char* AUTH_HEADER = "auth-token-bin";
+ARROW_FLIGHT_EXPORT
Status SchemaToString(const Schema& schema, std::string* out);
+ARROW_FLIGHT_EXPORT
+Status FromGrpcStatus(const grpc::Status& grpc_status);
+
+ARROW_FLIGHT_EXPORT
+grpc::Status ToGrpcStatus(const Status& arrow_status);
+
+// These functions depend on protobuf types which are not exported in the Flight DLL.
+
Status FromProto(const pb::ActionType& pb_type, ActionType* type);
Status FromProto(const pb::Action& pb_action, Action* action);
Status FromProto(const pb::Result& pb_result, Result* result);
@@ -86,10 +95,6 @@ Status ToProto(const Action& action, pb::Action* pb_action);
Status ToProto(const Result& result, pb::Result* pb_result);
void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket);
-Status FromGrpcStatus(const grpc::Status& grpc_status);
-
-grpc::Status ToGrpcStatus(const Status& arrow_status);
-
} // namespace internal
} // namespace flight
} // namespace arrow
diff --git a/cpp/src/arrow/util/windows_compatibility.h b/cpp/src/arrow/flight/platform.h
similarity index 63%
copy from cpp/src/arrow/util/windows_compatibility.h
copy to cpp/src/arrow/flight/platform.h
index 7b70e28..7f1b095 100644
--- a/cpp/src/arrow/util/windows_compatibility.h
+++ b/cpp/src/arrow/flight/platform.h
@@ -15,21 +15,18 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
-
-#ifdef _WIN32
+// Internal header. Platform-specific definitions for gRPC.
-// Windows defines min and max macros that mess up std::min/max
-#ifndef NOMINMAX
-#define NOMINMAX
-#endif
+#pragma once
-#define WIN32_LEAN_AND_MEAN
+#ifdef _MSC_VER
-#include <winsock2.h>
-#include <windows.h>
+// The protobuf documentation says that C4251 warnings when using the
+// library are spurious and suppressed when building the library and
+// compiler, but must also be suppressed in downstream projects
+#pragma warning(disable : 4251)
-// TODO(wesm): address when/if we add windows support
-// #include <util/syserr_reporting.hpp>
+#endif // _MSC_VER
-#endif // _WIN32
+#include "arrow/util/config.h" // IWYU pragma: keep
+#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep
diff --git a/cpp/src/arrow/flight/protocol-internal.h b/cpp/src/arrow/flight/protocol-internal.h
index 848c1a8..98bf923 100644
--- a/cpp/src/arrow/flight/protocol-internal.h
+++ b/cpp/src/arrow/flight/protocol-internal.h
@@ -16,6 +16,9 @@
#pragma once
+// This addresses platform-specific defines, e.g. on Windows
+#include "arrow/flight/platform.h" // IWYU pragma: keep
+
// This header holds the Flight protobuf definitions.
// Need to include this first to get our gRPC customizations
diff --git a/cpp/src/arrow/flight/serialization-internal.cc b/cpp/src/arrow/flight/serialization-internal.cc
index c0d0bc1..d78bac8 100644
--- a/cpp/src/arrow/flight/serialization-internal.cc
+++ b/cpp/src/arrow/flight/serialization-internal.cc
@@ -22,11 +22,12 @@
#include <string>
#include <vector>
-#include "arrow/util/config.h"
+#include "arrow/flight/platform.h"
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/wire_format_lite.h>
+
#include <grpc/byte_buffer_reader.h>
#ifdef GRPCPP_PP_INCLUDE
#include <grpcpp/grpcpp.h>
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 46dd5bf..9b6bf6c 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+// Platform-specific defines
+#include "arrow/flight/platform.h"
+
#include "arrow/flight/server.h"
#include <signal.h>
@@ -25,7 +28,6 @@
#include <string>
#include <utility>
-#include "arrow/util/config.h"
#ifdef GRPCPP_PP_INCLUDE
#include <grpcpp/grpcpp.h>
#else
@@ -39,6 +41,7 @@
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/status.h"
+#include "arrow/util/io-util.h"
#include "arrow/util/logging.h"
#include "arrow/util/stl.h"
#include "arrow/util/uri.h"
@@ -407,21 +410,41 @@ class FlightServiceImpl : public FlightService::Service {
} // namespace
+//
+// gRPC server lifecycle
+//
+
#if (ATOMIC_INT_LOCK_FREE != 2 || ATOMIC_POINTER_LOCK_FREE != 2)
#error "atomic ints and atomic pointers not always lock-free!"
#endif
+using ::arrow::internal::GetSignalHandler;
+using ::arrow::internal::SetSignalHandler;
+using ::arrow::internal::SignalHandler;
+
struct FlightServerBase::Impl {
std::unique_ptr<FlightServiceImpl> service_;
std::unique_ptr<grpc::Server> server_;
+#ifdef _WIN32
+ // Signal handlers are executed in a separate thread on Windows, so getting
+ // the current thread instance wouldn't make sense. This means only a single
+ // instance can receive signals on Windows.
+ static std::atomic<Impl*> running_instance_;
+#else
+ static thread_local std::atomic<Impl*> running_instance_;
+#endif
// Signal handling
std::vector<int> signals_;
- std::vector<struct sigaction> old_signal_handlers_;
+ std::vector<SignalHandler> old_signal_handlers_;
std::atomic<int> got_signal_;
- static thread_local std::atomic<Impl*> running_instance_;
- static void HandleSignal(int signum);
+ static void HandleSignal(int signum) {
+ auto instance = running_instance_.load();
+ if (instance != nullptr) {
+ instance->DoHandleSignal(signum);
+ }
+ }
void DoHandleSignal(int signum) {
got_signal_ = signum;
@@ -429,15 +452,12 @@ struct FlightServerBase::Impl {
}
};
+#ifdef _WIN32
+std::atomic<FlightServerBase::Impl*> FlightServerBase::Impl::running_instance_;
+#else
thread_local std::atomic<FlightServerBase::Impl*>
FlightServerBase::Impl::running_instance_;
-
-void FlightServerBase::Impl::HandleSignal(int signum) {
- auto instance = running_instance_.load();
- if (instance != nullptr) {
- instance->DoHandleSignal(signum);
- }
-}
+#endif
FlightServerOptions::FlightServerOptions(const Location& location_)
: location(location_), auth_handler(nullptr) {}
@@ -502,23 +522,15 @@ Status FlightServerBase::Serve() {
if (!impl_->server_) {
return Status::UnknownError("Server did not start properly");
}
-
impl_->got_signal_ = 0;
+ impl_->old_signal_handlers_.clear();
impl_->running_instance_ = impl_.get();
- // Setup signal handlers
- impl_->old_signal_handlers_.clear();
+ // Override existing signal handlers with our own handler so as to stop the server.
for (size_t i = 0; i < impl_->signals_.size(); ++i) {
int signum = impl_->signals_[i];
- // Override with our own handler so as to stop the server.
- struct sigaction sa, old_handler;
- sa.sa_handler = &Impl::HandleSignal;
- sa.sa_flags = 0;
- sigemptyset(&sa.sa_mask);
- int ret = sigaction(signum, &sa, &old_handler);
- if (ret != 0) {
- return Status::IOError("sigaction call failed");
- }
+ SignalHandler new_handler(&Impl::HandleSignal), old_handler;
+ RETURN_NOT_OK(SetSignalHandler(signum, new_handler, &old_handler));
impl_->old_signal_handlers_.push_back(old_handler);
}
@@ -527,10 +539,8 @@ Status FlightServerBase::Serve() {
// Restore signal handlers
for (size_t i = 0; i < impl_->signals_.size(); ++i) {
- int ret = sigaction(impl_->signals_[i], &impl_->old_signal_handlers_[i], nullptr);
- if (ret != 0) {
- return Status::IOError("sigaction call failed");
- }
+ RETURN_NOT_OK(
+ SetSignalHandler(impl_->signals_[i], impl_->old_signal_handlers_[i], nullptr));
}
return Status::OK();
@@ -659,11 +669,15 @@ class RecordBatchStream::RecordBatchStreamImpl {
int dictionary_index_ = 0;
};
+FlightDataStream::~FlightDataStream() {}
+
RecordBatchStream::RecordBatchStream(const std::shared_ptr<RecordBatchReader>& reader,
MemoryPool* pool) {
impl_.reset(new RecordBatchStreamImpl(reader, pool));
}
+RecordBatchStream::~RecordBatchStream() {}
+
std::shared_ptr<Schema> RecordBatchStream::schema() { return impl_->schema(); }
Status RecordBatchStream::GetSchemaPayload(FlightPayload* payload) {
diff --git a/cpp/src/arrow/flight/server.h b/cpp/src/arrow/flight/server.h
index 28e87aa..7164b64 100644
--- a/cpp/src/arrow/flight/server.h
+++ b/cpp/src/arrow/flight/server.h
@@ -25,11 +25,11 @@
#include <vector>
#include "arrow/flight/server_auth.h"
-#include "arrow/flight/types.h" // IWYU pragma: keep
+#include "arrow/flight/types.h" // IWYU pragma: keep
+#include "arrow/flight/visibility.h" // IWYU pragma: keep
#include "arrow/ipc/dictionary.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
-#include "arrow/util/visibility.h"
namespace arrow {
@@ -41,9 +41,9 @@ namespace flight {
/// \brief Interface that produces a sequence of IPC payloads to be sent in
/// FlightData protobuf messages
-class ARROW_EXPORT FlightDataStream {
+class ARROW_FLIGHT_EXPORT FlightDataStream {
public:
- virtual ~FlightDataStream() = default;
+ virtual ~FlightDataStream();
virtual std::shared_ptr<Schema> schema() = 0;
@@ -57,12 +57,13 @@ class ARROW_EXPORT FlightDataStream {
/// \brief A basic implementation of FlightDataStream that will provide
/// a sequence of FlightData messages to be written to a gRPC stream
-class ARROW_EXPORT RecordBatchStream : public FlightDataStream {
+class ARROW_FLIGHT_EXPORT RecordBatchStream : public FlightDataStream {
public:
/// \param[in] reader produces a sequence of record batches
/// \param[in,out] pool a MemoryPool to use for allocations
explicit RecordBatchStream(const std::shared_ptr<RecordBatchReader>& reader,
MemoryPool* pool = default_memory_pool());
+ ~RecordBatchStream() override;
std::shared_ptr<Schema> schema() override;
Status GetSchemaPayload(FlightPayload* payload) override;
@@ -73,22 +74,33 @@ class ARROW_EXPORT RecordBatchStream : public FlightDataStream {
std::unique_ptr<RecordBatchStreamImpl> impl_;
};
+// Silence warning
+// "non dll-interface class RecordBatchReader used as base for dll-interface class"
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4275)
+#endif
+
/// \brief A reader for IPC payloads uploaded by a client
-class ARROW_EXPORT FlightMessageReader : public RecordBatchReader {
+class ARROW_FLIGHT_EXPORT FlightMessageReader : public RecordBatchReader {
public:
/// \brief Get the descriptor for this upload.
virtual const FlightDescriptor& descriptor() const = 0;
};
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
/// \brief Call state/contextual data.
-class ARROW_EXPORT ServerCallContext {
+class ARROW_FLIGHT_EXPORT ServerCallContext {
public:
virtual ~ServerCallContext() = default;
/// \brief The name of the authenticated peer (may be the empty string)
virtual const std::string& peer_identity() const = 0;
};
-class ARROW_EXPORT FlightServerOptions {
+class ARROW_FLIGHT_EXPORT FlightServerOptions {
public:
explicit FlightServerOptions(const Location& location_);
@@ -100,7 +112,7 @@ class ARROW_EXPORT FlightServerOptions {
/// \brief Skeleton RPC server implementation which can be used to create
/// custom servers by implementing its abstract methods
-class ARROW_EXPORT FlightServerBase {
+class ARROW_FLIGHT_EXPORT FlightServerBase {
public:
FlightServerBase();
virtual ~FlightServerBase();
diff --git a/cpp/src/arrow/flight/server_auth.h b/cpp/src/arrow/flight/server_auth.h
index 5d06894..b1ccb09 100644
--- a/cpp/src/arrow/flight/server_auth.h
+++ b/cpp/src/arrow/flight/server_auth.h
@@ -21,8 +21,8 @@
#include <string>
+#include "arrow/flight/visibility.h"
#include "arrow/status.h"
-#include "arrow/util/visibility.h"
namespace arrow {
@@ -30,7 +30,7 @@ namespace flight {
/// \brief A reader for messages from the client during an
/// authentication handshake.
-class ARROW_EXPORT ServerAuthReader {
+class ARROW_FLIGHT_EXPORT ServerAuthReader {
public:
virtual ~ServerAuthReader() = default;
virtual Status Read(std::string* token) = 0;
@@ -38,7 +38,7 @@ class ARROW_EXPORT ServerAuthReader {
/// \brief A writer for messages to the client during an
/// authentication handshake.
-class ARROW_EXPORT ServerAuthSender {
+class ARROW_FLIGHT_EXPORT ServerAuthSender {
public:
virtual ~ServerAuthSender() = default;
virtual Status Write(const std::string& message) = 0;
@@ -50,7 +50,7 @@ class ARROW_EXPORT ServerAuthSender {
/// mechanisms.
/// An implementation may need to track some state, e.g. a mapping of
/// client tokens to authenticated identities.
-class ARROW_EXPORT ServerAuthHandler {
+class ARROW_FLIGHT_EXPORT ServerAuthHandler {
public:
virtual ~ServerAuthHandler();
/// \brief Authenticate the client on initial connection. The server
@@ -67,7 +67,7 @@ class ARROW_EXPORT ServerAuthHandler {
};
/// \brief An authentication mechanism that does nothing.
-class ARROW_EXPORT NoOpAuthHandler : public ServerAuthHandler {
+class ARROW_FLIGHT_EXPORT NoOpAuthHandler : public ServerAuthHandler {
public:
~NoOpAuthHandler() override;
Status Authenticate(ServerAuthSender* outgoing, ServerAuthReader* incoming) override;
diff --git a/cpp/src/arrow/flight/test-util.cc b/cpp/src/arrow/flight/test-util.cc
index f870dcb..b20a4cb 100644
--- a/cpp/src/arrow/flight/test-util.cc
+++ b/cpp/src/arrow/flight/test-util.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include "arrow/flight/platform.h"
+
#ifdef __APPLE__
#include <limits.h>
#include <mach-o/dyld.h>
@@ -58,6 +60,12 @@ Status ResolveCurrentExecutable(fs::path* out) {
return Status::Invalid("Can't resolve current exe: path too large");
}
*out = fs::canonical(buf, ec);
+#elif defined(_WIN32)
+ char buf[MAX_PATH + 1];
+ if (!GetModuleFileNameA(NULL, buf, sizeof(buf))) {
+ return Status::Invalid("Can't get executable file path");
+ }
+ *out = fs::canonical(buf, ec);
#else
ARROW_UNUSED(ec);
return Status::NotImplemented("Not available on this system");
@@ -72,6 +80,10 @@ Status ResolveCurrentExecutable(fs::path* out) {
} // namespace
+static int next_listen_port_ = 30001;
+
+int GetListenPort() { return next_listen_port_++; }
+
void TestServer::Start() {
namespace fs = boost::filesystem;
@@ -105,7 +117,12 @@ void TestServer::Start() {
int TestServer::Stop() {
if (server_process_ && server_process_->valid()) {
+#ifndef _WIN32
kill(server_process_->id(), SIGTERM);
+#else
+ // This would use SIGKILL on POSIX, which is more brutal than SIGTERM
+ server_process_->terminate();
+#endif
server_process_->wait();
return server_process_->exit_code();
} else {
diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.h
index b5bc31a..2e1f4b0 100644
--- a/cpp/src/arrow/flight/test-util.h
+++ b/cpp/src/arrow/flight/test-util.h
@@ -27,6 +27,7 @@
#include "arrow/flight/client_auth.h"
#include "arrow/flight/server_auth.h"
#include "arrow/flight/types.h"
+#include "arrow/flight/visibility.h"
namespace boost {
namespace process {
@@ -42,8 +43,16 @@ namespace flight {
// ----------------------------------------------------------------------
// Fixture to use for running test servers
-class ARROW_EXPORT TestServer {
+// Get a TCP port number to listen on. This is a different number every time,
+// as reusing the same port across tests can produce spurious "Stream removed"
+// errors on Windows.
+ARROW_FLIGHT_EXPORT
+int GetListenPort();
+
+class ARROW_FLIGHT_EXPORT TestServer {
public:
+ explicit TestServer(const std::string& executable_name)
+ : executable_name_(executable_name), port_(GetListenPort()) {}
explicit TestServer(const std::string& executable_name, int port)
: executable_name_(executable_name), port_(port) {}
@@ -61,7 +70,7 @@ class ARROW_EXPORT TestServer {
std::shared_ptr<::boost::process::child> server_process_;
};
-class ARROW_EXPORT InProcessTestServer {
+class ARROW_FLIGHT_EXPORT InProcessTestServer {
public:
explicit InProcessTestServer(std::unique_ptr<FlightServerBase> server,
const Location& location)
@@ -80,7 +89,14 @@ class ARROW_EXPORT InProcessTestServer {
// ----------------------------------------------------------------------
// A RecordBatchReader for serving a sequence of in-memory record batches
-class BatchIterator : public RecordBatchReader {
+// Silence warning
+// "non dll-interface class RecordBatchReader used as base for dll-interface class"
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4275)
+#endif
+
+class ARROW_FLIGHT_EXPORT BatchIterator : public RecordBatchReader {
public:
BatchIterator(const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<RecordBatch>>& batches)
@@ -103,30 +119,37 @@ class BatchIterator : public RecordBatchReader {
size_t position_;
};
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
// ----------------------------------------------------------------------
// Example data for test-server and unit tests
using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
-ARROW_EXPORT std::shared_ptr<Schema> ExampleIntSchema();
+ARROW_FLIGHT_EXPORT
+std::shared_ptr<Schema> ExampleIntSchema();
-ARROW_EXPORT std::shared_ptr<Schema> ExampleStringSchema();
+ARROW_FLIGHT_EXPORT
+std::shared_ptr<Schema> ExampleStringSchema();
-ARROW_EXPORT std::shared_ptr<Schema> ExampleDictSchema();
+ARROW_FLIGHT_EXPORT
+std::shared_ptr<Schema> ExampleDictSchema();
-ARROW_EXPORT
+ARROW_FLIGHT_EXPORT
Status ExampleIntBatches(BatchVector* out);
-ARROW_EXPORT
+ARROW_FLIGHT_EXPORT
Status ExampleDictBatches(BatchVector* out);
-ARROW_EXPORT
+ARROW_FLIGHT_EXPORT
std::vector<FlightInfo> ExampleFlightInfo();
-ARROW_EXPORT
+ARROW_FLIGHT_EXPORT
std::vector<ActionType> ExampleActionTypes();
-ARROW_EXPORT
+ARROW_FLIGHT_EXPORT
Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints, int64_t total_records,
int64_t total_bytes, FlightInfo::Data* out);
@@ -135,7 +158,7 @@ Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
// A pair of authentication handlers that check for a predefined password
// and set the peer identity to a predefined username.
-class ARROW_EXPORT TestServerAuthHandler : public ServerAuthHandler {
+class ARROW_FLIGHT_EXPORT TestServerAuthHandler : public ServerAuthHandler {
public:
explicit TestServerAuthHandler(const std::string& username,
const std::string& password);
@@ -148,7 +171,7 @@ class ARROW_EXPORT TestServerAuthHandler : public ServerAuthHandler {
std::string password_;
};
-class ARROW_EXPORT TestClientAuthHandler : public ClientAuthHandler {
+class ARROW_FLIGHT_EXPORT TestClientAuthHandler : public ClientAuthHandler {
public:
explicit TestClientAuthHandler(const std::string& username,
const std::string& password);
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
index ba07b99..8d37225 100644
--- a/cpp/src/arrow/flight/types.h
+++ b/cpp/src/arrow/flight/types.h
@@ -26,8 +26,8 @@
#include <utility>
#include <vector>
+#include "arrow/flight/visibility.h"
#include "arrow/ipc/writer.h"
-#include "arrow/util/visibility.h"
namespace arrow {
@@ -50,7 +50,7 @@ class Uri;
namespace flight {
/// \brief A type of action that can be performed with the DoAction RPC
-struct ActionType {
+struct ARROW_FLIGHT_EXPORT ActionType {
/// Name of action
std::string type;
@@ -59,13 +59,13 @@ struct ActionType {
};
/// \brief Opaque selection critera for ListFlights RPC
-struct Criteria {
+struct ARROW_FLIGHT_EXPORT Criteria {
/// Opaque criteria expression, dependent on server implementation
std::string expression;
};
/// \brief An action to perform with the DoAction RPC
-struct Action {
+struct ARROW_FLIGHT_EXPORT Action {
/// The action type
std::string type;
@@ -74,15 +74,15 @@ struct Action {
};
/// \brief Opaque result returned after executing an action
-struct Result {
+struct ARROW_FLIGHT_EXPORT Result {
std::shared_ptr<Buffer> body;
};
/// \brief A message received after completing a DoPut stream
-struct PutResult {};
+struct ARROW_FLIGHT_EXPORT PutResult {};
/// \brief A request to retrieve or generate a dataset
-struct FlightDescriptor {
+struct ARROW_FLIGHT_EXPORT FlightDescriptor {
enum DescriptorType {
UNKNOWN = 0, /// Unused
PATH = 1, /// Named path identifying a dataset
@@ -117,7 +117,7 @@ struct FlightDescriptor {
/// \brief Data structure providing an opaque identifier or credential to use
/// when requesting a data stream with the DoGet RPC
-struct Ticket {
+struct ARROW_FLIGHT_EXPORT Ticket {
std::string ticket;
};
@@ -130,7 +130,7 @@ static const char* kSchemeGrpcUnix = "grpc+unix";
static const char* kSchemeGrpcTls = "grpc+tls";
/// \brief A host location (a URI)
-struct Location {
+struct ARROW_FLIGHT_EXPORT Location {
public:
/// \brief Initialize a blank location.
Location();
@@ -174,7 +174,7 @@ struct Location {
/// \brief A flight ticket and list of locations where the ticket can be
/// redeemed
-struct FlightEndpoint {
+struct ARROW_FLIGHT_EXPORT FlightEndpoint {
/// Opaque ticket identify; use with DoGet RPC
Ticket ticket;
@@ -187,14 +187,14 @@ struct FlightEndpoint {
/// \brief Staging data structure for messages about to be put on the wire
///
/// This structure corresponds to FlightData in the protocol.
-struct FlightPayload {
+struct ARROW_FLIGHT_EXPORT FlightPayload {
std::shared_ptr<Buffer> descriptor;
ipc::internal::IpcPayload ipc_message;
};
/// \brief The access coordinates for retireval of a dataset, returned by
/// GetFlightInfo
-class FlightInfo {
+class ARROW_FLIGHT_EXPORT FlightInfo {
public:
struct Data {
std::string schema;
@@ -239,7 +239,7 @@ class FlightInfo {
};
/// \brief An iterator to FlightInfo instances returned by ListFlights
-class ARROW_EXPORT FlightListing {
+class ARROW_FLIGHT_EXPORT FlightListing {
public:
virtual ~FlightListing() = default;
@@ -251,7 +251,7 @@ class ARROW_EXPORT FlightListing {
};
/// \brief An iterator to Result instances returned by DoAction
-class ARROW_EXPORT ResultStream {
+class ARROW_FLIGHT_EXPORT ResultStream {
public:
virtual ~ResultStream() = default;
@@ -264,7 +264,7 @@ class ARROW_EXPORT ResultStream {
// \brief Create a FlightListing from a vector of FlightInfo objects. This can
// be iterated once, then it is consumed
-class ARROW_EXPORT SimpleFlightListing : public FlightListing {
+class ARROW_FLIGHT_EXPORT SimpleFlightListing : public FlightListing {
public:
explicit SimpleFlightListing(const std::vector<FlightInfo>& flights);
explicit SimpleFlightListing(std::vector<FlightInfo>&& flights);
@@ -276,7 +276,7 @@ class ARROW_EXPORT SimpleFlightListing : public FlightListing {
std::vector<FlightInfo> flights_;
};
-class ARROW_EXPORT SimpleResultStream : public ResultStream {
+class ARROW_FLIGHT_EXPORT SimpleResultStream : public ResultStream {
public:
explicit SimpleResultStream(std::vector<Result>&& results);
Status Next(std::unique_ptr<Result>* result) override;
diff --git a/cpp/src/arrow/util/windows_compatibility.h b/cpp/src/arrow/flight/visibility.h
similarity index 53%
copy from cpp/src/arrow/util/windows_compatibility.h
copy to cpp/src/arrow/flight/visibility.h
index 7b70e28..bdee8b7 100644
--- a/cpp/src/arrow/util/windows_compatibility.h
+++ b/cpp/src/arrow/flight/visibility.h
@@ -17,19 +17,32 @@
#pragma once
-#ifdef _WIN32
-
-// Windows defines min and max macros that mess up std::min/max
-#ifndef NOMINMAX
-#define NOMINMAX
+#if defined(_WIN32) || defined(__CYGWIN__)
+#if defined(_MSC_VER)
+#pragma warning(push)
+#pragma warning(disable : 4251)
+#else
+#pragma GCC diagnostic ignored "-Wattributes"
#endif
-#define WIN32_LEAN_AND_MEAN
-
-#include <winsock2.h>
-#include <windows.h>
+#ifdef ARROW_FLIGHT_STATIC
+#define ARROW_FLIGHT_EXPORT
+#elif defined(ARROW_FLIGHT_EXPORTING)
+#define ARROW_FLIGHT_EXPORT __declspec(dllexport)
+#else
+#define ARROW_FLIGHT_EXPORT __declspec(dllimport)
+#endif
-// TODO(wesm): address when/if we add windows support
-// #include <util/syserr_reporting.hpp>
+#define ARROW_FLIGHT_NO_EXPORT
+#else // Not Windows
+#ifndef ARROW_FLIGHT_EXPORT
+#define ARROW_FLIGHT_EXPORT __attribute__((visibility("default")))
+#endif
+#ifndef ARROW_FLIGHT_NO_EXPORT
+#define ARROW_FLIGHT_NO_EXPORT __attribute__((visibility("hidden")))
+#endif
+#endif // Non-Windows
-#endif // _WIN32
+#if defined(_MSC_VER)
+#pragma warning(pop)
+#endif
diff --git a/cpp/src/arrow/io/mman.h b/cpp/src/arrow/io/mman.h
index 139ee7e..6125492 100644
--- a/cpp/src/arrow/io/mman.h
+++ b/cpp/src/arrow/io/mman.h
@@ -8,13 +8,6 @@
#ifndef _MMAN_WIN32_H
#define _MMAN_WIN32_H
-// Allow use of features specific to Windows XP or later.
-#ifndef _WIN32_WINNT
-// Change this to the appropriate value to target other versions of Windows.
-#define _WIN32_WINNT 0x0501
-
-#endif
-
#include "arrow/util/windows_compatibility.h"
#include <errno.h>
diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc
index 4db3157..409ba60 100644
--- a/cpp/src/arrow/python/flight.cc
+++ b/cpp/src/arrow/python/flight.cc
@@ -20,6 +20,7 @@
#include "arrow/flight/internal.h"
#include "arrow/python/flight.h"
+#include "arrow/util/io-util.h"
#include "arrow/util/logging.h"
using arrow::flight::FlightPayload;
@@ -137,12 +138,10 @@ Status PyFlightServer::ServeWithSignals() {
// an active signal handler for SIGINT and SIGTERM.
std::vector<int> signals;
for (const int signum : {SIGINT, SIGTERM}) {
- struct sigaction handler;
- int ret = sigaction(signum, nullptr, &handler);
- if (ret != 0) {
- return Status::IOError("sigaction call failed");
- }
- if (handler.sa_handler != SIG_DFL && handler.sa_handler != SIG_IGN) {
+ ::arrow::internal::SignalHandler handler;
+ RETURN_NOT_OK(::arrow::internal::GetSignalHandler(signum, &handler));
+ auto cb = handler.callback();
+ if (cb != SIG_DFL && cb != SIG_IGN) {
signals.push_back(signum);
}
}
diff --git a/cpp/src/arrow/util/io-util.cc b/cpp/src/arrow/util/io-util.cc
index 8ff4361..2dcd05d 100644
--- a/cpp/src/arrow/util/io-util.cc
+++ b/cpp/src/arrow/util/io-util.cc
@@ -35,6 +35,7 @@
#include <utility>
#include <fcntl.h>
+#include <signal.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h> // IWYU pragma: keep
@@ -887,5 +888,80 @@ Status TemporaryDir::Make(const std::string& prefix, std::unique_ptr<TemporaryDi
return Status::OK();
}
+SignalHandler::SignalHandler() {}
+
+SignalHandler::SignalHandler(Callback cb) {
+#if ARROW_HAVE_SIGACTION
+ sa_.sa_handler = cb;
+ sa_.sa_flags = 0;
+ sigemptyset(&sa_.sa_mask);
+#else
+ cb_ = cb;
+#endif
+}
+
+#if ARROW_HAVE_SIGACTION
+SignalHandler::SignalHandler(const struct sigaction& sa) {
+ memcpy(&sa_, &sa, sizeof(sa));
+}
+#endif
+
+SignalHandler::Callback SignalHandler::callback() const {
+#if ARROW_HAVE_SIGACTION
+ return sa_.sa_handler;
+#else
+ return cb_;
+#endif
+}
+
+#if ARROW_HAVE_SIGACTION
+const struct sigaction& SignalHandler::action() const { return sa_; }
+#endif
+
+Status GetSignalHandler(int signum, SignalHandler* out) {
+#if ARROW_HAVE_SIGACTION
+ struct sigaction sa;
+ int ret = sigaction(signum, nullptr, &sa);
+ if (ret != 0) {
+ // TODO more detailed message using errno
+ return Status::IOError("sigaction call failed");
+ }
+ *out = SignalHandler(sa);
+#else
+ // To read the old handler, set the signal handler to something else temporarily
+ SignalHandler::Callback cb = signal(signum, SIG_IGN);
+ if (cb == SIG_ERR || signal(signum, cb) == SIG_ERR) {
+ // TODO more detailed message using errno
+ return Status::IOError("signal call failed");
+ }
+ *out = SignalHandler(cb);
+#endif
+ return Status::OK();
+}
+
+Status SetSignalHandler(int signum, SignalHandler handler, SignalHandler* old_handler) {
+#if ARROW_HAVE_SIGACTION
+ struct sigaction old_sa;
+ int ret = sigaction(signum, &handler.action(), &old_sa);
+ if (ret != 0) {
+ // TODO more detailed message using errno
+ return Status::IOError("sigaction call failed");
+ }
+ if (old_handler != nullptr) {
+ *old_handler = SignalHandler(old_sa);
+ }
+#else
+ SignalHandler::Callback cb = signal(signum, handler.callback());
+ if (cb == SIG_ERR) {
+ // TODO more detailed message using errno
+ return Status::IOError("signal call failed");
+ }
+ if (old_handler != nullptr) {
+ *old_handler = SignalHandler(cb);
+ }
+#endif
+ return Status::OK();
+}
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
index 5d01f40..2b48a5c 100644
--- a/cpp/src/arrow/util/io-util.h
+++ b/cpp/src/arrow/util/io-util.h
@@ -18,9 +18,17 @@
#ifndef ARROW_UTIL_IO_UTIL_H
#define ARROW_UTIL_IO_UTIL_H
+#ifndef _WIN32
+#define ARROW_HAVE_SIGACTION 1
+#endif
+
#include <memory>
#include <string>
+#if ARROW_HAVE_SIGACTION
+#include <signal.h> // Needed for struct sigaction
+#endif
+
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "arrow/util/macros.h"
@@ -218,6 +226,37 @@ class ARROW_EXPORT TemporaryDir {
explicit TemporaryDir(PlatformFilename&&);
};
+class ARROW_EXPORT SignalHandler {
+ public:
+ typedef void (*Callback)(int);
+
+ SignalHandler();
+ explicit SignalHandler(Callback cb);
+#if ARROW_HAVE_SIGACTION
+ explicit SignalHandler(const struct sigaction& sa);
+#endif
+
+ Callback callback() const;
+#if ARROW_HAVE_SIGACTION
+ const struct sigaction& action() const;
+#endif
+
+ protected:
+#if ARROW_HAVE_SIGACTION
+ // Storing the full sigaction allows restoring the entire signal handling
+ // configuration.
+ struct sigaction sa_;
+#else
+ Callback cb_;
+#endif
+};
+
+ARROW_EXPORT
+Status GetSignalHandler(int signum, SignalHandler* out);
+ARROW_EXPORT
+Status SetSignalHandler(int signum, SignalHandler handler,
+ SignalHandler* old_handler = NULLPTR);
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/arrow/util/windows_compatibility.h b/cpp/src/arrow/util/windows_compatibility.h
index 7b70e28..70c4313 100644
--- a/cpp/src/arrow/util/windows_compatibility.h
+++ b/cpp/src/arrow/util/windows_compatibility.h
@@ -26,10 +26,15 @@
#define WIN32_LEAN_AND_MEAN
+// Set Windows 7 as a conservative minimum for Apache Arrow
+#if defined(_WIN32_WINNT) && _WIN32_WINNT < 0x601
+#undef _WIN32_WINNT
+#endif
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x601
+#endif
+
#include <winsock2.h>
#include <windows.h>
-// TODO(wesm): address when/if we add windows support
-// #include <util/syserr_reporting.hpp>
-
#endif // _WIN32
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 435daeb..6d8d16f 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -493,7 +493,7 @@ endif()
# Flight
if(PYARROW_BUILD_FLIGHT)
if(PYARROW_BUNDLE_ARROW_CPP)
- # TODO:
+ # TODO: need to implement FindArrowFlight.cmake first
message(FATAL_ERROR "Not yet implemented: bundling arrow-flight in pyarrow")
endif()
# We do NOT want to link gRPC or any other Flight dependency
diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py
index 907d4f2..9ce2264 100644
--- a/python/pyarrow/tests/test_flight.py
+++ b/python/pyarrow/tests/test_flight.py
@@ -18,6 +18,7 @@
import base64
import contextlib
+import os
import socket
import tempfile
import threading
@@ -319,8 +320,10 @@ def test_flight_get_info():
flight.Location.for_grpc_tcp('localhost', 5005)
+@pytest.mark.skipif(os.name == 'nt',
+ reason="Unix sockets can't be tested on Windows")
def test_flight_domain_socket():
- """Try a simple do_get call over a domain socket."""
+ """Try a simple do_get call over a Unix domain socket."""
table = simple_ints_table()
with tempfile.NamedTemporaryFile() as sock:
@@ -395,7 +398,7 @@ def test_timeout_passes():
"""Make sure timeouts do not fire on fast requests."""
with flight_server(ConstantFlightServer) as server_location:
client = flight.FlightClient.connect(server_location)
- options = flight.FlightCallOptions(timeout=0.2)
+ options = flight.FlightCallOptions(timeout=5.0)
client.do_get(flight.Ticket(b'ints'), options=options).read_all()