You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/28 08:08:03 UTC
[ignite-3] 01/01: IGNITE-18007 MacOS support
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-18007
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 57a00c40c2a9dd0d092497b922a88a087d5a3df3
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri Oct 28 12:07:21 2022 +0400
IGNITE-18007 MacOS support
---
modules/platforms/cpp/DEVNOTES.md | 26 +-
modules/platforms/cpp/ignite/common/config.h | 12 +-
.../platforms/cpp/ignite/network/CMakeLists.txt | 17 +-
.../network/detail/macos/connecting_context.cpp | 94 +++++++
.../network/detail/macos/connecting_context.h | 92 ++++++
.../network/detail/macos/macos_async_client.cpp | 170 +++++++++++
.../network/detail/macos/macos_async_client.h | 217 ++++++++++++++
.../detail/macos/macos_async_client_pool.cpp | 180 ++++++++++++
.../network/detail/macos/macos_async_client_pool.h | 185 ++++++++++++
.../detail/macos/macos_async_worker_thread.cpp | 311 +++++++++++++++++++++
.../detail/macos/macos_async_worker_thread.h | 158 +++++++++++
.../cpp/ignite/network/detail/macos/sockets.cpp | 115 ++++++++
.../cpp/ignite/network/detail/macos/sockets.h | 59 ++++
.../{network.cpp => detail/macos/utils.cpp} | 39 ++-
.../cpp/ignite/network/length_prefix_codec.cpp | 4 +-
modules/platforms/cpp/ignite/network/network.cpp | 11 +-
.../cpp/ignite/protocol/buffer_adapter.cpp | 2 +-
.../platforms/cpp/tests/client-test/CMakeLists.txt | 2 +-
modules/platforms/cpp/tests/client-test/main.cpp | 28 +-
.../cpp/tests/test-common/detail/linux_process.h | 86 +-----
.../cpp/tests/test-common/detail/macos_process.h | 65 +++++
.../detail/{linux_process.h => unix_process.h} | 51 ++--
.../cpp/tests/test-common/ignite_runner.cpp | 4 +-
.../platforms/cpp/tests/test-common/process.cpp | 13 +-
24 files changed, 1788 insertions(+), 153 deletions(-)
diff --git a/modules/platforms/cpp/DEVNOTES.md b/modules/platforms/cpp/DEVNOTES.md
index e0c7c243bb..36177f0309 100644
--- a/modules/platforms/cpp/DEVNOTES.md
+++ b/modules/platforms/cpp/DEVNOTES.md
@@ -17,7 +17,7 @@ mkdir cmake-build-debug
cd cmake-build-debug
conan install .. --build=missing -s build_type=Debug
cmake .. -DENABLE_TESTS=ON
-cmake --build . -j8
+cmake --build . -j8
```
### For Linux Developers
@@ -27,7 +27,17 @@ mkdir cmake-build-debug
cd cmake-build-debug
conan install .. --build=missing -s build_type=Debug -s compiler.libcxx=libstdc++11
cmake .. -DENABLE_TESTS=ON -DCMAKE_BUILD_TYPE=Debug
-cmake --build . -j8
+cmake --build . -j8
+```
+
+### For macOS Developers
+Building in debug mode with tests. In this dir:
+```shell
+mkdir cmake-build-debug
+cd cmake-build-debug
+conan install .. --build=missing -s build_type=Debug -s compiler.libcxx=libc++
+cmake .. -DENABLE_TESTS=ON -DCMAKE_BUILD_TYPE=Debug
+cmake --build . -j8
```
### For Windows users
@@ -47,7 +57,17 @@ mkdir cmake-build-release
cd cmake-build-release
conan install .. --build=missing -s build_type=Release -s compiler.libcxx=libstdc++11
cmake .. -DENABLE_TESTS=ON -DCMAKE_BUILD_TYPE=Release
-cmake --build . -j8
+cmake --build . -j8
+```
+
+### For macOS users
+Building in release mode without tests. In this dir:
+```shell
+mkdir cmake-build-release
+cd cmake-build-release
+conan install .. --build=missing -s build_type=Release -s compiler.libcxx=libc++
+cmake .. -DENABLE_TESTS=ON -DCMAKE_BUILD_TYPE=Release
+cmake --build . -j8
```
## Run Tests
diff --git a/modules/platforms/cpp/ignite/common/config.h b/modules/platforms/cpp/ignite/common/config.h
index 6274f9f733..10a05dbb65 100644
--- a/modules/platforms/cpp/ignite/common/config.h
+++ b/modules/platforms/cpp/ignite/common/config.h
@@ -51,10 +51,12 @@
#endif
/**
- * Macro IGNITE_SWITCH_WIN_OTHER that uses first option on Windows and second on any other OS.
+ * Macro IGNITE_OS_SWITCH that uses first option on Windows, second on Linux and third on MacOS.
*/
-#ifdef WIN32
-# define IGNITE_SWITCH_WIN_OTHER(x, y) x
-#else
-# define IGNITE_SWITCH_WIN_OTHER(x, y) y
+#ifdef _WIN32
+# define IGNITE_OS_SWITCH(win, lin, mac) win
+#elif __linux__
+# define IGNITE_OS_SWITCH(win, lin, mac) lin
+#elif __APPLE__
+# define IGNITE_OS_SWITCH(win, lin, mac) mac
#endif
diff --git a/modules/platforms/cpp/ignite/network/CMakeLists.txt b/modules/platforms/cpp/ignite/network/CMakeLists.txt
index d69976fce1..1ed9dfd1ee 100644
--- a/modules/platforms/cpp/ignite/network/CMakeLists.txt
+++ b/modules/platforms/cpp/ignite/network/CMakeLists.txt
@@ -37,7 +37,16 @@ if (WIN32)
detail/win/win_async_connecting_thread.cpp
detail/win/win_async_worker_thread.cpp
)
-else()
+elseif(APPLE)
+ list(APPEND SOURCES
+ detail/macos/connecting_context.cpp
+ detail/macos/macos_async_client.cpp
+ detail/macos/macos_async_client_pool.cpp
+ detail/macos/macos_async_worker_thread.cpp
+ detail/macos/sockets.cpp
+ detail/macos/utils.cpp
+ )
+elseif(UNIX)
list(APPEND SOURCES
detail/linux/connecting_context.cpp
detail/linux/linux_async_client.cpp
@@ -57,5 +66,11 @@ if (WIN32)
target_link_libraries(${TARGET} wsock32 ws2_32 iphlpapi crypt32)
endif()
+if (APPLE)
+ find_package(epoll-shim REQUIRED)
+ target_link_libraries(${TARGET} epoll-shim::epoll-shim)
+ add_compile_definitions(EPOLL_SHIM_NO_VARIADICS)
+endif()
+
set_target_properties(${TARGET} PROPERTIES VERSION ${CMAKE_PROJECT_VERSION})
set_target_properties(${TARGET} PROPERTIES POSITION_INDEPENDENT_CODE 1)
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/connecting_context.cpp b/modules/platforms/cpp/ignite/network/detail/macos/connecting_context.cpp
new file mode 100644
index 0000000000..dcf57627ec
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/connecting_context.cpp
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "connecting_context.h"
+
+#include <cstring>
+#include <iterator>
+
+#include <netdb.h>
+#include <sys/socket.h>
+
+namespace ignite::network::detail {
+
+/**
+ * Constructs a context for the given address range.
+ *
+ * @param range Host and port range to iterate over.
+ */
+connecting_context::connecting_context(tcp_range range)
+    : m_range(std::move(range))
+    , m_next_port(m_range.port) // Read from the member: the parameter has already been moved from.
+    , m_info(nullptr)
+    , m_current_info(nullptr) {
+}
+
+/**
+ * Destructor. Delegates to reset(), which frees the resolved address list if there is one.
+ */
+connecting_context::~connecting_context() {
+ reset();
+}
+
+/**
+ * Returns the context to its initial state: rewinds the port iteration to the first port of
+ * the range and releases the resolved address list, if any.
+ */
+void connecting_context::reset() {
+    m_next_port = m_range.port;
+
+    if (m_info == nullptr)
+        return;
+
+    freeaddrinfo(m_info);
+    m_info = nullptr;
+    m_current_info = nullptr;
+}
+
+/**
+ * Advances to the next address to try.
+ *
+ * Walks the addrinfo list resolved for the current port. Once that list is exhausted, the next
+ * port of the range is resolved and iteration continues from the head of the new list.
+ *
+ * @return Next address info, or @c nullptr when the range is exhausted or resolution fails.
+ */
+addrinfo *connecting_context::next() {
+    if (m_current_info)
+        m_current_info = m_current_info->ai_next;
+
+    while (!m_current_info) {
+        if (m_info) {
+            freeaddrinfo(m_info);
+            m_info = nullptr;
+        }
+
+        // m_next_port is 16 bits wide, so incrementing it past 65535 wraps it around to zero.
+        // A value below the first port of the range therefore also means the range is exhausted;
+        // without this check a range ending at or beyond port 65535 would never terminate.
+        if (m_next_port < m_range.port || m_next_port > m_range.port + m_range.range)
+            return nullptr;
+
+        // Value-initialization already zeroes the structure.
+        addrinfo hints{};
+        hints.ai_family = AF_UNSPEC;
+        hints.ai_socktype = SOCK_STREAM;
+        hints.ai_protocol = IPPROTO_TCP;
+
+        const std::string port_str = std::to_string(m_next_port);
+
+        // Resolve the server address and port.
+        int res = getaddrinfo(m_range.host.c_str(), port_str.c_str(), &hints, &m_info);
+        if (res != 0)
+            return nullptr;
+
+        m_current_info = m_info;
+        ++m_next_port;
+    }
+
+    return m_current_info;
+}
+
+/**
+ * Get the end point that corresponds to the address most recently returned by next().
+ *
+ * @return End point made of the range host and the port that was resolved last.
+ * @throw ignite_error If there is no current address.
+ */
+end_point connecting_context::current_address() const {
+    if (m_current_info == nullptr)
+        throw ignite_error("There is no current address");
+
+    // next() has already advanced m_next_port past the port it resolved.
+    const auto resolved_port = static_cast<uint16_t>(m_next_port - 1);
+
+    return {m_range.host, resolved_port};
+}
+
+/**
+ * Makes a client for the given socket, using current_address() and the range of this context.
+ * As current_address() is called, this throws ignite_error when there is no current address.
+ *
+ * @param fd Socket file descriptor.
+ * @return New client instance.
+ */
+std::shared_ptr<macos_async_client> connecting_context::to_client(int fd) {
+ return std::make_shared<macos_async_client>(fd, current_address(), m_range);
+}
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/connecting_context.h b/modules/platforms/cpp/ignite/network/detail/macos/connecting_context.h
new file mode 100644
index 0000000000..c3a61ab479
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/connecting_context.h
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma once
+
+#include "macos_async_client.h"
+
+#include <ignite/network/end_point.h>
+#include <ignite/network/tcp_range.h>
+
+#include <cstdint>
+#include <memory>
+
+#include <netdb.h>
+
+namespace ignite::network::detail {
+
+/**
+* Connecting context.
+*/
+class connecting_context {
+public:
+    /**
+     * Move constructor.
+     *
+     * Takes over the address list owned by @p other and leaves @p other without one, so that
+     * the list is released exactly once. A defaulted move would copy the raw pointers and make
+     * both destructors call freeaddrinfo() on the same list.
+     *
+     * @param other Context to move from.
+     */
+    connecting_context(connecting_context &&other)
+        : m_range(std::move(other.m_range))
+        , m_next_port(other.m_next_port)
+        , m_info(other.m_info)
+        , m_current_info(other.m_current_info) {
+        other.m_info = nullptr;
+        other.m_current_info = nullptr;
+    }
+
+    /**
+     * Move assignment.
+     *
+     * Releases the address list currently owned by this context, then takes over the one
+     * owned by @p other.
+     *
+     * @param other Context to move from.
+     * @return This context.
+     */
+    connecting_context &operator=(connecting_context &&other) {
+        if (this != &other) {
+            if (m_info)
+                freeaddrinfo(m_info);
+
+            m_range = std::move(other.m_range);
+            m_next_port = other.m_next_port;
+            m_info = other.m_info;
+            m_current_info = other.m_current_info;
+
+            other.m_info = nullptr;
+            other.m_current_info = nullptr;
+        }
+
+        return *this;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param range Address range to iterate over.
+     */
+    explicit connecting_context(tcp_range range);
+
+    /**
+     * Destructor.
+     */
+    ~connecting_context();
+
+    /**
+     * Reset connection context to its initial state.
+     */
+    void reset();
+
+    /**
+     * Next address in range.
+     *
+     * @return Next address info for connection.
+     */
+    addrinfo *next();
+
+    /**
+     * Get last address.
+     *
+     * @return Address.
+     */
+    end_point current_address() const;
+
+    /**
+     * Make client.
+     *
+     * @param fd Socket file descriptor.
+     * @return Client instance from current internal state.
+     */
+    std::shared_ptr<macos_async_client> to_client(int fd);
+
+private:
+    /** Range. */
+    tcp_range m_range;
+
+    /** Next port. */
+    uint16_t m_next_port;
+
+    /** Current address info. Owned: released with freeaddrinfo(). */
+    addrinfo *m_info;
+
+    /** Address info which is currently used for connection. Points into the m_info list. */
+    addrinfo *m_current_info;
+};
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client.cpp b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client.cpp
new file mode 100644
index 0000000000..f4f6f721ef
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client.cpp
@@ -0,0 +1,170 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "macos_async_client.h"
+
+
+#include <algorithm>
+#include <cstring>
+
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#undef close
+
+namespace ignite::network::detail {
+
+/**
+ * Constructs a client for the given socket. The client starts in the CONNECTED state with no
+ * epoll descriptor (-1) and ID 0; the epoll descriptor is set by start_monitoring().
+ *
+ * @param fd Socket file descriptor.
+ * @param addr Address.
+ * @param range Range.
+ */
+macos_async_client::macos_async_client(int fd, end_point addr, tcp_range range)
+ : m_state(state::CONNECTED)
+ , m_fd(fd)
+ , m_epoll(-1)
+ , m_id(0)
+ , m_addr(std::move(addr))
+ , m_range(std::move(range))
+ , m_send_packets()
+ , m_send_mutex()
+ , m_recv_packet(BUFFER_SIZE)
+ , m_close_err() {
+}
+
+/**
+ * Destructor. Shuts the connection down without a user-provided error, then closes the socket.
+ */
+macos_async_client::~macos_async_client() {
+ shutdown(std::nullopt);
+
+ close();
+}
+
+/**
+ * Shuts the connection down in both directions and remembers the closing error.
+ *
+ * @param err Error to store as the closing error. When empty, a default
+ *     "Connection closed by application" error is stored instead.
+ * @return @c true if the client was in the CONNECTED state and has been shut down,
+ *     @c false otherwise.
+ */
+bool macos_async_client::shutdown(std::optional<ignite_error> err) {
+    std::lock_guard<std::mutex> guard(m_send_mutex);
+
+    const bool connected = (m_state == state::CONNECTED);
+    if (!connected)
+        return false;
+
+    if (err)
+        m_close_err = std::move(*err);
+    else
+        m_close_err = ignite_error("Connection closed by application");
+
+    ::shutdown(m_fd, SHUT_RDWR);
+    m_state = state::SHUTDOWN;
+
+    return true;
+}
+
+/**
+ * Removes the socket from epoll monitoring, closes it and marks the client as CLOSED.
+ *
+ * @return @c true if the client was closed by this call, @c false if it was already closed.
+ */
+bool macos_async_client::close() {
+    const bool already_closed = (m_state == state::CLOSED);
+    if (already_closed)
+        return false;
+
+    stop_monitoring();
+
+    ::close(m_fd);
+    m_fd = -1;
+
+    m_state = state::CLOSED;
+    return true;
+}
+
+/**
+ * Queues a packet for sending. If it is the only packet in the queue, sending starts right
+ * away; otherwise it stays queued behind the earlier packets.
+ *
+ * @param data Data to send.
+ * @return @c true if the packet was only queued, otherwise the result of the send attempt.
+ */
+bool macos_async_client::send(std::vector<std::byte> &&data) {
+    std::lock_guard<std::mutex> guard(m_send_mutex);
+
+    m_send_packets.emplace_back(std::move(data));
+
+    // Earlier packets are still queued, so this one is left for process_sent().
+    const bool queued_behind_others = m_send_packets.size() > 1;
+
+    return queued_behind_others || send_next_packet_locked();
+}
+
+/**
+ * Sends as much as possible of the packet at the head of the queue, marks the sent bytes as
+ * consumed and enables send notifications.
+ *
+ * @warning Can only be called when holding m_send_mutex lock.
+ * @return @c true if the queue is empty or the send call succeeded, @c false if it failed.
+ */
+bool macos_async_client::send_next_packet_locked() {
+    if (m_send_packets.empty())
+        return true;
+
+    auto &head = m_send_packets.front();
+    auto pending = head.get_bytes_view();
+
+    const ssize_t sent = ::send(m_fd, pending.data(), pending.size(), 0);
+    if (sent < 0)
+        return false;
+
+    // Only the bytes actually accepted by the socket are skipped; the rest stays queued.
+    head.skip(static_cast<int32_t>(sent));
+    enable_send_notifications();
+
+    return true;
+}
+
+/**
+ * Reads available data from the socket into the internal receive buffer.
+ *
+ * @return View of the received bytes inside the internal buffer; an empty view if recv failed.
+ */
+bytes_view macos_async_client::receive() {
+    const ssize_t received = recv(m_fd, m_recv_packet.data(), m_recv_packet.size(), 0);
+
+    if (received >= 0)
+        return {m_recv_packet.data(), static_cast<size_t>(received)};
+
+    return {};
+}
+
+/**
+ * Registers the socket with the given epoll instance for read, write and peer-hang-up events
+ * and remembers the epoll descriptor.
+ *
+ * @param epoll0 Epoll file descriptor.
+ * @return @c true on success, @c false if the descriptor is negative or registration failed.
+ */
+bool macos_async_client::start_monitoring(int epoll0) {
+    if (epoll0 < 0)
+        return false;
+
+    epoll_event ev{};
+    std::memset(&ev, 0, sizeof(ev));
+    ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
+    ev.data.ptr = this;
+
+    if (epoll_ctl(epoll0, EPOLL_CTL_ADD, m_fd, &ev) < 0)
+        return false;
+
+    m_epoll = epoll0;
+
+    return true;
+}
+
+/**
+ * Removes the socket from the epoll instance it was registered with. The result of the
+ * removal is not checked.
+ */
+void macos_async_client::stop_monitoring() { // NOLINT(readability-make-member-function-const)
+    epoll_event unused{};
+    std::memset(&unused, 0, sizeof(unused));
+
+    epoll_ctl(m_epoll, EPOLL_CTL_DEL, m_fd, &unused);
+}
+
+/**
+ * Re-arms the epoll registration so that it includes EPOLLOUT alongside EPOLLIN and EPOLLRDHUP.
+ */
+void macos_async_client::enable_send_notifications() {
+    epoll_event ev{};
+    std::memset(&ev, 0, sizeof(ev));
+    ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
+    ev.data.ptr = this;
+
+    epoll_ctl(m_epoll, EPOLL_CTL_MOD, m_fd, &ev);
+}
+
+/**
+ * Re-arms the epoll registration without EPOLLOUT, keeping only EPOLLIN and EPOLLRDHUP.
+ */
+void macos_async_client::disable_send_notifications() {
+    epoll_event ev{};
+    std::memset(&ev, 0, sizeof(ev));
+    ev.events = EPOLLIN | EPOLLRDHUP;
+    ev.data.ptr = this;
+
+    epoll_ctl(m_epoll, EPOLL_CTL_MOD, m_fd, &ev);
+}
+
+/**
+ * Continues sending queued data: drops the head packet once it is fully sent and sends the
+ * next portion. With nothing left in the queue, send notifications are switched off.
+ *
+ * @return @c true if the queue was empty, otherwise the result of the send attempt.
+ */
+bool macos_async_client::process_sent() {
+    std::lock_guard<std::mutex> guard(m_send_mutex);
+
+    if (!m_send_packets.empty()) {
+        if (m_send_packets.front().empty())
+            m_send_packets.pop_front();
+
+        return send_next_packet_locked();
+    }
+
+    disable_send_notifications();
+
+    return true;
+}
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client.h b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client.h
new file mode 100644
index 0000000000..36f40d0181
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client.h
@@ -0,0 +1,217 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma once
+
+#include "sockets.h"
+
+#include <ignite/network/async_handler.h>
+#include <ignite/network/codec.h>
+#include <ignite/network/end_point.h>
+#include <ignite/network/tcp_range.h>
+
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <mutex>
+
+namespace ignite::network::detail {
+
+/**
+* macOS-specific implementation of async network client.
+*/
+class macos_async_client {
+ /**
+ * State.
+ */
+ enum class state {
+ CONNECTED,
+
+ SHUTDOWN,
+
+ CLOSED,
+ };
+
+public:
+ static constexpr size_t BUFFER_SIZE = 0x10000;
+
+ /**
+ * Constructor.
+ *
+ * @param fd Socket file descriptor.
+ * @param addr Address.
+ * @param range Range.
+ */
+ macos_async_client(int fd, end_point addr, tcp_range range);
+
+ /**
+ * Destructor.
+ *
+ * Should not be destructed from external threads.
+ * Can be destructed from WorkerThread.
+ */
+ ~macos_async_client();
+
+ /**
+ * Shutdown client.
+ *
+ * Can be called from external threads.
+ * Can be called from WorkerThread.
+ *
+ * @param err Error message. Can be null.
+ * @return @c true if shutdown performed successfully.
+ */
+ bool shutdown(std::optional<ignite_error> err);
+
+ /**
+ * Close client.
+ *
+ * Should not be called from external threads.
+ * Can be called from WorkerThread.
+ *
+ * @return @c true if the client was closed by this call, @c false if it was already closed.
+ */
+ bool close();
+
+ /**
+ * Send packet using client.
+ *
+ * @param data Data to send.
+ * @return @c true on success.
+ */
+ bool send(std::vector<std::byte> &&data);
+
+ /**
+ * Initiate next receive of data.
+ *
+ * @return View of the received bytes; an empty view if the receive failed.
+ */
+ bytes_view receive();
+
+ /**
+ * Process sent data.
+ *
+ * @return @c true on success.
+ */
+ bool process_sent();
+
+ /**
+ * Start monitoring client.
+ *
+ * @param epoll Epoll file descriptor.
+ * @return @c true on success.
+ */
+ bool start_monitoring(int epoll);
+
+ /**
+ * Stop monitoring client.
+ */
+ void stop_monitoring();
+
+ /**
+ * Enable epoll notifications about the socket being ready for sending (EPOLLOUT).
+ */
+ void enable_send_notifications();
+
+ /**
+ * Disable epoll notifications about the socket being ready for sending (EPOLLOUT).
+ */
+ void disable_send_notifications();
+
+ /**
+ * Get client ID.
+ *
+ * @return Client ID.
+ */
+ [[nodiscard]] uint64_t id() const { return m_id; }
+
+ /**
+ * Set ID.
+ *
+ * @param id ID to set.
+ */
+ void set_id(uint64_t id) { m_id = id; }
+
+ /**
+ * Get address.
+ *
+ * @return Address.
+ */
+ [[nodiscard]] const end_point &address() const { return m_addr; }
+
+ /**
+ * Get range.
+ *
+ * @return Range.
+ */
+ [[nodiscard]] const tcp_range &get_range() const { return m_range; }
+
+ /**
+ * Check whether client is closed.
+ *
+ * @return @c true if closed.
+ */
+ [[nodiscard]] bool is_closed() const { return m_state == state::CLOSED; }
+
+ /**
+ * Get closing error for the connection. Can be IGNITE_SUCCESS.
+ *
+ * @return Connection error.
+ */
+ [[nodiscard]] const ignite_error &get_close_error() const { return m_close_err; }
+
+private:
+ /**
+ * Send next packet in queue.
+ *
+ * @warning Can only be called when holding m_send_mutex lock.
+ * @return @c true on success.
+ */
+ bool send_next_packet_locked();
+
+ /** State. */
+ state m_state;
+
+ /** Socket file descriptor. */
+ int m_fd;
+
+ /** Epoll file descriptor. */
+ int m_epoll;
+
+ /** Connection ID. */
+ uint64_t m_id;
+
+ /** Server end point. */
+ end_point m_addr;
+
+ /** Address range associated with current connection. */
+ tcp_range m_range;
+
+ /** Packets that should be sent. */
+ std::deque<data_buffer_owning> m_send_packets;
+
+ /** Send critical section. */
+ std::mutex m_send_mutex;
+
+ /** Packet that is currently received. */
+ std::vector<std::byte> m_recv_packet;
+
+ /** Closing error. */
+ ignite_error m_close_err;
+};
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client_pool.cpp b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client_pool.cpp
new file mode 100644
index 0000000000..f589871857
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client_pool.cpp
@@ -0,0 +1,180 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "macos_async_client_pool.h"
+
+#include "../utils.h"
+
+#include <algorithm>
+
+namespace ignite::network::detail {
+
+/**
+ * Constructs a pool in the stopped state (m_stopping is true) with no handler and no clients.
+ * The worker thread object is given a reference to this pool.
+ */
+macos_async_client_pool::macos_async_client_pool()
+ : m_stopping(true)
+ , m_async_handler()
+ , m_worker_thread(*this)
+ , m_id_gen(0)
+ , m_clients_mutex()
+ , m_client_id_map() {
+}
+
+/**
+ * Destructor. Stops the pool via internal_stop().
+ */
+macos_async_client_pool::~macos_async_client_pool() {
+ internal_stop();
+}
+
+/**
+ * Starts the pool: resets the ID generator, clears the stopping flag and starts the worker
+ * thread. If starting the worker thread throws, the pool is stopped again and the exception
+ * is re-thrown.
+ *
+ * @param addrs Addresses to connect to.
+ * @param conn_limit Connection upper limit.
+ *
+ * @throw ignite_error If the pool is already started.
+ */
+void macos_async_client_pool::start(std::vector<tcp_range> addrs, uint32_t conn_limit) {
+    if (!m_stopping)
+        throw ignite_error("Client pool is already started");
+
+    m_id_gen = 0;
+    m_stopping = false;
+
+    try {
+        // addrs is a by-value parameter that is not used afterwards, so hand it over to the
+        // worker thread instead of copying the whole vector.
+        m_worker_thread.start(conn_limit, std::move(addrs));
+    } catch (...) {
+        stop();
+
+        throw;
+    }
+}
+
+/**
+ * Stops the pool. Delegates to internal_stop().
+ */
+void macos_async_client_pool::stop() {
+ internal_stop();
+}
+
+/**
+ * Sends data through the client with the given ID.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c false if the pool is stopping or the client is not found, otherwise the result
+ *     of the client's send.
+ */
+bool macos_async_client_pool::send(uint64_t id, std::vector<std::byte> &&data) {
+    if (m_stopping)
+        return false;
+
+    const auto target = find_client(id);
+
+    return target ? target->send(std::move(data)) : false;
+}
+
+/**
+ * Shuts down the client with the given ID, if it exists and is not closed yet.
+ * Does nothing while the pool is stopping.
+ *
+ * @param id Client ID.
+ * @param err Error to pass to the client's shutdown. May be empty.
+ */
+void macos_async_client_pool::close(uint64_t id, std::optional<ignite_error> err) {
+    if (m_stopping)
+        return;
+
+    const std::shared_ptr<macos_async_client> target = find_client(id);
+    if (!target || target->is_closed())
+        return;
+
+    target->shutdown(std::move(err));
+}
+
+/**
+ * Removes the client with the given ID from the pool, closes it and, if this call actually
+ * closed it, reports the closing to the handler.
+ *
+ * The reported error is @p err when provided. Otherwise it is the client's own closing error,
+ * or "Connection closed by server" if that one carries the SUCCESS status code.
+ *
+ * @param id Client ID.
+ * @param err Error to report. May be empty.
+ */
+void macos_async_client_pool::close_and_release(uint64_t id, std::optional<ignite_error> err) {
+    if (m_stopping)
+        return;
+
+    std::shared_ptr<macos_async_client> released;
+    {
+        std::lock_guard<std::mutex> guard(m_clients_mutex);
+
+        auto pos = m_client_id_map.find(id);
+        if (pos == m_client_id_map.end())
+            return;
+
+        released = pos->second;
+        m_client_id_map.erase(pos);
+    }
+
+    // The client is closed outside of the lock.
+    if (!released->close())
+        return;
+
+    ignite_error close_err(released->get_close_error());
+    if (close_err.get_status_code() == status_code::SUCCESS)
+        close_err = ignite_error(status_code::NETWORK, "Connection closed by server");
+
+    if (!err)
+        err = std::move(close_err);
+
+    handle_connection_closed(id, err);
+}
+
+/**
+ * Assigns a new ID to the client, stores it in the client map and notifies the handler about
+ * the successful connection.
+ *
+ * @param client Client.
+ * @return @c true if the client was added, @c false if the pool is stopping.
+ */
+bool macos_async_client_pool::add_client(std::shared_ptr<macos_async_client> client) {
+    if (m_stopping)
+        return false;
+
+    // Take the address first: the client pointer is moved into the map below.
+    const auto address = client->address();
+
+    uint64_t assigned_id;
+    {
+        std::lock_guard<std::mutex> guard(m_clients_mutex);
+
+        assigned_id = ++m_id_gen;
+        client->set_id(assigned_id);
+
+        m_client_id_map[assigned_id] = std::move(client);
+    }
+
+    handle_connection_success(address, assigned_id);
+
+    return true;
+}
+
+/**
+ * Forwards a connection error to the handler, if the handler is still alive.
+ *
+ * @param addr Connection address.
+ * @param err Error.
+ */
+void macos_async_client_pool::handle_connection_error(const end_point &addr, ignite_error err) {
+    auto handler = m_async_handler.lock();
+    if (!handler)
+        return;
+
+    handler->on_connection_error(addr, std::move(err));
+}
+
+/**
+ * Forwards a successful connection event to the handler, if the handler is still alive.
+ *
+ * @param addr Address of the new connection.
+ * @param id Connection ID.
+ */
+void macos_async_client_pool::handle_connection_success(const end_point &addr, uint64_t id) {
+    auto handler = m_async_handler.lock();
+    if (!handler)
+        return;
+
+    handler->on_connection_success(addr, id);
+}
+
+/**
+ * Forwards a connection closed event to the handler, if the handler is still alive.
+ *
+ * @param id Async client ID.
+ * @param err Error. May be empty.
+ */
+void macos_async_client_pool::handle_connection_closed(uint64_t id, std::optional<ignite_error> err) {
+    auto handler = m_async_handler.lock();
+    if (!handler)
+        return;
+
+    handler->on_connection_closed(id, std::move(err));
+}
+
+/**
+ * Forwards a received message to the handler, if the handler is still alive.
+ *
+ * @param id Async client ID.
+ * @param msg Received message.
+ */
+void macos_async_client_pool::handle_nessage_received(uint64_t id, bytes_view msg) {
+    auto handler = m_async_handler.lock();
+    if (!handler)
+        return;
+
+    handler->on_message_received(id, msg);
+}
+
+/**
+ * Forwards a message sent event to the handler, if the handler is still alive.
+ *
+ * @param id Async client ID.
+ */
+void macos_async_client_pool::handle_message_sent(uint64_t id) {
+    auto handler = m_async_handler.lock();
+    if (!handler)
+        return;
+
+    handler->on_message_sent(id);
+}
+
+/**
+ * Marks the pool as stopping, stops the worker thread, reports every remaining client to the
+ * handler as closed with a "Client stopped" error and clears the client map.
+ */
+void macos_async_client_pool::internal_stop() {
+    m_stopping = true;
+    m_worker_thread.stop();
+
+    {
+        std::lock_guard<std::mutex> lock(m_clients_mutex);
+
+        // Bind the entries by reference: iterating by value copies every shared_ptr, which
+        // costs an atomic reference count increment and decrement per client for nothing.
+        for (const auto &[_, client] : m_client_id_map) {
+            ignite_error err("Client stopped");
+            handle_connection_closed(client->id(), err);
+        }
+
+        m_client_id_map.clear();
+    }
+}
+
+/**
+ * Looks a client up by its ID under the clients lock.
+ *
+ * @param id Client ID.
+ * @return Client, or an empty pointer if there is no client with such ID.
+ */
+std::shared_ptr<macos_async_client> macos_async_client_pool::find_client(uint64_t id) const {
+    std::lock_guard<std::mutex> guard(m_clients_mutex);
+
+    const auto pos = m_client_id_map.find(id);
+
+    return pos != m_client_id_map.end() ? pos->second : std::shared_ptr<macos_async_client>{};
+}
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client_pool.h b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client_pool.h
new file mode 100644
index 0000000000..0511dc11fc
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_client_pool.h
@@ -0,0 +1,185 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma once
+
+#include "macos_async_client.h"
+#include "macos_async_worker_thread.h"
+
+#include <ignite/common/ignite_error.h>
+#include <ignite/network/async_client_pool.h>
+#include <ignite/network/async_handler.h>
+#include <ignite/network/tcp_range.h>
+
+#include <atomic>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <mutex>
+
+namespace ignite::network::detail {
+
+/**
+ * macOS-specific implementation of asynchronous client pool.
+ */
+class macos_async_client_pool : public async_client_pool {
+public:
+    /**
+     * Constructor.
+     *
+     * The upper level event handler is not passed here; it is supplied via set_handler().
+     */
+    macos_async_client_pool();
+
+    /**
+     * Destructor.
+     */
+    ~macos_async_client_pool() override;
+
+    /**
+     * Start internal thread that establishes connections to provided addresses and asynchronously sends and
+     * receives messages from them. Function returns either when thread is started and first connection is
+     * established or failure happened.
+     *
+     * @param addrs Addresses to connect to.
+     * @param conn_limit Connection upper limit. Zero means limit is disabled.
+     *
+     * @throw ignite_error on error.
+     */
+    void start(std::vector<tcp_range> addrs, uint32_t conn_limit) override;
+
+    /**
+     * Close all established connections and stops handling thread.
+     */
+    void stop() override;
+
+    /**
+     * Set handler.
+     *
+     * @param handler Handler to set.
+     */
+    void set_handler(std::weak_ptr<async_handler> handler) override { m_async_handler = std::move(handler); }
+
+    /**
+     * Send data to specific established connection.
+     *
+     * @param id Client ID.
+     * @param data Data to be sent.
+     * @return @c true if connection is present and @c false otherwise.
+     *
+     * @throw ignite_error on error.
+     */
+    bool send(uint64_t id, std::vector<std::byte> &&data) override;
+
+    /**
+     * Closes specified connection if it's established. Connection to the specified address is planned for
+     * re-connect. Event is issued to the handler with specified error.
+     *
+     * @param id Client ID.
+     * @param err Error to report. May be empty.
+     */
+    void close(uint64_t id, std::optional<ignite_error> err) override;
+
+    /**
+     * Closes and releases memory allocated for client with specified ID.
+     * Error is reported to handler.
+     * Does nothing if the pool is stopping or there is no client with the specified ID.
+     *
+     * @param id Client ID.
+     * @param err Error to report. May be empty.
+     */
+    void close_and_release(uint64_t id, std::optional<ignite_error> err);
+
+    /**
+     * Add client to connection map. Notify user.
+     *
+     * @param client Client.
+     * @return @c true if the client was added, @c false if the pool is stopping.
+     */
+    bool add_client(std::shared_ptr<macos_async_client> client);
+
+    /**
+     * Handle error during connection establishment.
+     *
+     * @param addr Connection address.
+     * @param err Error.
+     */
+    void handle_connection_error(const end_point &addr, ignite_error err);
+
+    /**
+     * Handle successful connection establishment.
+     *
+     * @param addr Address of the new connection.
+     * @param id Connection ID.
+     */
+    void handle_connection_success(const end_point &addr, uint64_t id);
+
+    /**
+     * Handle connection closed event.
+     *
+     * @param id Async client ID.
+     * @param err Error. Can be empty if connection closed without error.
+     */
+    void handle_connection_closed(uint64_t id, std::optional<ignite_error> err);
+
+    /**
+     * Handle new message.
+     *
+     * @param id Async client ID.
+     * @param msg Received message.
+     */
+    void handle_nessage_received(uint64_t id, bytes_view msg);
+
+    /**
+     * Handle sent message event.
+     *
+     * @param id Async client ID.
+     */
+    void handle_message_sent(uint64_t id);
+
+private:
+    /**
+     * Close all established connections and stops handling threads.
+     */
+    void internal_stop();
+
+    /**
+     * Find client by ID.
+     *
+     * @param id Client ID.
+     * @return Client. Null pointer if is not found.
+     */
+    std::shared_ptr<macos_async_client> find_client(uint64_t id) const;
+
+    /**
+     * Flag indicating that pool is stopping.
+     *
+     * Atomic rather than volatile: the pool owns a worker thread, and volatile provides neither
+     * atomicity nor inter-thread ordering for a flag shared between threads.
+     */
+    std::atomic<bool> m_stopping;
+
+    /** Event handler. */
+    std::weak_ptr<async_handler> m_async_handler;
+
+    /** Worker thread. */
+    macos_async_worker_thread m_worker_thread;
+
+    /** ID counter. */
+    uint64_t m_id_gen;
+
+    /** Clients critical section. */
+    mutable std::mutex m_clients_mutex;
+
+    /** Client mapping ID -> client */
+    std::map<uint64_t, std::shared_ptr<macos_async_client>> m_client_id_map;
+};
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/macos_async_worker_thread.cpp b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_worker_thread.cpp
new file mode 100644
index 0000000000..33ec7aa5ef
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_worker_thread.cpp
@@ -0,0 +1,311 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "macos_async_worker_thread.h"
+
+#include "../utils.h"
+#include "macos_async_client_pool.h"
+
+#include <algorithm>
+#include <cstring>
+
+#include <netdb.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#undef close
+
+namespace ignite::network::detail {
+
+namespace {
+
+/** Fibonacci sequence used to compute the delay before the next connection attempt. */
+fibonacci_sequence<10> fibonacci10;
+
+} // anonymous namespace
+
+macos_async_worker_thread::macos_async_worker_thread(macos_async_client_pool &client_pool)
+ : m_client_pool(client_pool)
+ , m_stopping(true) // Treated as stopped until start() is called, which makes stop() a no-op.
+ , m_epoll(-1)
+ , m_stop_event(-1)
+ , m_non_connected()
+ , m_current_connection()
+ , m_current_client()
+ , m_failed_attempts(0)
+ , m_last_connection_time()
+ , m_min_addrs(0)
+ , m_thread() {
+ // A zero timestamp makes calculate_connection_timeout() allow the first attempt immediately.
+ memset(&m_last_connection_time, 0, sizeof(m_last_connection_time));
+}
+
+macos_async_worker_thread::~macos_async_worker_thread() {
+ // Joins the worker thread if it is running; stop() returns immediately otherwise.
+ stop();
+}
+
+// Creates the epoll instance and the stop event, then launches the worker thread.
+// Throws ignite_error if any of the OS resources can not be set up.
+void macos_async_worker_thread::start(size_t limit, std::vector<tcp_range> addrs) {
+    m_epoll = epoll_create(1);
+    if (m_epoll < 0)
+        throw_last_system_error("Failed to create epoll instance");
+
+    m_stop_event = eventfd(0, EFD_NONBLOCK);
+    if (m_stop_event < 0) {
+        std::string msg = get_last_system_error("Failed to create stop event instance", "");
+
+        // The stop event was never created, so the epoll instance is the only descriptor to release here.
+        epoll_shim_close(m_epoll);
+        throw ignite_error(status_code::OS, msg);
+    }
+
+    // Value-initialized on purpose: the data pointer stays null, which is how the event loop
+    // tells the stop event apart from client events.
+    epoll_event event{};
+    event.events = EPOLLIN;
+
+    int res = epoll_ctl(m_epoll, EPOLL_CTL_ADD, m_stop_event, &event);
+    if (res < 0) {
+        std::string msg = get_last_system_error("Failed to add stop event to epoll instance", "");
+        epoll_shim_close(m_stop_event);
+        epoll_shim_close(m_epoll);
+        throw ignite_error(status_code::OS, msg);
+    }
+
+    m_stopping = false;
+    m_failed_attempts = 0;
+    m_non_connected = std::move(addrs);
+
+    m_current_connection.reset();
+    m_current_client.reset();
+
+    // A zero or too large limit means "connect to every address". Otherwise new connections
+    // are initiated only while more than (address count - limit) addresses are not connected.
+    if (!limit || limit > m_non_connected.size())
+        m_min_addrs = 0;
+    else
+        m_min_addrs = m_non_connected.size() - limit;
+
+    m_thread = std::thread(&macos_async_worker_thread::run, this);
+}
+
+void macos_async_worker_thread::stop() {
+ if (m_stopping)
+ return;
+
+ m_stopping = true;
+
+ // Make the stop event readable to wake the worker thread, which may be blocked in epoll_wait().
+ int64_t value = 1;
+ ssize_t res = epoll_shim_write(m_stop_event, &value, sizeof(value));
+
+ // res is only inspected by the assertion below; the cast avoids an unused-variable warning when asserts are disabled.
+ (void)res;
+ assert(res == sizeof(value));
+
+ m_thread.join();
+
+ // The descriptors are closed only after the worker thread has finished using them.
+ epoll_shim_close(m_stop_event);
+ epoll_shim_close(m_epoll);
+
+ m_non_connected.clear();
+ m_current_connection.reset();
+}
+
+void macos_async_worker_thread::run() {
+    // Alternate between initiating connections and processing epoll events,
+    // checking the stop flag before each of the two steps.
+    for (;;) {
+        if (m_stopping)
+            return;
+
+        handle_new_connections();
+
+        if (m_stopping)
+            return;
+
+        handle_connection_events();
+    }
+}
+
+// Starts a non-blocking connection attempt to the next candidate address, if one is due.
+void macos_async_worker_thread::handle_new_connections() {
+    if (!should_initiate_new_connection())
+        return;
+
+    // The delay after the previous attempt has not elapsed yet.
+    if (calculate_connection_timeout() > 0)
+        return;
+
+    // Continue with the next address of the range which is being tried, if there is one.
+    addrinfo *addr = nullptr;
+    if (m_current_connection)
+        addr = m_current_connection->next();
+
+    if (!addr) {
+        // TODO: Use round-robin instead.
+        size_t idx = rand() % m_non_connected.size();
+        const tcp_range &range = m_non_connected.at(idx);
+
+        m_current_connection = std::make_unique<connecting_context>(range);
+        addr = m_current_connection->next();
+        if (!addr) {
+            m_current_connection.reset();
+            report_connection_error(end_point(), "Can not resolve a single address from range: " + range.to_string());
+            ++m_failed_attempts;
+            return;
+        }
+    }
+
+    // Create a socket for connecting to server
+    int socket_fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+    if (SOCKET_ERROR == socket_fd) {
+        report_connection_error(
+            m_current_connection->current_address(), "Socket creation failed: " + get_last_socket_error_message());
+        return;
+    }
+
+    try_set_socket_options(socket_fd, macos_async_client::BUFFER_SIZE, true, true, true);
+    if (!set_non_blocking_mode(socket_fd, true)) {
+        // Build the message first: close() may overwrite errno.
+        std::string msg = "Can not make non-blocking socket: " + get_last_socket_error_message();
+
+        // No client owns the descriptor yet, so it has to be closed here to avoid leaking it.
+        ::close(socket_fd);
+
+        report_connection_error(m_current_connection->current_address(), std::move(msg));
+        return;
+    }
+
+    m_current_client = m_current_connection->to_client(socket_fd);
+    bool ok = m_current_client->start_monitoring(m_epoll);
+    if (!ok)
+        throw_last_system_error("Can not add file descriptor to epoll");
+
+    // Connect to server.
+    int res = connect(socket_fd, addr->ai_addr, addr->ai_addrlen);
+    if (SOCKET_ERROR == res) {
+        int last_error = errno;
+
+        clock_gettime(CLOCK_MONOTONIC, &m_last_connection_time);
+
+        // EWOULDBLOCK/EINPROGRESS mean that the non-blocking connect is still in progress;
+        // its outcome is picked up later by handle_connection_events().
+        if (last_error != EWOULDBLOCK && last_error != EINPROGRESS) {
+            handle_connection_failed("Failed to establish connection with the host: " + get_socket_error_message(last_error));
+            return;
+        }
+    }
+}
+
+// Waits for epoll events and dispatches them to the connecting client or to the client pool.
+void macos_async_worker_thread::handle_connection_events() {
+    enum { MAX_EVENTS = 16 };
+    epoll_event events[MAX_EVENTS];
+
+    // -1 blocks until an event arrives; otherwise the wait ends in time for the next connection attempt.
+    int timeout = calculate_connection_timeout();
+
+    int res = epoll_wait(m_epoll, events, MAX_EVENTS, timeout);
+
+    if (res <= 0)
+        return;
+
+    for (int i = 0; i < res; ++i) {
+        epoll_event &current_event = events[i];
+
+        // The stop event is registered with a null data pointer, so it is skipped here.
+        auto client = static_cast<macos_async_client *>(current_event.data.ptr);
+        if (!client)
+            continue;
+
+        // An event on the client which is still connecting tells whether the attempt succeeded.
+        if (client == m_current_client.get()) {
+            if (current_event.events & (EPOLLRDHUP | EPOLLERR)) {
+                handle_connection_failed("Can not establish connection");
+                continue;
+            }
+
+            handle_connection_success(client);
+        }
+
+        if (current_event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
+            handle_connection_closed(client);
+            continue;
+        }
+
+        if (current_event.events & EPOLLIN) {
+            // An empty message is treated as a closed connection.
+            auto msg = client->receive();
+            if (msg.empty()) {
+                handle_connection_closed(client);
+                continue;
+            }
+
+            m_client_pool.handle_nessage_received(client->id(), msg);
+        }
+
+        if (current_event.events & EPOLLOUT) {
+            bool ok = client->process_sent();
+            if (!ok) {
+                handle_connection_closed(client);
+                continue;
+            }
+
+            m_client_pool.handle_message_sent(client->id());
+        }
+    }
+}
+
+void macos_async_worker_thread::report_connection_error(const end_point &addr, std::string msg) {
+    // Wrap the text into an error with the NETWORK status and pass it to the client pool.
+    m_client_pool.handle_connection_error(addr, ignite_error(status_code::NETWORK, std::move(msg)));
+}
+
+void macos_async_worker_thread::handle_connection_failed(std::string msg) {
+ assert(m_current_client);
+
+ // Stop watching the pending client and close it before the failure is reported.
+ m_current_client->stop_monitoring();
+ m_current_client->close();
+
+ report_connection_error(m_current_client->address(), std::move(msg));
+
+ // Dropping the client allows a new attempt to be initiated; the failure counter feeds the retry delay.
+ m_current_client.reset();
+ ++m_failed_attempts;
+}
+
+void macos_async_worker_thread::handle_connection_closed(macos_async_client *client) {
+ client->stop_monitoring();
+
+ // Put the address range back to the candidates so that it can be connected to again.
+ m_non_connected.push_back(client->get_range());
+
+ // No error is passed to the pool here (std::nullopt).
+ m_client_pool.close_and_release(client->id(), std::nullopt);
+}
+
+// Hands the freshly connected client over to the pool and resets the connection attempt state.
+void macos_async_worker_thread::handle_connection_success(macos_async_client *client) {
+    // The range is not a candidate for new connections any more.
+    // The lookup result is checked because erase(end()) is undefined behavior.
+    auto range_it = std::find(m_non_connected.begin(), m_non_connected.end(), client->get_range());
+    if (range_it != m_non_connected.end())
+        m_non_connected.erase(range_it);
+
+    m_client_pool.add_client(std::move(m_current_client));
+
+    m_current_client.reset();
+    m_current_connection.reset();
+
+    // A success resets the delay between connection attempts.
+    m_failed_attempts = 0;
+
+    clock_gettime(CLOCK_MONOTONIC, &m_last_connection_time);
+}
+
+int macos_async_worker_thread::calculate_connection_timeout() const {
+    // Nothing to connect to right now: -1 is returned (used by the caller as the epoll_wait() timeout).
+    if (!should_initiate_new_connection())
+        return -1;
+
+    // No timestamp has been recorded yet, so an attempt may start immediately.
+    if (m_last_connection_time.tv_sec == 0)
+        return 0;
+
+    // The delay depends on the number of failed attempts.
+    const int delay_ms = int(fibonacci10.get_value(m_failed_attempts) * 1000);
+
+    timespec current{};
+    clock_gettime(CLOCK_MONOTONIC, &current);
+
+    const auto sec_diff = current.tv_sec - m_last_connection_time.tv_sec;
+    const auto nsec_diff = current.tv_nsec - m_last_connection_time.tv_nsec;
+    const int elapsed_ms = int(sec_diff * 1000 + nsec_diff / 1000000);
+
+    // Never report a negative remainder.
+    const int remaining_ms = delay_ms - elapsed_ms;
+    return remaining_ms < 0 ? 0 : remaining_ms;
+}
+
+bool macos_async_worker_thread::should_initiate_new_connection() const {
+    // One attempt at a time, and only while more than m_min_addrs addresses are not connected.
+    const bool attempt_in_progress = static_cast<bool>(m_current_client);
+    return !attempt_in_progress && m_non_connected.size() > m_min_addrs;
+}
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/macos_async_worker_thread.h b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_worker_thread.h
new file mode 100644
index 0000000000..cbe9d649fa
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/macos_async_worker_thread.h
@@ -0,0 +1,158 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma once
+
+#include "connecting_context.h"
+#include "macos_async_client.h"
+
+#include <ignite/network/async_handler.h>
+#include <ignite/network/end_point.h>
+#include <ignite/network/tcp_range.h>
+
+#include <atomic>
+#include <cstdint>
+#include <ctime>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+namespace ignite::network::detail {
+
+class macos_async_client_pool;
+
+/**
+ * Async pool working thread.
+ *
+ * Owns the epoll instance and the thread which establishes connections and dispatches
+ * connection events to the client pool.
+ */
+class macos_async_worker_thread {
+public:
+    /**
+     * Constructor.
+     *
+     * @param client_pool Client pool which is notified about connection events.
+     */
+    explicit macos_async_worker_thread(macos_async_client_pool &client_pool);
+
+    /**
+     * Destructor. Stops the thread if it is running.
+     */
+    ~macos_async_worker_thread();
+
+    /**
+     * Start worker thread.
+     *
+     * @param limit Connection limit.
+     * @param addrs Addresses to connect to.
+     */
+    void start(size_t limit, std::vector<tcp_range> addrs);
+
+    /**
+     * Stop thread.
+     */
+    void stop();
+
+private:
+    /**
+     * Run thread.
+     */
+    void run();
+
+    /**
+     * Initiate new connection process if needed.
+     */
+    void handle_new_connections();
+
+    /**
+     * Handle epoll events.
+     */
+    void handle_connection_events();
+
+    /**
+     * Handle network error during connection establishment.
+     *
+     * @param addr End point.
+     * @param msg Error message.
+     */
+    void report_connection_error(const end_point &addr, std::string msg);
+
+    /**
+     * Handle failed connection.
+     *
+     * @param msg Error message.
+     */
+    void handle_connection_failed(std::string msg);
+
+    /**
+     * Handle network error on established connection.
+     *
+     * @param client Client instance.
+     */
+    void handle_connection_closed(macos_async_client *client);
+
+    /**
+     * Handle successfully established connection.
+     *
+     * @param client Client instance.
+     */
+    void handle_connection_success(macos_async_client *client);
+
+    /**
+     * Calculate connection timeout.
+     *
+     * @return Connection timeout.
+     */
+    [[nodiscard]] int calculate_connection_timeout() const;
+
+    /**
+     * Check whether new connection should be initiated.
+     *
+     * @return @c true if new connection should be initiated.
+     */
+    [[nodiscard]] bool should_initiate_new_connection() const;
+
+    /** Client pool. */
+    macos_async_client_pool &m_client_pool;
+
+    /**
+     * Flag indicating that thread is stopping.
+     * Atomic, because it is written by stop() and read by the worker thread.
+     */
+    std::atomic<bool> m_stopping;
+
+    /** Client epoll file descriptor. */
+    int m_epoll;
+
+    /** Stop event file descriptor. */
+    int m_stop_event;
+
+    /** Addresses to use for connection establishment. */
+    std::vector<tcp_range> m_non_connected;
+
+    /** Connection which is currently in connecting process. */
+    std::unique_ptr<connecting_context> m_current_connection;
+
+    /** Client for which a connection is currently being established. */
+    std::shared_ptr<macos_async_client> m_current_client;
+
+    /** Failed connection attempts. */
+    size_t m_failed_attempts;
+
+    /** Last connection time. */
+    timespec m_last_connection_time;
+
+    /** Minimal number of addresses. */
+    size_t m_min_addrs;
+
+    /** Thread. */
+    std::thread m_thread;
+};
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/sockets.cpp b/modules/platforms/cpp/ignite/network/detail/macos/sockets.cpp
new file mode 100644
index 0000000000..99d0703bc4
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/sockets.cpp
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "sockets.h"
+
+#include <cerrno>
+#include <cstring>
+#include <sstream>
+
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+
+namespace ignite::network::detail {
+
+std::string get_socket_error_message(int error) {
+    std::stringstream text;
+    text << "error_code=" << error;
+
+    // Zero is not an error, so there is nothing to describe.
+    if (error != 0) {
+        char description[1024] = {0};
+
+        // Zero means success. Any failure (ERANGE for a too small buffer, EINVAL for an
+        // unknown code, or anything else) leaves only the numeric code in the message.
+        const int status = strerror_r(error, description, sizeof(description));
+        if (status == 0)
+            text << ", msg=" << description;
+    }
+
+    return text.str();
+}
+
+std::string get_last_socket_error_message() {
+    // Describe the current value of errno.
+    return get_socket_error_message(errno);
+}
+
+void try_set_socket_options(int socket_fd, int buf_size, bool no_delay, bool out_of_band, bool keep_alive) {
+    // The time in seconds the connection needs to remain idle before starts sending keepalive probes.
+    constexpr int keep_alive_idle_time = 60;
+
+    // The time in seconds between individual keepalive probes.
+    constexpr int keep_alive_probes_period = 1;
+
+    // Every option here is an int. Results are ignored by the callers below, except for SO_KEEPALIVE.
+    auto set_int_option = [socket_fd](int level, int option, int value) {
+        return setsockopt(socket_fd, level, option, &value, sizeof(value));
+    };
+
+    set_int_option(SOL_SOCKET, SO_SNDBUF, buf_size);
+    set_int_option(SOL_SOCKET, SO_RCVBUF, buf_size);
+    set_int_option(IPPROTO_TCP, TCP_NODELAY, no_delay ? 1 : 0);
+    set_int_option(SOL_SOCKET, SO_OOBINLINE, out_of_band ? 1 : 0);
+
+    // There is no sense in configuring keep alive params if we failed to set up keep alive mode.
+    if (SOCKET_ERROR == set_int_option(SOL_SOCKET, SO_KEEPALIVE, keep_alive ? 1 : 0))
+        return;
+
+    // The idle time option has a different name on Apple platforms.
+#ifdef __APPLE__
+    set_int_option(IPPROTO_TCP, TCP_KEEPALIVE, keep_alive_idle_time);
+#else
+    set_int_option(IPPROTO_TCP, TCP_KEEPIDLE, keep_alive_idle_time);
+#endif
+
+    set_int_option(IPPROTO_TCP, TCP_KEEPINTVL, keep_alive_probes_period);
+}
+
+bool set_non_blocking_mode(int socket_fd, bool non_blocking) {
+    const int current_flags = fcntl(socket_fd, F_GETFL, 0);
+    if (current_flags == -1)
+        return false;
+
+    const int wanted_flags = non_blocking ? (current_flags | O_NONBLOCK) : (current_flags & ~O_NONBLOCK);
+
+    // The descriptor is already in the requested mode, nothing to change.
+    if (wanted_flags == current_flags)
+        return true;
+
+    return fcntl(socket_fd, F_SETFL, wanted_flags) != -1;
+}
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/macos/sockets.h b/modules/platforms/cpp/ignite/network/detail/macos/sockets.h
new file mode 100644
index 0000000000..64380b56b9
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/macos/sockets.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+
+#define SOCKET_ERROR (-1)
+
+namespace ignite::network::detail {
+
+/**
+ * Get socket error message for the error code.
+ * @param error Error code.
+ * @return Socket error message string.
+ */
+std::string get_socket_error_message(int error);
+
+/**
+ * Get last socket error message.
+ * @return Last socket error message string.
+ */
+std::string get_last_socket_error_message();
+
+/**
+ * Try and set socket options.
+ *
+ * @param socket_fd Socket file descriptor.
+ * @param buf_size Buffer size.
+ * @param no_delay Set no-delay mode.
+ * @param out_of_band Set out-of-Band mode.
+ * @param keep_alive Keep alive mode.
+ */
+void try_set_socket_options(int socket_fd, int buf_size, bool no_delay, bool out_of_band, bool keep_alive);
+
+/**
+ * Set non blocking mode for socket.
+ *
+ * @param socket_fd Socket file descriptor.
+ * @param non_blocking Non-blocking mode.
+ * @return @c true on success, @c false if the socket flags could not be read or changed.
+ */
+bool set_non_blocking_mode(int socket_fd, bool non_blocking);
+
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/network.cpp b/modules/platforms/cpp/ignite/network/detail/macos/utils.cpp
similarity index 53%
copy from modules/platforms/cpp/ignite/network/network.cpp
copy to modules/platforms/cpp/ignite/network/detail/macos/utils.cpp
index 342d78f57c..ebfdf112f6 100644
--- a/modules/platforms/cpp/ignite/network/network.cpp
+++ b/modules/platforms/cpp/ignite/network/detail/macos/utils.cpp
@@ -15,25 +15,36 @@
* limitations under the License.
*/
-#include "network.h"
+#include "../utils.h"
-#include "async_client_pool_adapter.h"
+#include <cstring>
-#include <ignite/common/config.h>
+namespace ignite::network::detail {
-#ifdef _WIN32
-# include "detail/win/win_async_client_pool.h"
-#else // Other. Assume Linux
-# include "detail/linux/linux_async_client_pool.h"
-#endif
+std::string get_last_system_error() {
+ int error_code = errno;
-namespace ignite::network {
+ std::string error_details;
+ if (error_code != 0) {
+ char err_buf[1024] = {0};
-std::shared_ptr<async_client_pool> make_async_client_pool(data_filters filters) {
- auto pool =
- std::make_shared<IGNITE_SWITCH_WIN_OTHER(detail::win_async_client_pool, detail::linux_async_client_pool)>();
+ const int res = strerror_r(error_code, err_buf, sizeof(err_buf));
- return std::make_shared<async_client_pool_adapter>(std::move(filters), std::move(pool));
+ switch (res) {
+ case 0:
+ error_details.assign(err_buf);
+ break;
+ case ERANGE:
+ // Buffer too small.
+ break;
+ default:
+ case EINVAL:
+ // Invalid error code.
+ break;
+ }
+ }
+
+ return error_details;
}
-} // namespace ignite::network
+} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/length_prefix_codec.cpp b/modules/platforms/cpp/ignite/network/length_prefix_codec.cpp
index 21267702c8..64e779f4a5 100644
--- a/modules/platforms/cpp/ignite/network/length_prefix_codec.cpp
+++ b/modules/platforms/cpp/ignite/network/length_prefix_codec.cpp
@@ -31,7 +31,7 @@ length_prefix_codec::length_prefix_codec()
data_buffer_owning length_prefix_codec::encode(data_buffer_owning &data) {
// Just pass data as is, because we encode message size in
// the application to avoid unnecessary re-allocations and copying.
- return std::move(data.consume_entirely());
+ return data.consume_entirely();
}
void length_prefix_codec::reset_buffer() {
@@ -62,7 +62,7 @@ data_buffer_ref length_prefix_codec::decode(data_buffer_ref &data) {
if (m_packet.size() < PACKET_HEADER_SIZE)
return {};
- m_packet_size = bytes::load<endian::LITTLE, int32_t>(m_packet.data());
+ m_packet_size = bytes::load<endian::BIG, int32_t>(m_packet.data());
}
consume(data, m_packet_size + PACKET_HEADER_SIZE);
diff --git a/modules/platforms/cpp/ignite/network/network.cpp b/modules/platforms/cpp/ignite/network/network.cpp
index 342d78f57c..8c5e88892d 100644
--- a/modules/platforms/cpp/ignite/network/network.cpp
+++ b/modules/platforms/cpp/ignite/network/network.cpp
@@ -23,15 +23,20 @@
#ifdef _WIN32
# include "detail/win/win_async_client_pool.h"
-#else // Other. Assume Linux
+#elif __linux__
# include "detail/linux/linux_async_client_pool.h"
+#elif __APPLE__
+# include "detail/macos/macos_async_client_pool.h"
#endif
namespace ignite::network {
std::shared_ptr<async_client_pool> make_async_client_pool(data_filters filters) {
- auto pool =
- std::make_shared<IGNITE_SWITCH_WIN_OTHER(detail::win_async_client_pool, detail::linux_async_client_pool)>();
+ auto pool = std::make_shared<IGNITE_OS_SWITCH(
+ detail::win_async_client_pool,
+ detail::linux_async_client_pool,
+ detail::macos_async_client_pool
+ )>();
return std::make_shared<async_client_pool_adapter>(std::move(filters), std::move(pool));
}
diff --git a/modules/platforms/cpp/ignite/protocol/buffer_adapter.cpp b/modules/platforms/cpp/ignite/protocol/buffer_adapter.cpp
index 270467bdec..5d534b946f 100644
--- a/modules/platforms/cpp/ignite/protocol/buffer_adapter.cpp
+++ b/modules/platforms/cpp/ignite/protocol/buffer_adapter.cpp
@@ -29,7 +29,7 @@ void buffer_adapter::write_length_header() {
auto length = std::int32_t(m_buffer.size() - (m_length_pos + LENGTH_HEADER_SIZE));
- bytes::store<endian::LITTLE, int32_t>(m_buffer.data() + m_length_pos, length);
+ bytes::store<endian::BIG, int32_t>(m_buffer.data() + m_length_pos, length);
}
} // namespace ignite::protocol
diff --git a/modules/platforms/cpp/tests/client-test/CMakeLists.txt b/modules/platforms/cpp/tests/client-test/CMakeLists.txt
index 04cc116801..bee8332b66 100644
--- a/modules/platforms/cpp/tests/client-test/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/client-test/CMakeLists.txt
@@ -28,7 +28,7 @@ set(SOURCES
)
add_executable(${TARGET} ${SOURCES})
-target_link_libraries(${TARGET} ignite-test-common ignite-client GTest::gtest)
+target_link_libraries(${TARGET} ignite-test-common ignite-client GTest::GTest)
set(TEST_TARGET IgniteClientTest)
add_test(NAME ${TEST_TARGET} COMMAND ${TARGET})
diff --git a/modules/platforms/cpp/tests/client-test/main.cpp b/modules/platforms/cpp/tests/client-test/main.cpp
index fbe3a35086..b40c7fa94d 100644
--- a/modules/platforms/cpp/tests/client-test/main.cpp
+++ b/modules/platforms/cpp/tests/client-test/main.cpp
@@ -24,6 +24,18 @@
#include <chrono>
#include <thread>
+namespace {
+/** Shutdown handler that cleans up resources. Empty until it is assigned. */
+std::function<void(int)> shutdown_handler;
+
+/**
+ * Receives OS signal and handles it.
+ *
+ * @param signal Signal value.
+ */
+void signal_handler(int signal) {
+    // The handler may still be empty when a signal arrives; calling an empty
+    // std::function would throw std::bad_function_call.
+    if (shutdown_handler)
+        shutdown_handler(signal);
+}
+} // anonymous namespace
+
/**
* Run prior to any other tests.
*/
@@ -40,9 +52,23 @@ void before_all() {
}
int main(int argc, char **argv) {
+ // Install signal handlers to clean up resources on early exit.
+ signal(SIGABRT, signal_handler);
+ signal(SIGINT, signal_handler);
+
+ ignite::IgniteRunner runner;
+
+ shutdown_handler = [&](int signal) {
+ std::cout << "Caught signal " << signal << " during tests" << std::endl;
+
+ runner.stop();
+
+ std::cout << "Resources cleanup OK" << std::endl;
+ };
+
int res = 0;
before_all();
- ignite::IgniteRunner runner;
+
try {
runner.start(false);
diff --git a/modules/platforms/cpp/tests/test-common/detail/linux_process.h b/modules/platforms/cpp/tests/test-common/detail/linux_process.h
index 4f85b94b3f..f0ba3fa4ab 100644
--- a/modules/platforms/cpp/tests/test-common/detail/linux_process.h
+++ b/modules/platforms/cpp/tests/test-common/detail/linux_process.h
@@ -20,6 +20,7 @@
// It's OK that this code is entirely in header as it only supposed to be included from a single file.
#include "cmd_process.h"
+#include "unix_process.h"
#include <chrono>
#include <iostream>
@@ -35,7 +36,7 @@ namespace ignite::detail {
/**
* Implementation of CmdProcess for Windows.
*/
-class LinuxProcess : public ignite::CmdProcess {
+class LinuxProcess : public ignite::detail::UnixProcess {
public:
/**
* Constructor.
@@ -45,95 +46,14 @@ public:
* @param workDir Working directory.
*/
LinuxProcess(std::string command, std::vector<std::string> args, std::string workDir)
- : m_running(false)
- , m_command(std::move(command))
- , m_args(std::move(args))
- , m_workDir(std::move(workDir)) { }
+ : ignite::detail::UnixProcess(std::move(command), std::move(args), std::move(workDir)) { }
- /**
- * Destructor.
- */
- ~LinuxProcess() override { kill(); }
-
- /**
- * Start process.
- */
- bool start() final {
- if (m_running)
- return false;
-
- m_pid = fork();
-
- if (!m_pid) {
- // Setting the group ID to be killed easily.
- int res = setpgid(0, 0);
- if (res) {
- std::cout << "Failed set group ID of the forked process: " + std::to_string(res) << std::endl;
- exit(1);
- }
-
- // Route for the forked process.
- res = chdir(m_workDir.c_str());
- if (res) {
- std::cout << "Failed to change directory of the forked process: " + std::to_string(res) << std::endl;
- exit(1);
- }
-
- std::vector<const char *> args;
- args.push_back(m_command.c_str());
-
- for (auto &arg : m_args) {
- args.push_back(arg.c_str());
- }
-
- args.push_back(nullptr);
-
- res = execvp(m_command.c_str(), const_cast<char *const *>(args.data()));
-
- // On success this code should never be reached because the process get replaced by a new one.
- std::cout << "Failed to execute process: " + std::to_string(res) << std::endl;
- exit(1);
- }
-
- m_running = true;
- return true;
- }
-
- /**
- * Kill the process.
- */
void kill() final {
if (!m_running)
return;
::kill(-m_pid, SIGTERM);
}
-
- /**
- * Join process.
- *
- * @param timeout Timeout.
- */
- void join(std::chrono::milliseconds) final {
- // Ignoring timeout in Linux...
- ::waitpid(m_pid, nullptr, 0);
- }
-
-private:
- /** Running flag. */
- bool m_running;
-
- /** Process ID. */
- int m_pid{0};
-
- /** Command. */
- const std::string m_command;
-
- /** Arguments. */
- const std::vector<std::string> m_args;
-
- /** Working directory. */
- const std::string m_workDir;
};
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/tests/test-common/detail/macos_process.h b/modules/platforms/cpp/tests/test-common/detail/macos_process.h
new file mode 100644
index 0000000000..8fb7600fda
--- /dev/null
+++ b/modules/platforms/cpp/tests/test-common/detail/macos_process.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+// It's OK that this code is entirely in header as it only supposed to be included from a single file.
+
+#include "cmd_process.h"
+#include "unix_process.h"
+
+#include <chrono>
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include <csignal>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+namespace ignite::detail {
+
+/**
+ * Implementation of CmdProcess for macOS.
+ */
+class MacosProcess : public ignite::detail::UnixProcess {
+public:
+    /**
+     * Constructor.
+     *
+     * @param command Command.
+     * @param args Arguments.
+     * @param workDir Working directory.
+     */
+    MacosProcess(std::string command, std::vector<std::string> args, std::string workDir)
+        : ignite::detail::UnixProcess(std::move(command), std::move(args), std::move(workDir)) { }
+
+    /**
+     * Destructor.
+     */
+    ~MacosProcess() override = default;
+
+    /**
+     * Kill the process.
+     */
+    void kill() final {
+        if (!m_running)
+            return;
+
+        // A negative PID makes kill() signal the whole process group.
+        ::kill(-m_pid, SIGTERM);
+    }
+};
+
+} // namespace ignite::detail
diff --git a/modules/platforms/cpp/tests/test-common/detail/linux_process.h b/modules/platforms/cpp/tests/test-common/detail/unix_process.h
similarity index 67%
copy from modules/platforms/cpp/tests/test-common/detail/linux_process.h
copy to modules/platforms/cpp/tests/test-common/detail/unix_process.h
index 4f85b94b3f..2ecca8a2ce 100644
--- a/modules/platforms/cpp/tests/test-common/detail/linux_process.h
+++ b/modules/platforms/cpp/tests/test-common/detail/unix_process.h
@@ -1,19 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
#pragma once
@@ -35,7 +35,7 @@ namespace ignite::detail {
/**
* Implementation of CmdProcess for Windows.
*/
-class LinuxProcess : public ignite::CmdProcess {
+class UnixProcess : public ignite::CmdProcess {
public:
/**
* Constructor.
@@ -44,17 +44,12 @@ public:
* @param args Arguments.
* @param workDir Working directory.
*/
- LinuxProcess(std::string command, std::vector<std::string> args, std::string workDir)
+ UnixProcess(std::string command, std::vector<std::string> args, std::string workDir)
: m_running(false)
, m_command(std::move(command))
, m_args(std::move(args))
, m_workDir(std::move(workDir)) { }
- /**
- * Destructor.
- */
- ~LinuxProcess() override { kill(); }
-
/**
* Start process.
*/
@@ -99,16 +94,6 @@ public:
return true;
}
- /**
- * Kill the process.
- */
- void kill() final {
- if (!m_running)
- return;
-
- ::kill(-m_pid, SIGTERM);
- }
-
/**
* Join process.
*
@@ -119,7 +104,7 @@ public:
::waitpid(m_pid, nullptr, 0);
}
-private:
+protected:
/** Running flag. */
bool m_running;
diff --git a/modules/platforms/cpp/tests/test-common/ignite_runner.cpp b/modules/platforms/cpp/tests/test-common/ignite_runner.cpp
index 81bdb299ea..69715f28da 100644
--- a/modules/platforms/cpp/tests/test-common/ignite_runner.cpp
+++ b/modules/platforms/cpp/tests/test-common/ignite_runner.cpp
@@ -30,8 +30,8 @@ namespace {
/**
* System shell command string.
*/
-constexpr std::string_view SYSTEM_SHELL = IGNITE_SWITCH_WIN_OTHER("cmd.exe", "/bin/sh");
-constexpr std::string_view SYSTEM_SHELL_ARG_0 = IGNITE_SWITCH_WIN_OTHER("/c ", "-c");
+constexpr std::string_view SYSTEM_SHELL = IGNITE_OS_SWITCH("cmd.exe", "/bin/sh", "/bin/bash");
+constexpr std::string_view SYSTEM_SHELL_ARG_0 = IGNITE_OS_SWITCH("/c ", "-c", "-c");
} // anonymous namespace
diff --git a/modules/platforms/cpp/tests/test-common/process.cpp b/modules/platforms/cpp/tests/test-common/process.cpp
index 9c2f9e6e39..c8a2aa5f93 100644
--- a/modules/platforms/cpp/tests/test-common/process.cpp
+++ b/modules/platforms/cpp/tests/test-common/process.cpp
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-#ifdef WIN32
+#ifdef _WIN32
# include "detail/win_process.h"
-#else
+#elif __linux__
# include "detail/linux_process.h"
+#elif __APPLE__
+# include "detail/macos_process.h"
#endif
#include "cmd_process.h"
@@ -30,11 +32,14 @@
namespace ignite {
std::unique_ptr<CmdProcess> CmdProcess::make(std::string command, std::vector<std::string> args, std::string workDir) {
-#ifdef WIN32
+#ifdef _WIN32
return std::unique_ptr<CmdProcess>(new detail::WinProcess(std::move(command), std::move(args), std::move(workDir)));
-#else
+#elif __linux__
return std::unique_ptr<CmdProcess>(
new detail::LinuxProcess(std::move(command), std::move(args), std::move(workDir)));
+#elif __APPLE__
+ return std::unique_ptr<CmdProcess>(
+ new detail::MacosProcess(std::move(command), std::move(args), std::move(workDir)));
#endif
}