You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2022/01/10 21:48:23 UTC
[ignite] branch master updated: IGNITE-15864 CPP Thin: asynchronous network events
This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e18bbbe IGNITE-15864 CPP Thin: asynchronous network events
e18bbbe is described below
commit e18bbbedfa23f4a4c7bcd1f4c48fa881411e5653
Author: Igor Sapego <is...@apache.org>
AuthorDate: Tue Jan 11 00:46:01 2022 +0300
IGNITE-15864 CPP Thin: asynchronous network events
This closes #9713
---
.../include/ignite/impl/binary/binary_utils.h | 4 +-
.../ignite/impl/interop/interop_input_stream.h | 16 +-
.../include/ignite/impl/interop/interop_memory.h | 6 +-
.../cpp/binary/src/impl/binary/binary_utils.cpp | 14 +-
.../src/impl/interop/interop_input_stream.cpp | 30 +-
.../cpp/binary/src/impl/interop/interop_memory.cpp | 2 +-
.../include/ignite/common/factory.h} | 38 +-
.../cpp/common/include/ignite/common/promise.h | 92 +++++
.../common/include/ignite/common/shared_state.h | 174 +++++++++
.../cpp/common/include/ignite/common/utils.h | 53 ++-
.../platforms/cpp/common/include/ignite/future.h | 125 +++++++
.../os/linux/include/ignite/common/concurrent_os.h | 43 +++
.../common/os/linux/src/common/concurrent_os.cpp | 33 ++
.../os/win/include/ignite/common/concurrent_os.h | 43 +++
.../cpp/common/os/win/src/common/concurrent_os.cpp | 32 ++
modules/platforms/cpp/common/src/common/utils.cpp | 17 +
modules/platforms/cpp/network/CMakeLists.txt | 29 +-
.../include/ignite/network/async_client_pool.h | 81 ++++
.../network/include/ignite/network/async_handler.h | 87 +++++
.../cpp/network/include/ignite/network/codec.h | 79 ++++
.../include/ignite/network/codec_data_filter.h | 112 ++++++
.../network/include/ignite/network/data_buffer.h | 139 +++++++
.../network/include/ignite/network/data_filter.h | 105 ++++++
.../include/ignite/network/data_filter_adapter.h | 147 ++++++++
.../cpp/network/include/ignite/network/data_sink.h | 65 ++++
.../cpp/network/include/ignite/network/end_point.h | 15 +
.../include/ignite/network/length_prefix_codec.h | 115 ++++++
.../cpp/network/include/ignite/network/network.h | 22 +-
.../network/include/ignite/network/socket_client.h | 16 +-
.../ignite/network/ssl/secure_data_filter.h | 261 +++++++++++++
.../cpp/network/include/ignite/network/tcp_range.h | 34 +-
.../cpp/network/include/ignite/network/utils.h | 11 +
.../os/linux/src/network/connecting_context.cpp | 110 ++++++
.../os/linux/src/network/connecting_context.h | 96 +++++
.../os/linux/src/network/linux_async_client.cpp | 200 ++++++++++
.../os/linux/src/network/linux_async_client.h | 249 +++++++++++++
.../linux/src/network/linux_async_client_pool.cpp | 205 ++++++++++
.../os/linux/src/network/linux_async_client_pool.h | 205 ++++++++++
.../src/network/linux_async_worker_thread.cpp | 356 ++++++++++++++++++
.../linux/src/network/linux_async_worker_thread.h | 178 +++++++++
.../cpp/network/os/linux/src/network/sockets.cpp | 61 +++
.../cpp/network/os/linux/src/network/sockets.h | 21 +-
.../os/linux/src/network/tcp_socket_client.cpp | 48 +--
.../cpp/network/os/win/src/network/sockets.cpp | 98 ++++-
.../cpp/network/os/win/src/network/sockets.h | 26 ++
.../os/win/src/network/tcp_socket_client.cpp | 91 +----
.../os/win/src/network/win_async_client.cpp | 187 ++++++++++
.../network/os/win/src/network/win_async_client.h | 279 ++++++++++++++
.../os/win/src/network/win_async_client_pool.cpp | 248 +++++++++++++
.../os/win/src/network/win_async_client_pool.h | 218 +++++++++++
.../src/network/win_async_connecting_thread.cpp | 252 +++++++++++++
.../win/src/network/win_async_connecting_thread.h | 127 +++++++
.../os/win/src/network/win_async_worker_thread.cpp | 142 +++++++
.../os/win/src/network/win_async_worker_thread.h | 76 ++++
.../src/network/async_client_pool_adapter.cpp | 75 ++++
.../src/network/async_client_pool_adapter.h | 111 ++++++
.../cpp/network/src/network/codec_data_filter.cpp | 108 ++++++
.../cpp/network/src/network/data_buffer.cpp | 126 +++++++
.../network/src/network/error_handling_filter.cpp | 86 +++++
.../network/src/network/error_handling_filter.h | 83 +++++
.../network/src/network/length_prefix_codec.cpp | 94 +++++
.../platforms/cpp/network/src/network/network.cpp | 30 +-
.../network/src/network/ssl/secure_data_filter.cpp | 411 +++++++++++++++++++++
.../src/network/ssl/secure_socket_client.cpp | 11 +-
.../cpp/network/src/network/ssl/ssl_gateway.cpp | 102 +++++
.../cpp/network/src/network/ssl/ssl_gateway.h | 25 +-
.../cpp/network/src/network/tcp_socket_client.h | 11 -
.../cpp/odbc/include/ignite/odbc/connection.h | 2 +-
.../cpp/odbc/include/ignite/odbc/utility.h | 8 -
modules/platforms/cpp/odbc/src/connection.cpp | 8 +-
modules/platforms/cpp/odbc/src/utility.cpp | 15 -
.../cpp/thin-client-test/src/cache_client_test.cpp | 40 +-
.../thin-client-test/src/ignite_client_test.cpp | 56 ++-
.../cpp/thin-client-test/src/ssl_test.cpp | 80 +++-
.../cpp/thin-client-test/src/test_utils.cpp | 2 +-
.../ignite/thin/ignite_client_configuration.h | 39 +-
.../src/impl/cache/cache_client_impl.cpp | 10 +-
.../thin-client/src/impl/cache/cache_client_impl.h | 6 +-
.../thin-client/src/impl/cache/query/cursor_page.h | 5 +-
...ute_client_impl.cpp => channel_state_handler.h} | 43 ++-
.../src/impl/compute/compute_client_impl.cpp | 81 +++-
.../cpp/thin-client/src/impl/data_channel.cpp | 384 ++++++++-----------
.../cpp/thin-client/src/impl/data_channel.h | 324 ++++++----------
.../cpp/thin-client/src/impl/data_router.cpp | 373 +++++++++++++------
.../cpp/thin-client/src/impl/data_router.h | 263 ++++++-------
.../thin-client/src/impl/ignite_client_impl.cpp | 2 +-
.../platforms/cpp/thin-client/src/impl/message.cpp | 18 -
.../platforms/cpp/thin-client/src/impl/message.h | 109 +++++-
.../thin-client/src/impl/notification_handler.h | 144 ++++++++
.../src/impl/transactions/transaction_impl.cpp | 2 +-
.../src/impl/transactions/transaction_impl.h | 2 +-
91 files changed, 7750 insertions(+), 1052 deletions(-)
diff --git a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h
index 58e5819..ce79baf 100644
--- a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h
+++ b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h
@@ -250,7 +250,7 @@ namespace ignite
* @param pos Position in memory.
* @return Value.
*/
- static int32_t ReadInt32(interop::InteropMemory& mem, int32_t pos);
+ static int32_t ReadInt32(const interop::InteropMemory& mem, int32_t pos);
/**
* Utility method to read signed 32-bit integer from memory.
@@ -260,7 +260,7 @@ namespace ignite
* @param pos Position in memory.
* @return Value.
*/
- static int32_t UnsafeReadInt32(interop::InteropMemory& mem, int32_t pos);
+ static int32_t UnsafeReadInt32(const interop::InteropMemory& mem, int32_t pos);
/**
* Utility method to write signed 32-bit integer to stream.
diff --git a/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_input_stream.h b/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_input_stream.h
index 3e14171..ff9a554 100644
--- a/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_input_stream.h
+++ b/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_input_stream.h
@@ -36,7 +36,15 @@ namespace ignite
*
* @param mem Memory.
*/
- explicit InteropInputStream(InteropMemory* mem);
+ explicit InteropInputStream(const InteropMemory* mem);
+
+ /**
+ * Constructor.
+ *
+ * @param mem Memory.
+ * @param len Length. Should be <= mem->Length().
+ */
+ explicit InteropInputStream(const InteropMemory* mem, int32_t len);
/**
* Read signed 8-byte int.
@@ -219,17 +227,17 @@ namespace ignite
* Get memory.
* @return Underlying memory.
*/
- InteropMemory* GetMemory()
+ const InteropMemory* GetMemory()
{
return mem;
}
private:
/** Memory. */
- InteropMemory* mem;
+ const InteropMemory* mem;
/** Pointer to data. */
- int8_t* data;
+ const int8_t* data;
/** Length. */
int len;
diff --git a/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_memory.h b/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_memory.h
index d5c3d60..5982bb4 100644
--- a/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_memory.h
+++ b/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_memory.h
@@ -21,6 +21,7 @@
#include <stdint.h>
#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
namespace ignite
{
@@ -205,7 +206,7 @@ namespace ignite
/**
* Get cross-platform pointer in long form.
*/
- int64_t PointerLong();
+ int64_t PointerLong() const;
/**
* Get raw data pointer.
@@ -260,6 +261,9 @@ namespace ignite
int8_t* memPtr;
};
+ typedef common::concurrent::SharedPointer<interop::InteropMemory> SP_InteropMemory;
+ typedef common::concurrent::SharedPointer<const interop::InteropMemory> SP_ConstInteropMemory;
+
/**
* Interop unpooled memory.
*/
diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_utils.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_utils.cpp
index 0b431ac..9afcdab 100644
--- a/modules/platforms/cpp/binary/src/impl/binary/binary_utils.cpp
+++ b/modules/platforms/cpp/binary/src/impl/binary/binary_utils.cpp
@@ -36,7 +36,7 @@ namespace
* @param pos Position.
* @param len Data to read.
*/
- inline void CheckEnoughData(InteropMemory& mem, int32_t pos, int32_t len)
+ inline void CheckEnoughData(const InteropMemory& mem, int32_t pos, int32_t len)
{
if (mem.Length() < (pos + len))
{
@@ -55,11 +55,11 @@ namespace
* @return Primitive.
*/
template<typename T>
- inline T ReadPrimitive(InteropMemory& mem, int32_t pos)
+ inline T ReadPrimitive(const InteropMemory& mem, int32_t pos)
{
CheckEnoughData(mem, pos, sizeof(T));
- return *reinterpret_cast<T*>(mem.Data() + pos);
+ return *reinterpret_cast<const T*>(mem.Data() + pos);
}
/**
@@ -71,9 +71,9 @@ namespace
* @return Primitive.
*/
template<typename T>
- inline T UnsafeReadPrimitive(InteropMemory& mem, int32_t pos)
+ inline T UnsafeReadPrimitive(const InteropMemory& mem, int32_t pos)
{
- return *reinterpret_cast<T*>(mem.Data() + pos);
+ return *reinterpret_cast<const T*>(mem.Data() + pos);
}
}
@@ -204,12 +204,12 @@ namespace ignite
return stream->ReadInt32();
}
- int32_t BinaryUtils::ReadInt32(InteropMemory& mem, int32_t pos)
+ int32_t BinaryUtils::ReadInt32(const InteropMemory& mem, int32_t pos)
{
return ReadPrimitive<int32_t>(mem, pos);
}
- int32_t BinaryUtils::UnsafeReadInt32(InteropMemory& mem, int32_t pos)
+ int32_t BinaryUtils::UnsafeReadInt32(const InteropMemory& mem, int32_t pos)
{
return UnsafeReadPrimitive<int32_t>(mem, pos);
}
diff --git a/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp b/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp
index 907c840..794f7c7 100644
--- a/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp
+++ b/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp
@@ -26,7 +26,7 @@
*/
#define IGNITE_INTEROP_IN_READ(type, len) { \
EnsureEnoughData(len); \
- type res = *reinterpret_cast<type*>(data + pos); \
+ type res = *reinterpret_cast<const type*>(data + pos); \
Shift(len); \
return res; \
}
@@ -44,13 +44,25 @@ namespace ignite
{
namespace interop
{
- InteropInputStream::InteropInputStream(InteropMemory* mem)
+ InteropInputStream::InteropInputStream(const InteropMemory* mem) :
+ mem(mem),
+ data(mem->Data()),
+ len(mem->Length()),
+ pos(0)
{
- this->mem = mem;
+ // No-op.
+ }
- data = mem->Data();
- len = mem->Length();
- pos = 0;
+ InteropInputStream::InteropInputStream(const InteropMemory *mem, int32_t len) :
+ mem(mem),
+ data(mem->Data()),
+ len(len),
+ pos(0)
+ {
+ if (len > mem->Length())
+ IGNITE_ERROR_FORMATTED_3(IgniteError::IGNITE_ERR_MEMORY,
+ "Requested input stream len is greater than memories length",
+ "memPtr", mem->PointerLong(), "len", len, "memLen", mem->Length());
}
int8_t InteropInputStream::ReadInt8()
@@ -65,7 +77,7 @@ namespace ignite
if (delta > 0)
EnsureEnoughData(delta);
- return *reinterpret_cast<int8_t*>(data + pos);
+ return *reinterpret_cast<const int8_t*>(data + pos);
}
void InteropInputStream::ReadInt8Array(int8_t* const res, const int32_t len)
@@ -96,7 +108,7 @@ namespace ignite
if (delta > 0)
EnsureEnoughData(delta);
- return *reinterpret_cast<int16_t*>(data + pos);
+ return *reinterpret_cast<const int16_t*>(data + pos);
}
void InteropInputStream::ReadInt16Array(int16_t* const res, const int32_t len)
@@ -126,7 +138,7 @@ namespace ignite
if (delta > 0)
EnsureEnoughData(delta);
- return *reinterpret_cast<int32_t*>(data + pos);
+ return *reinterpret_cast<const int32_t*>(data + pos);
}
void InteropInputStream::ReadInt32Array(int32_t* const res, const int32_t len)
diff --git a/modules/platforms/cpp/binary/src/impl/interop/interop_memory.cpp b/modules/platforms/cpp/binary/src/impl/interop/interop_memory.cpp
index 443bd25..8efd959 100644
--- a/modules/platforms/cpp/binary/src/impl/interop/interop_memory.cpp
+++ b/modules/platforms/cpp/binary/src/impl/interop/interop_memory.cpp
@@ -100,7 +100,7 @@ namespace ignite
return memPtr;
}
- int64_t InteropMemory::PointerLong()
+ int64_t InteropMemory::PointerLong() const
{
return reinterpret_cast<int64_t>(memPtr);
}
diff --git a/modules/platforms/cpp/network/include/ignite/network/utils.h b/modules/platforms/cpp/common/include/ignite/common/factory.h
similarity index 58%
copy from modules/platforms/cpp/network/include/ignite/network/utils.h
copy to modules/platforms/cpp/common/include/ignite/common/factory.h
index e8f0851..b85f58b 100644
--- a/modules/platforms/cpp/network/include/ignite/network/utils.h
+++ b/modules/platforms/cpp/common/include/ignite/common/factory.h
@@ -15,28 +15,40 @@
* limitations under the License.
*/
-#ifndef _IGNITE_NETWORK_UTILS
-#define _IGNITE_NETWORK_UTILS
+#ifndef _IGNITE_COMMON_FACTORY
+#define _IGNITE_COMMON_FACTORY
-#include <set>
-#include <string>
-
-#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
namespace ignite
{
- namespace network
+ namespace common
{
- namespace utils
+ /**
+ * Factory class.
+ *
+ * @tparam T Instances of this type factory builds.
+ */
+ template<typename T>
+ class Factory
{
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~Factory()
+ {
+ // No-op.
+ }
+
/**
- * Get set of local addresses.
+ * Build instance.
*
- * @param addrs Addresses set.
+ * @return New instance of type @c T.
*/
- void IGNITE_IMPORT_EXPORT GetLocalAddresses(std::set<std::string>& addrs);
- }
+ virtual common::concurrent::SharedPointer<T> Build() = 0;
+ };
}
}
-#endif //_IGNITE_NETWORK_UTILS
\ No newline at end of file
+#endif //_IGNITE_COMMON_FACTORY
\ No newline at end of file
diff --git a/modules/platforms/cpp/common/include/ignite/common/promise.h b/modules/platforms/cpp/common/include/ignite/common/promise.h
index 8fe9603..59f3ecf 100644
--- a/modules/platforms/cpp/common/include/ignite/common/promise.h
+++ b/modules/platforms/cpp/common/include/ignite/common/promise.h
@@ -213,6 +213,98 @@ namespace ignite
/** Shared state. */
concurrent::SharedPointer< SharedState<ValueType> > state;
};
+
+ /**
+ * Specialization for SharePointer type.
+ */
+ template<typename T>
+ class Promise< concurrent::SharedPointer<T> >
+ {
+ public:
+ /** Template value type */
+ typedef T ValueType;
+
+ /** Template value type wrapped in shared pointer */
+ typedef concurrent::SharedPointer<ValueType> SP_ValueType;
+
+ /**
+ * Constructor.
+ */
+ Promise() :
+ state(new SharedState<SP_ValueType>())
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ ~Promise()
+ {
+ SharedState<SP_ValueType>* state0 = state.Get();
+
+ assert(state0 != 0);
+
+ if (!state0->IsSet())
+ state0->SetError(IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE,
+ "Broken promise. Value will never be set due to internal error."));
+ }
+
+
+ /**
+ * Get future for this promise.
+ *
+ * @return New future instance.
+ */
+ Future<SP_ValueType> GetFuture() const
+ {
+ return Future<SP_ValueType>(state);
+ }
+
+ /**
+ * Set value.
+ *
+ * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already.
+ * @param val Value to set.
+ */
+ void SetValue(SP_ValueType val)
+ {
+ SharedState<SP_ValueType>* state0 = state.Get();
+
+ assert(state0 != 0);
+
+ return state0->SetValue(val);
+ }
+
+ /**
+ * Set error.
+ *
+ * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already.
+ * @param err Error to set.
+ */
+ void SetError(const IgniteError& err)
+ {
+ SharedState<SP_ValueType>* state0 = state.Get();
+
+ assert(state0 != 0);
+
+ state0->SetError(err);
+ }
+
+ /**
+ * Set cancel target.
+ */
+ void SetCancelTarget(std::auto_ptr<Cancelable>& target)
+ {
+ state.Get()->SetCancelTarget(target);
+ }
+
+ private:
+ IGNITE_NO_COPY_ASSIGNMENT(Promise);
+
+ /** Shared state. */
+ concurrent::SharedPointer< SharedState<SP_ValueType> > state;
+ };
}
}
diff --git a/modules/platforms/cpp/common/include/ignite/common/shared_state.h b/modules/platforms/cpp/common/include/ignite/common/shared_state.h
index d223753..e6fb9fe 100644
--- a/modules/platforms/cpp/common/include/ignite/common/shared_state.h
+++ b/modules/platforms/cpp/common/include/ignite/common/shared_state.h
@@ -376,6 +376,180 @@ namespace ignite
/** Lock that used to prevent double-set of the value. */
mutable concurrent::CriticalSection mutex;
};
+
+ /**
+ * Specialization for shared pointer type.
+ */
+ template<typename T>
+ class SharedState< concurrent::SharedPointer<T> >
+ {
+ public:
+ /** Template value type */
+ typedef T ValueType;
+
+ /**
+ * Default constructor.
+ * Constructs non-set SharedState instance.
+ */
+ SharedState() :
+ value(),
+ error()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ ~SharedState()
+ {
+ // No-op.
+ }
+
+ /**
+ * Checks if the value or error set for the state.
+ * @return True if the value or error set for the state.
+ */
+ bool IsSet() const
+ {
+ return value.IsValid() || error.GetCode() != IgniteError::IGNITE_SUCCESS;
+ }
+
+ /**
+ * Set value.
+ *
+ * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already.
+ * @param val Value to set.
+ */
+ void SetValue(const concurrent::SharedPointer<ValueType>& val)
+ {
+ concurrent::CsLockGuard guard(mutex);
+
+ if (IsSet())
+ {
+ if (value.IsValid())
+ throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future value already set");
+
+ if (error.GetCode() != IgniteError::IGNITE_SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future error already set");
+ }
+
+ value = val;
+
+ cond.NotifyAll();
+ }
+
+ /**
+ * Set error.
+ *
+ * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already.
+ * @param err Error to set.
+ */
+ void SetError(const IgniteError& err)
+ {
+ concurrent::CsLockGuard guard(mutex);
+
+ if (IsSet())
+ {
+ if (value.IsValid())
+ throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future value already set");
+
+ if (error.GetCode() != IgniteError::IGNITE_SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future error already set");
+ }
+
+ error = err;
+
+ cond.NotifyAll();
+ }
+
+ /**
+ * Wait for value to be set.
+ * Active thread will be blocked until value or error will be set.
+ */
+ void Wait() const
+ {
+ concurrent::CsLockGuard guard(mutex);
+
+ while (!IsSet())
+ cond.Wait(mutex);
+ }
+
+ /**
+ * Wait for value to be set for specified time.
+ * Active thread will be blocked until value or error will be set or timeout will end.
+ *
+ * @param msTimeout Timeout in milliseconds.
+ * @return True if the object has been triggered and false in case of timeout.
+ */
+ bool WaitFor(int32_t msTimeout) const
+ {
+ concurrent::CsLockGuard guard(mutex);
+
+ if (IsSet())
+ return true;
+
+ return cond.WaitFor(mutex, msTimeout);
+ }
+
+ /**
+ * Get the set value.
+ * Active thread will be blocked until value or error will be set.
+ *
+ * @throw IgniteError if error has been set.
+ * @return Value that has been set on success.
+ */
+ concurrent::SharedPointer<ValueType> GetValue() const
+ {
+ Wait();
+
+ if (value.IsValid())
+ return value;
+
+ assert(error.GetCode() != IgniteError::IGNITE_SUCCESS);
+
+ throw error;
+ }
+
+ /**
+ * Set cancel target.
+ */
+ void SetCancelTarget(std::auto_ptr<Cancelable>& target)
+ {
+ concurrent::CsLockGuard guard(mutex);
+
+ cancelTarget = target;
+ }
+
+ /**
+ * Cancel related operation.
+ */
+ void Cancel()
+ {
+ concurrent::CsLockGuard guard(mutex);
+
+ if (cancelTarget.get())
+ cancelTarget->Cancel();
+ }
+
+ private:
+ IGNITE_NO_COPY_ASSIGNMENT(SharedState);
+
+ /** Cancel target. */
+ std::auto_ptr<Cancelable> cancelTarget;
+
+ /** Value. */
+ concurrent::SharedPointer<ValueType> value;
+
+ /** Error. */
+ IgniteError error;
+
+ /** Condition variable which serves to signal that value is set. */
+ mutable concurrent::ConditionVariable cond;
+
+ /** Lock that used to prevent double-set of the value. */
+ mutable concurrent::CriticalSection mutex;
+ };
}
}
diff --git a/modules/platforms/cpp/common/include/ignite/common/utils.h b/modules/platforms/cpp/common/include/ignite/common/utils.h
index 9d17d65..f516ffc 100644
--- a/modules/platforms/cpp/common/include/ignite/common/utils.h
+++ b/modules/platforms/cpp/common/include/ignite/common/utils.h
@@ -594,7 +594,7 @@ namespace ignite
};
/**
- * Deinit guard class template.
+ * De-init guard class template.
*
* Upon destruction calls provided deinit function on provided instance.
*
@@ -655,6 +655,57 @@ namespace ignite
* @return Full name.
*/
IGNITE_IMPORT_EXPORT std::string GetDynamicLibraryName(const char* name);
+
+ /**
+ * Get hex dump of binary data in string form.
+ * @param data Data.
+ * @param count Number of bytes.
+ * @return Hex dump string.
+ */
+ IGNITE_IMPORT_EXPORT std::string HexDump(const void* data, size_t count);
+
+ /**
+ * Fibonacci sequence iterator.
+ *
+ * @tparam S Sequence length. Should be >= 2.
+ */
+ template<size_t S>
+ class FibonacciSequence
+ {
+ public:
+ /** Size. */
+ static const size_t size = S > 2 ? S : 2;
+
+ /**
+ * Constructor.
+ */
+ FibonacciSequence()
+ {
+ sequence[0] = 0;
+ sequence[1] = 1;
+
+ for (size_t i = 2; i < size; ++i)
+ sequence[i] = sequence[i - 1] + sequence[i - 2];
+ }
+
+ /**
+ * Get n-th or max member of sequence.
+ *
+ * @param n Member position.
+ * @return N-th member of sequence if n < size, or max member.
+ */
+ size_t GetValue(size_t n) const
+ {
+ if (n < size)
+ return sequence[n];
+
+ return sequence[size-1];
+ }
+
+ private:
+ /** Sequence of fibonacci numbers */
+ size_t sequence[size];
+ };
}
}
diff --git a/modules/platforms/cpp/common/include/ignite/future.h b/modules/platforms/cpp/common/include/ignite/future.h
index f709797..0b15363 100644
--- a/modules/platforms/cpp/common/include/ignite/future.h
+++ b/modules/platforms/cpp/common/include/ignite/future.h
@@ -279,6 +279,131 @@ namespace ignite
/** Shared state. */
common::concurrent::SharedPointer< common::SharedState<ValueType> > state;
};
+
+ /**
+ * Specialization for shared pointer.
+ */
+ template<typename T>
+ class Future< common::concurrent::SharedPointer<T> >
+ {
+ friend class common::Promise< common::concurrent::SharedPointer<T> >;
+
+ public:
+ /** Template value type */
+ typedef T ValueType;
+
+ /** Template value type shared pointer */
+ typedef common::concurrent::SharedPointer<ValueType> SP_ValueType;
+
+ /**
+ * Copy constructor.
+ *
+ * @param src Instance to copy.
+ */
+ Future(const Future<SP_ValueType>& src) :
+ state(src.state)
+ {
+ // No-op.
+ }
+
+ /**
+ * Assignment operator.
+ *
+ * @param other Other instance.
+ * @return *this.
+ */
+ Future& operator=(const Future<SP_ValueType>& other)
+ {
+ if (this != &other)
+ state = other.state;
+
+ return *this;
+ }
+
+ /**
+ * Wait for value to be set.
+ * Active thread will be blocked until value or error will be set.
+ */
+ void Wait() const
+ {
+ const common::SharedState<SP_ValueType>* state0 = state.Get();
+
+ assert(state0 != 0);
+
+ state0->Wait();
+ }
+
+ /**
+ * Wait for value to be set for specified time.
+ * Active thread will be blocked until value or error will be set or timeout will end.
+ *
+ * @param msTimeout Timeout in milliseconds.
+ * @return True if the object has been triggered and false in case of timeout.
+ */
+ bool WaitFor(int32_t msTimeout) const
+ {
+ const common::SharedState<SP_ValueType>* state0 = state.Get();
+
+ assert(state0 != 0);
+
+ return state0->WaitFor(msTimeout);
+ }
+
+ /**
+ * Get the set value.
+ * Active thread will be blocked until value or error will be set.
+ *
+ * @throw IgniteError if error has been set.
+ * @return Value that has been set on success.
+ */
+ SP_ValueType GetValue() const
+ {
+ const common::SharedState<SP_ValueType>* state0 = state.Get();
+
+ assert(state0 != 0);
+
+ return state0->GetValue();
+ }
+
+ /**
+ * Cancel related operation.
+ */
+ void Cancel()
+ {
+ common::SharedState<SP_ValueType>* state0 = state.Get();
+
+ assert(state0 != 0);
+
+ state0->Cancel();
+ }
+
+ /**
+ * Check if the future ready.
+ */
+ bool IsReady()
+ {
+ common::SharedState<SP_ValueType>* state0 = state.Get();
+
+ assert(state0 != 0);
+
+ return state0->IsSet();
+ }
+
+ private:
+ /**
+ * Constructor.
+ *
+ * @param state0 Shared state instance.
+ */
+ Future(common::concurrent::SharedPointer< common::SharedState<SP_ValueType> > state0) :
+ state(state0)
+ {
+ // No-op.
+ }
+
+ /** Shared state. */
+ common::concurrent::SharedPointer< common::SharedState<SP_ValueType> > state;
+ };
}
#endif //_IGNITE_FUTURE
diff --git a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
index 66f6656..43ffb02 100644
--- a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
+++ b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
@@ -691,6 +691,49 @@ namespace ignite
/** State. */
bool state;
};
+
+ /**
+ * Thread.
+ */
+ class IGNITE_IMPORT_EXPORT Thread
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ Thread();
+
+ /**
+ * Destructor.
+ */
+ virtual ~Thread();
+
+ /**
+ * Run thread.
+ */
+ virtual void Run() = 0;
+
+ /**
+ * Start thread.
+ */
+ virtual void Start();
+
+ /**
+ * Join thread.
+ */
+ virtual void Join();
+
+ private:
+ /**
+ * Routine.
+ * @param lpParam Param.
+ * @return Return code.
+ */
+ static void* ThreadRoutine(void* arg);
+
+ /** Thread handle. */
+ pthread_t thread;
+ };
}
}
}
diff --git a/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
index 7b48b9b..ff36c55 100644
--- a/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
+++ b/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
@@ -203,6 +203,39 @@ namespace ignite
pthread_setspecific(tlsKey, ptr);
}
+
+ Thread::Thread() :
+ thread()
+ {
+ // No-op.
+ }
+
+ Thread::~Thread()
+ {
+ // No-op.
+ }
+
+ void* Thread::ThreadRoutine(void* arg)
+ {
+ Thread* self = static_cast<Thread*>(arg);
+
+ self->Run();
+
+ return 0;
+ }
+
+ void Thread::Start()
+ {
+ int res = pthread_create(&thread, NULL, Thread::ThreadRoutine, this);
+
+ IGNITE_UNUSED(res);
+ assert(res == 0);
+ }
+
+ void Thread::Join()
+ {
+ pthread_join(thread, 0);
+ }
}
}
}
diff --git a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
index b1e8916..4c0e4a2 100644
--- a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
+++ b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
@@ -600,6 +600,49 @@ namespace ignite
/** Event handle. */
HANDLE handle;
};
+
+ /**
+ * Thread.
+ */
+ class IGNITE_IMPORT_EXPORT Thread
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ Thread();
+
+ /**
+ * Destructor.
+ */
+ virtual ~Thread();
+
+ /**
+ * Run thread.
+ */
+ virtual void Run() = 0;
+
+ /**
+ * Start thread.
+ */
+ virtual void Start();
+
+ /**
+ * Join thread.
+ */
+ virtual void Join();
+
+ private:
+ /**
+ * Routine.
+ * @param lpParam Param.
+ * @return Return code.
+ */
+ static DWORD WINAPI ThreadRoutine(LPVOID lpParam);
+
+ /** Thread handle. */
+ HANDLE handle;
+ };
}
}
}
diff --git a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
index 15a015c..8d2d29b 100644
--- a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
+++ b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
@@ -207,6 +207,38 @@ namespace ignite
{
TlsSetValue(winTlsIdx, ptr);
}
+
+ Thread::Thread() :
+ handle(NULL)
+ {
+ // No-op.
+ }
+
+ Thread::~Thread()
+ {
+ CloseHandle(handle);
+ }
+
+ DWORD Thread::ThreadRoutine(LPVOID lpParam)
+ {
+ Thread* self = static_cast<Thread*>(lpParam);
+
+ self->Run();
+
+ return 0;
+ }
+
+ void Thread::Start()
+ {
+ handle = CreateThread(NULL, 0, Thread::ThreadRoutine, this, 0, NULL);
+
+ assert(handle != NULL);
+ }
+
+ void Thread::Join()
+ {
+ WaitForSingleObject(handle, INFINITE);
+ }
}
}
}
diff --git a/modules/platforms/cpp/common/src/common/utils.cpp b/modules/platforms/cpp/common/src/common/utils.cpp
index 6a5ee63..28433e82 100644
--- a/modules/platforms/cpp/common/src/common/utils.cpp
+++ b/modules/platforms/cpp/common/src/common/utils.cpp
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+#include <iomanip>
+
#include <ignite/common/utils.h>
namespace ignite
@@ -212,5 +214,20 @@ namespace ignite
return i == val.end();
}
+ std::string HexDump(const void *data, size_t count)
+ {
+ std::stringstream dump;
+ size_t cnt = 0;
+ for(const uint8_t* p = (const uint8_t*)data, *e = (const uint8_t*)data + count; p != e; ++p)
+ {
+ if (cnt++ % 16 == 0)
+ {
+ dump << std::endl;
+ }
+ dump << std::hex << std::setfill('0') << std::setw(2) << (int)*p << " ";
+ }
+ return dump.str();
+ }
+
}
}
diff --git a/modules/platforms/cpp/network/CMakeLists.txt b/modules/platforms/cpp/network/CMakeLists.txt
index e8e9703..cae7b42 100644
--- a/modules/platforms/cpp/network/CMakeLists.txt
+++ b/modules/platforms/cpp/network/CMakeLists.txt
@@ -23,20 +23,37 @@ find_package(OpenSSL REQUIRED)
include_directories(include src ${OPENSSL_INCLUDE_DIR})
-set(SOURCES src/network/network.cpp
+set(SOURCES
+ src/network/async_client_pool_adapter.cpp
+ src/network/error_handling_filter.cpp
+ src/network/codec_data_filter.cpp
+ src/network/data_buffer.cpp
+ src/network/length_prefix_codec.cpp
+ src/network/network.cpp
+ src/network/ssl/secure_data_filter.cpp
src/network/ssl/secure_socket_client.cpp
src/network/ssl/ssl_gateway.cpp)
if (WIN32)
include_directories(os/win/src)
- list(APPEND SOURCES os/win/src/network/tcp_socket_client.cpp
+ list(APPEND SOURCES
+ os/win/src/network/tcp_socket_client.cpp
os/win/src/network/sockets.cpp
- os/win/src/network/utils.cpp)
+ os/win/src/network/utils.cpp
+ os/win/src/network/win_async_client.cpp
+ os/win/src/network/win_async_client_pool.cpp
+ os/win/src/network/win_async_connecting_thread.cpp
+ os/win/src/network/win_async_worker_thread.cpp)
else()
include_directories(os/linux/src)
- list(APPEND SOURCES os/linux/src/network/tcp_socket_client.cpp
+ list(APPEND SOURCES
+ os/linux/src/network/connecting_context.cpp
+ os/linux/src/network/linux_async_client.cpp
+ os/linux/src/network/linux_async_client_pool.cpp
+ os/linux/src/network/linux_async_worker_thread.cpp
+ os/linux/src/network/tcp_socket_client.cpp
os/linux/src/network/sockets.cpp
os/linux/src/network/utils.cpp)
endif()
@@ -60,9 +77,9 @@ foreach(_target_lib IN LISTS _target_libs)
if (${_target_lib} STREQUAL ${TARGET}-objlib)
set_target_properties(${_target_lib} PROPERTIES POSITION_INDEPENDENT_CODE 1)
- target_link_libraries(${_target_lib} ignite-common-objlib)
+ target_link_libraries(${_target_lib} ignite-common-objlib ignite-binary-objlib)
else()
- target_link_libraries(${_target_lib} ignite-common)
+ target_link_libraries(${_target_lib} ignite-common ignite-binary)
endif()
if (WIN32)
diff --git a/modules/platforms/cpp/network/include/ignite/network/async_client_pool.h b/modules/platforms/cpp/network/include/ignite/network/async_client_pool.h
new file mode 100644
index 0000000..9ecf20e
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/async_client_pool.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_ASYNC_CLIENT_POOL
+#define _IGNITE_NETWORK_ASYNC_CLIENT_POOL
+
+#include <stdint.h>
+
+#include <vector>
+
+#include <ignite/ignite_error.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/async_handler.h>
+#include <ignite/network/data_filter.h>
+#include <ignite/network/data_sink.h>
+#include <ignite/network/tcp_range.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Asynchronous client pool.
+ */
+ class IGNITE_IMPORT_EXPORT AsyncClientPool : public DataSink
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~AsyncClientPool()
+ {
+ // No-op.
+ }
+
+ /**
+ * Start internal thread that establishes connections to provided addresses and asynchronously sends and
+ * receives messages from them. Function returns either when thread is started and first connection is
+ * established or failure happened.
+ *
+ * @param addrs Addresses to connect to.
+ * @param connLimit Connection upper limit. Zero means limit is disabled.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual void Start(const std::vector<TcpRange>& addrs, uint32_t connLimit) = 0;
+
+ /**
+ * Close all established connections and stops handling threads.
+ */
+ virtual void Stop() = 0;
+
+ /**
+ * Set handler.
+ *
+ * @param handler Handler to set.
+ */
+ virtual void SetHandler(AsyncHandler *handler) = 0;
+ };
+
+ // Type alias
+ typedef common::concurrent::SharedPointer<AsyncClientPool> SP_AsyncClientPool;
+ }
+}
+
+#endif //_IGNITE_NETWORK_ASYNC_CLIENT_POOL
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/include/ignite/network/async_handler.h b/modules/platforms/cpp/network/include/ignite/network/async_handler.h
new file mode 100644
index 0000000..ecb1d36
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/async_handler.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_ASYNC_HANDLER
+#define _IGNITE_NETWORK_ASYNC_HANDLER
+
+#include <stdint.h>
+
+#include <ignite/ignite_error.h>
+#include <ignite/network/end_point.h>
+#include <ignite/network/data_buffer.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Asynchronous events handler.
+ */
+ class IGNITE_IMPORT_EXPORT AsyncHandler
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~AsyncHandler()
+ {
+ // No-op.
+ }
+
+ /**
+ * Callback that called on successful connection establishment.
+ *
+ * @param addr Address of the new connection.
+ * @param id Connection ID.
+ */
+ virtual void OnConnectionSuccess(const EndPoint& addr, uint64_t id) = 0;
+
+ /**
+ * Callback that called on error during connection establishment.
+ *
+ * @param addr Connection address.
+ * @param err Error.
+ */
+ virtual void OnConnectionError(const EndPoint& addr, const IgniteError& err) = 0;
+
+ /**
+ * Callback that called on error during connection establishment.
+ *
+ * @param id Async client ID.
+ * @param err Error. Can be null if connection closed without error.
+ */
+ virtual void OnConnectionClosed(uint64_t id, const IgniteError* err) = 0;
+
+ /**
+ * Callback that called when new message is received.
+ *
+ * @param id Async client ID.
+ * @param msg Received message.
+ */
+ virtual void OnMessageReceived(uint64_t id, const DataBuffer& msg) = 0;
+
+ /**
+ * Callback that called when message is sent.
+ *
+ * @param id Async client ID.
+ */
+ virtual void OnMessageSent(uint64_t id) = 0;
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_ASYNC_HANDLER
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/include/ignite/network/codec.h b/modules/platforms/cpp/network/include/ignite/network/codec.h
new file mode 100644
index 0000000..b8938c0
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/codec.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_CODEC
+#define _IGNITE_NETWORK_CODEC
+
+#include <ignite/ignite_error.h>
+
+#include <ignite/common/factory.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/data_buffer.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Codec class.
+ * Encodes and decodes data.
+ */
+ class IGNITE_IMPORT_EXPORT Codec
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~Codec()
+ {
+ // No-op.
+ }
+
+ /**
+ * Encode provided data.
+ *
+ * @param data Data to encode.
+ * @return Encoded data. Returning null is ok.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual DataBuffer Encode(DataBuffer& data) = 0;
+
+ /**
+ * Decode provided data.
+ *
+ * @param data Data to decode.
+ * @return Decoded data. Returning null means data is not yet ready.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual DataBuffer Decode(DataBuffer& data) = 0;
+ };
+
+ // Shared pointer codec type alias.
+ typedef common::concurrent::SharedPointer<Codec> SP_Codec;
+
+ /** Codec factory. */
+ typedef common::Factory<Codec> CodecFactory;
+
+ // Shared pointer to codec factory type alias.
+ typedef common::concurrent::SharedPointer<CodecFactory> SP_CodecFactory;
+ }
+}
+
+#endif //_IGNITE_NETWORK_CODEC
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/include/ignite/network/codec_data_filter.h b/modules/platforms/cpp/network/include/ignite/network/codec_data_filter.h
new file mode 100644
index 0000000..feaeddb
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/codec_data_filter.h
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_CODEC_DATA_FILTER
+#define _IGNITE_NETWORK_CODEC_DATA_FILTER
+
+#include <map>
+
+#include <ignite/common/concurrent.h>
+
+#include <ignite/network/codec.h>
+#include <ignite/network/data_filter_adapter.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Data filter that uses codecs inside to encode/decode data.
+ */
+ class IGNITE_IMPORT_EXPORT CodecDataFilter : public DataFilterAdapter
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param factory Codec factory.
+ */
+ explicit CodecDataFilter(const SP_CodecFactory& factory);
+
+ /**
+ * Destructor.
+ */
+ virtual ~CodecDataFilter();
+
+ /**
+ * Send data to specific established connection.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c true if connection is present and @c false otherwise.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual bool Send(uint64_t id, const DataBuffer& data);
+
+ /**
+ * Callback that called on successful connection establishment.
+ *
+ * @param addr Address of the new connection.
+ * @param id Connection ID.
+ */
+ virtual void OnConnectionSuccess(const EndPoint& addr, uint64_t id);
+
+ /**
+ * Callback that called on error during connection establishment.
+ *
+ * @param id Async client ID.
+ * @param err Error. Can be null if connection closed without error.
+ */
+ virtual void OnConnectionClosed(uint64_t id, const IgniteError* err);
+
+ /**
+ * Callback that called when new message is received.
+ *
+ * @param id Async client ID.
+ * @param msg Received message.
+ */
+ virtual void OnMessageReceived(uint64_t id, const DataBuffer& msg);
+
+ private:
+ /** Codec map. */
+ typedef std::map<uint64_t, SP_Codec> CodecMap;
+
+ /**
+ * Get codec for connection.
+ *
+ * @param id Connection ID.
+ * @return Codec if found or null.
+ */
+ SP_Codec FindCodec(uint64_t id);
+
+ /** Codec factory. */
+ SP_CodecFactory codecFactory;
+
+ /** Codecs. */
+ CodecMap* codecs;
+
+ /** Mutex for secure access to codecs map. */
+ common::concurrent::CriticalSection codecsCs;
+ };
+
+ // Shared pointer type alias.
+ typedef common::concurrent::SharedPointer<CodecDataFilter> SP_CodecDataFilter;
+ }
+}
+
+#endif //_IGNITE_NETWORK_CODEC_DATA_FILTER
diff --git a/modules/platforms/cpp/network/include/ignite/network/data_buffer.h b/modules/platforms/cpp/network/include/ignite/network/data_buffer.h
new file mode 100644
index 0000000..cf2a3d9
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/data_buffer.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_DATA_BUFFER
+#define _IGNITE_NETWORK_DATA_BUFFER
+
+#include <ignite/impl/interop/interop_memory.h>
+#include <ignite/impl/interop/interop_input_stream.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Data buffer.
+ */
+ class IGNITE_IMPORT_EXPORT DataBuffer
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ DataBuffer();
+
+ /**
+ * Constructor.
+ *
+ * @param data Data.
+ */
+ explicit DataBuffer(const impl::interop::SP_ConstInteropMemory& data);
+
+ /**
+ * Constructor.
+ *
+ * @param data Data.
+ * @param pos Start of data.
+ * @param len Length.
+ */
+ DataBuffer(const impl::interop::SP_ConstInteropMemory& data, int32_t pos, int32_t len);
+
+ /**
+ * Destructor.
+ */
+ ~DataBuffer()
+ {
+ // No-op.
+ }
+
+ /**
+ * Consume data from buffer to specified place in memory.
+ *
+ * @param dst Destination in memory.
+ * @param size Number of bytes to copy.
+ */
+ void Consume(int8_t* dst, int32_t size);
+
+ /**
+ * Get data pointer.
+ *
+ * @return Data.
+ */
+ const int8_t* GetData() const;
+
+ /**
+ * Get packet size.
+ *
+ * @return Packet size.
+ */
+ int32_t GetSize() const;
+
+ /**
+ * Check whether data buffer was fully consumed.
+ *
+ * @return @c true if consumed and @c false otherwise.
+ */
+ bool IsEmpty() const;
+
+ /**
+ * Consume the whole buffer.
+ *
+ * @return Buffer containing consumed data.
+ */
+ DataBuffer ConsumeEntirely();
+
+ /**
+ * Get input stream for a data buffer.
+ * @return Stream set up to read data from buffer.
+ */
+ impl::interop::InteropInputStream GetInputStream() const;
+
+ /**
+ * Clone underlying buffer into a new one.
+ *
+ * @return New data buffer.
+ */
+ DataBuffer Clone() const;
+
+ /**
+ * Skip specified number of bytes.
+ *
+ * @param bytes Bytes to skip.
+ */
+ void Skip(int32_t bytes);
+
+ private:
+ /**
+ * Advance position in packet by specified value.
+ *
+ * @param val Bytes to advance.
+ */
+ void Advance(int32_t val);
+
+ /** Position in current data. */
+ int32_t position;
+
+ /** Data length. */
+ int32_t length;
+
+ /** Data */
+ impl::interop::SP_ConstInteropMemory data;
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_DATA_BUFFER
diff --git a/modules/platforms/cpp/network/include/ignite/network/data_filter.h b/modules/platforms/cpp/network/include/ignite/network/data_filter.h
new file mode 100644
index 0000000..cb16fc9
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/data_filter.h
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_DATA_FILTER
+#define _IGNITE_NETWORK_DATA_FILTER
+
+#include <ignite/network/data_sink.h>
+#include <ignite/network/async_handler.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Data buffer.
+ */
+ class IGNITE_IMPORT_EXPORT DataFilter : public DataSink, public AsyncHandler
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ DataFilter() :
+ sink(0),
+ handler(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~DataFilter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Set sink.
+ *
+ * @param sink Data sink
+ */
+ void SetSink(DataSink* sink)
+ {
+ this->sink = sink;
+ }
+
+ /**
+ * Get sink.
+ *
+ * @return Data sink.
+ */
+ DataSink* GetSink()
+ {
+ return sink;
+ }
+
+ /**
+ * Set handler.
+ *
+ * @param handler Event handler.
+ */
+ void SetHandler(AsyncHandler* handler)
+ {
+ this->handler = handler;
+ }
+
+ /**
+ * Get handler.
+ *
+ * @return Event handler.
+ */
+ AsyncHandler* GetHandler()
+ {
+ return handler;
+ }
+
+ protected:
+ /** Sink. */
+ DataSink* sink;
+
+ /** Handler. */
+ AsyncHandler* handler;
+ };
+
+ // Shared pointer type alias.
+ typedef common::concurrent::SharedPointer<DataFilter> SP_DataFilter;
+ }
+}
+
+#endif //_IGNITE_NETWORK_DATA_FILTER
diff --git a/modules/platforms/cpp/network/include/ignite/network/data_filter_adapter.h b/modules/platforms/cpp/network/include/ignite/network/data_filter_adapter.h
new file mode 100644
index 0000000..2b2d0c8
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/data_filter_adapter.h
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_DATA_FILTER_ADAPTER
+#define _IGNITE_NETWORK_DATA_FILTER_ADAPTER
+
+#include <ignite/network/data_filter.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Data buffer.
+ */
+ class IGNITE_IMPORT_EXPORT DataFilterAdapter : public DataFilter
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ DataFilterAdapter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~DataFilterAdapter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Send data to specific established connection.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c true if connection is present and @c false otherwise.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual bool Send(uint64_t id, const DataBuffer& data)
+ {
+ DataSink* sink0 = sink;
+ if (sink0)
+ return sink0->Send(id, data);
+
+ return false;
+ }
+
+ /**
+ * Closes specified connection if it's established. Connection to the specified address is planned for
+ * re-connect. Error is reported to handler.
+ *
+ * @param id Client ID.
+ */
+ virtual void Close(uint64_t id, const IgniteError* err)
+ {
+ DataSink* sink0 = sink;
+ if (sink0)
+ sink0->Close(id, err);
+ }
+
+ /**
+ * Callback that called on successful connection establishment.
+ *
+ * @param addr Address of the new connection.
+ * @param id Connection ID.
+ */
+ virtual void OnConnectionSuccess(const EndPoint& addr, uint64_t id)
+ {
+ AsyncHandler* handler0 = handler;
+ if (handler0)
+ handler0->OnConnectionSuccess(addr, id);
+ }
+
+ /**
+ * Callback that called on error during connection establishment.
+ *
+ * @param addr Connection address.
+ * @param err Error.
+ */
+ virtual void OnConnectionError(const EndPoint& addr, const IgniteError& err)
+ {
+ AsyncHandler* handler0 = handler;
+ if (handler0)
+ handler0->OnConnectionError(addr, err);
+ }
+
+ /**
+ * Callback that called on error during connection establishment.
+ *
+ * @param id Async client ID.
+ * @param err Error. Can be null if connection closed without error.
+ */
+ virtual void OnConnectionClosed(uint64_t id, const IgniteError* err)
+ {
+ AsyncHandler* handler0 = handler;
+ if (handler0)
+ handler0->OnConnectionClosed(id, err);
+ }
+
+ /**
+ * Callback that called when new message is received.
+ *
+ * @param id Async client ID.
+ * @param msg Received message.
+ */
+ virtual void OnMessageReceived(uint64_t id, const DataBuffer& msg)
+ {
+ AsyncHandler* handler0 = handler;
+ if (handler0)
+ handler0->OnMessageReceived(id, msg);
+ }
+
+ /**
+ * Callback that called when message is sent.
+ *
+ * @param id Async client ID.
+ */
+ virtual void OnMessageSent(uint64_t id)
+ {
+ AsyncHandler* handler0 = handler;
+ if (handler0)
+ handler0->OnMessageSent(id);
+ }
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_DATA_FILTER_ADAPTER
diff --git a/modules/platforms/cpp/network/include/ignite/network/data_sink.h b/modules/platforms/cpp/network/include/ignite/network/data_sink.h
new file mode 100644
index 0000000..76ab774
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/data_sink.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_DATA_SINK
+#define _IGNITE_NETWORK_DATA_SINK
+
+#include <ignite/ignite_error.h>
+
+#include <ignite/network/data_buffer.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Data sink. Can consume data.
+ */
+ class IGNITE_IMPORT_EXPORT DataSink
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~DataSink()
+ {
+ // No-op.
+ }
+
+ /**
+ * Send data to specific established connection.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c true if connection is present and @c false otherwise.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual bool Send(uint64_t id, const DataBuffer& data) = 0;
+
+ /**
+ * Closes specified connection if it's established. Connection to the specified address is planned for
+ * re-connect. Error is reported to handler.
+ *
+ * @param id Client ID.
+ */
+ virtual void Close(uint64_t id, const IgniteError* err) = 0;
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_DATA_SINK
diff --git a/modules/platforms/cpp/network/include/ignite/network/end_point.h b/modules/platforms/cpp/network/include/ignite/network/end_point.h
index 499cbeb..a1f4945 100644
--- a/modules/platforms/cpp/network/include/ignite/network/end_point.h
+++ b/modules/platforms/cpp/network/include/ignite/network/end_point.h
@@ -20,6 +20,8 @@
#include <stdint.h>
#include <string>
+#include <sstream>
+#include <vector>
namespace ignite
{
@@ -53,6 +55,19 @@ namespace ignite
}
/**
+ * Convert to string.
+ *
+ * @return String form.
+ */
+ std::string ToString() const
+ {
+ std::stringstream ss;
+ ss << host << ':' << port;
+
+ return ss.str();
+ }
+
+ /**
* Compare to another instance.
*
* @param other Another instance.
diff --git a/modules/platforms/cpp/network/include/ignite/network/length_prefix_codec.h b/modules/platforms/cpp/network/include/ignite/network/length_prefix_codec.h
new file mode 100644
index 0000000..e11d367
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/length_prefix_codec.h
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_LENGTH_PREFIX_CODEC
+#define _IGNITE_NETWORK_LENGTH_PREFIX_CODEC
+
+#include <ignite/ignite_error.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/codec.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Codec that decodes messages prefixed with int32 length.
+ */
+ class IGNITE_IMPORT_EXPORT LengthPrefixCodec : public Codec
+ {
+ public:
+ enum
+ {
+ /** Packet length header size. */
+ PACKET_HEADER_SIZE = 4
+ };
+
+ /**
+ * Constructor.
+ */
+ LengthPrefixCodec();
+
+ /**
+ * Destructor.
+ */
+ virtual ~LengthPrefixCodec();
+
+ /**
+ * Encode provided data.
+ *
+ * @param data Data to encode.
+ * @return Encoded data. Returning null is ok.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual DataBuffer Encode(DataBuffer& data);
+
+ /**
+ * Decode provided data.
+ *
+ * @param data Data to decode.
+ * @return Decoded data. Returning null means data is not yet ready.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual DataBuffer Decode(DataBuffer& data);
+
+ private:
+ /**
+ * Consume the right amount of provided data to make packet closer to desired size.
+ *
+ * @param data Data to consume.
+ * @param desired Desired resulting size of packet.
+ */
+ void Consume(DataBuffer& data, int32_t desired);
+
+ /** Size of the current packet. */
+ int32_t packetSize;
+
+ /** Current packet */
+ impl::interop::SP_InteropMemory packet;
+ };
+
+ /**
+ * Factory for LengthPrefixCodec.
+ */
+ class IGNITE_IMPORT_EXPORT LengthPrefixCodecFactory : public CodecFactory
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ LengthPrefixCodecFactory()
+ {
+ // No-op.
+ }
+
+ /**
+ * Build instance.
+ *
+ * @return New instance of type @c T.
+ */
+ virtual SP_Codec Build()
+ {
+ return SP_Codec(new LengthPrefixCodec());
+ }
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_LENGTH_PREFIX_CODEC
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/include/ignite/network/network.h b/modules/platforms/cpp/network/include/ignite/network/network.h
index 5e83cc1..17c7ebd 100644
--- a/modules/platforms/cpp/network/include/ignite/network/network.h
+++ b/modules/platforms/cpp/network/include/ignite/network/network.h
@@ -22,6 +22,7 @@
#include <ignite/common/common.h>
#include <ignite/network/socket_client.h>
+#include <ignite/network/async_client_pool.h>
namespace ignite
{
@@ -40,11 +41,6 @@ namespace ignite
IGNITE_IMPORT_EXPORT void EnsureSslLoaded();
/**
- * Make basic TCP socket.
- */
- IGNITE_IMPORT_EXPORT SocketClient* MakeTcpSocketClient();
-
- /**
* Make secure socket for SSL/TLS connection.
*
* @param certPath Certificate file path.
@@ -56,7 +52,21 @@ namespace ignite
IGNITE_IMPORT_EXPORT SocketClient* MakeSecureSocketClient(const std::string& certPath,
const std::string& keyPath, const std::string& caPath);
}
+
+ /**
+ * Make basic TCP socket.
+ */
+ IGNITE_IMPORT_EXPORT SocketClient* MakeTcpSocketClient();
+
+ /**
+ * Make asynchronous client pool.
+ *
+ * @param handler Event handler.
+ * @param filters Filters.
+ * @return Async client pool.
+ */
+ IGNITE_IMPORT_EXPORT SP_AsyncClientPool MakeAsyncClientPool(const std::vector<SP_DataFilter>& filters);
}
}
-#endif //_IGNITE_NETWORK_SSL_SSL_API
\ No newline at end of file
+#endif //_IGNITE_NETWORK_SSL_SSL_API
diff --git a/modules/platforms/cpp/network/include/ignite/network/socket_client.h b/modules/platforms/cpp/network/include/ignite/network/socket_client.h
index eb1d481..2ad8d0a 100644
--- a/modules/platforms/cpp/network/include/ignite/network/socket_client.h
+++ b/modules/platforms/cpp/network/include/ignite/network/socket_client.h
@@ -18,10 +18,9 @@
#ifndef _IGNITE_NETWORK_SOCKET_CLIENT
#define _IGNITE_NETWORK_SOCKET_CLIENT
+#include <stddef.h>
#include <stdint.h>
-#include <ignite/ignite_error.h>
-
namespace ignite
{
namespace network
@@ -97,19 +96,8 @@ namespace ignite
* @return @c true if the socket is blocking and false otherwise.
*/
virtual bool IsBlocking() const = 0;
-
- protected:
- /**
- * Throw connection error.
- *
- * @param err Error message.
- */
- static void ThrowNetworkError(const std::string& err)
- {
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, err.c_str());
- }
};
}
}
-#endif //_IGNITE_NETWORK_SOCKET_CLIENT
\ No newline at end of file
+#endif //_IGNITE_NETWORK_SOCKET_CLIENT
diff --git a/modules/platforms/cpp/network/include/ignite/network/ssl/secure_data_filter.h b/modules/platforms/cpp/network/include/ignite/network/ssl/secure_data_filter.h
new file mode 100644
index 0000000..dde9548
--- /dev/null
+++ b/modules/platforms/cpp/network/include/ignite/network/ssl/secure_data_filter.h
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_SSL_SECURE_DATA_FILTER
+#define _IGNITE_NETWORK_SSL_SECURE_DATA_FILTER
+
+#include <map>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/network/data_filter_adapter.h>
+
+
+namespace ignite
+{
+ namespace network
+ {
+ namespace ssl
+ {
+ /**
+ * TLS/SSL configuration parameters.
+ */
+ struct SecureConfiguration
+ {
+ /** Path to file containing security certificate to use. */
+ std::string certPath;
+
+ /** Path to file containing private key to use. */
+ std::string keyPath;
+
+ /** Path to file containing Certificate authority to use. */
+ std::string caPath;
+ };
+
+ /**
+ * TLS/SSL Data Filter.
+ */
+ class IGNITE_IMPORT_EXPORT SecureDataFilter : public DataFilterAdapter
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param cfg Configuration.
+ */
+ explicit SecureDataFilter(const SecureConfiguration& cfg);
+
+ /**
+ * Destructor.
+ */
+ virtual ~SecureDataFilter();
+
+ /**
+ * Send data to specific established connection.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c true if connection is present and @c false otherwise.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual bool Send(uint64_t id, const DataBuffer& data);
+
+ /**
+ * Callback that called on successful connection establishment.
+ *
+ * @param addr Address of the new connection.
+ * @param id Connection ID.
+ */
+ virtual void OnConnectionSuccess(const EndPoint& addr, uint64_t id);
+
+ /**
+ * Callback that called on error during connection establishment.
+ *
+ * @param id Async client ID.
+ * @param err Error. Can be null if connection closed without error.
+ */
+ virtual void OnConnectionClosed(uint64_t id, const IgniteError* err);
+
+ /**
+ * Callback that called when new message is received.
+ *
+ * @param id Async client ID.
+ * @param msg Received message.
+ */
+ virtual void OnMessageReceived(uint64_t id, const DataBuffer& msg);
+
+ private:
+ IGNITE_NO_COPY_ASSIGNMENT(SecureDataFilter);
+
+ /**
+ * Secure connection context.
+ */
+ class SecureConnectionContext
+ {
+ public:
+ /**
+ * Default constructor.
+ *
+ * @param id Connection ID.
+ * @param addr Address.
+ * @param filter Filter.
+ */
+ SecureConnectionContext(uint64_t id, const EndPoint &addr, SecureDataFilter& filter);
+
+ /**
+ * Destructor.
+ */
+ ~SecureConnectionContext();
+
+ /**
+ * Start connection procedure including handshake.
+ */
+ void DoConnect();
+
+ /**
+ * Check whether connection is established.
+ *
+ * @return @c true if connection established.
+ */
+ bool IsConnected() const
+ {
+ return connected;
+ }
+
+ /**
+ * Get address.
+ *
+ * @return Address.
+ */
+ const EndPoint& GetAddress() const
+ {
+ return addr;
+ }
+
+ /**
+ * Send data.
+ *
+ * @param data Data to send.
+ * @return @c true on success.
+ */
+ bool Send(const DataBuffer& data);
+
+ /**
+ * Process new data.
+ *
+ * @param data Data received.
+ * @return @c true if connection was established.
+ */
+ bool ProcessData(DataBuffer& data);
+
+ /**
+ * Get pending decrypted data.
+ *
+ * @return Data buffer.
+ */
+ DataBuffer GetPendingDecryptedData();
+
+ private:
+ enum
+ {
+ /** Receive buffer size. */
+ RECEIVE_BUFFER_SIZE = 0x10000
+ };
+
+ /**
+ * Send pending data.
+ */
+ bool SendPendingData();
+
+ /**
+ * Get pending data.
+ *
+ * @param bio BIO to get data from.
+ * @return Data buffer.
+ */
+ static DataBuffer GetPendingData(void* bio);
+
+ /** Flag indicating that secure connection is established. */
+ bool connected;
+
+ /** Connection ID. */
+ const uint64_t id;
+
+ /** Address. */
+ const EndPoint addr;
+
+ /** Filter. */
+ SecureDataFilter& filter;
+
+ /** Receive buffer. */
+ impl::interop::SP_InteropMemory recvBuffer;
+
+ /** SSL instance. */
+ void* ssl;
+
+ /** Input BIO. */
+ void* bioIn;
+
+ /** Output BIO. */
+ void* bioOut;
+ };
+
+ // Shared pointer type alias.
+ typedef common::concurrent::SharedPointer<SecureConnectionContext> SP_SecureConnectionContext;
+
+ /** Context map. */
+ typedef std::map<uint64_t, SP_SecureConnectionContext> ContextMap;
+
+ /**
+ * Get context for connection.
+ *
+ * @param id Connection ID.
+ * @return Context if found or null.
+ */
+ SP_SecureConnectionContext FindContext(uint64_t id);
+
+ /**
+ * Send data to specific established connection.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c true if connection is present and @c false otherwise.
+ *
+ * @throw IgniteError on error.
+ */
+ bool SendInternal(uint64_t id, const DataBuffer& data);
+
+ /** Secure configuration. */
+ const SecureConfiguration cfg;
+
+ /** SSL context. */
+ void* sslContext;
+
+ /** Contexts for connections. */
+ ContextMap* contexts;
+
+ /** Mutex for secure access to context map. */
+ common::concurrent::CriticalSection contextCs;
+ };
+
+ // Shared pointer type alias.
+ typedef common::concurrent::SharedPointer<SecureDataFilter> SP_SecureDataFilter;
+ }
+ }
+}
+
+#endif //_IGNITE_NETWORK_SSL_SECURE_DATA_FILTER
diff --git a/modules/platforms/cpp/network/include/ignite/network/tcp_range.h b/modules/platforms/cpp/network/include/ignite/network/tcp_range.h
index 8644d46..a64df64 100644
--- a/modules/platforms/cpp/network/include/ignite/network/tcp_range.h
+++ b/modules/platforms/cpp/network/include/ignite/network/tcp_range.h
@@ -20,6 +20,7 @@
#include <stdint.h>
#include <string>
+#include <sstream>
namespace ignite
{
@@ -81,6 +82,16 @@ namespace ignite
}
/**
+ * Check whether empty.
+ *
+ * @return @c true if empty.
+ */
+ bool IsEmpty() const
+ {
+ return host.empty();
+ }
+
+ /**
* Comparison operator.
*
* @param val1 First value.
@@ -153,19 +164,32 @@ namespace ignite
return val1.Compare(val2) >= 0;
}
+ /**
+ * Convert to string.
+ *
+ * @return String representation.
+ */
+ std::string ToString() const
+ {
+ std::stringstream buf;
+ buf << host << ':' << port;
+
+ if (range)
+ buf << ".." << (port + range);
+
+ return buf.str();
+ }
+
/** Remote host. */
std::string host;
/** TCP port. */
uint16_t port;
- /**
- * Number of ports after the port that should be tried if
- * the previous are unavailable.
- */
+ /** Number of ports after the port that should be tried if the previous are unavailable. */
uint16_t range;
};
}
}
-#endif //_IGNITE_NETWORK_TCP_RANGE
\ No newline at end of file
+#endif //_IGNITE_NETWORK_TCP_RANGE
diff --git a/modules/platforms/cpp/network/include/ignite/network/utils.h b/modules/platforms/cpp/network/include/ignite/network/utils.h
index e8f0851..082ac97 100644
--- a/modules/platforms/cpp/network/include/ignite/network/utils.h
+++ b/modules/platforms/cpp/network/include/ignite/network/utils.h
@@ -22,6 +22,7 @@
#include <string>
#include <ignite/common/common.h>
+#include <ignite/ignite_error.h>
namespace ignite
{
@@ -35,6 +36,16 @@ namespace ignite
* @param addrs Addresses set.
*/
void IGNITE_IMPORT_EXPORT GetLocalAddresses(std::set<std::string>& addrs);
+
+ /**
+ * Throw connection error.
+ *
+ * @param err Error message.
+ */
+ inline void ThrowNetworkError(const std::string& err)
+ {
+ throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, err.c_str());
+ }
}
}
}
diff --git a/modules/platforms/cpp/network/os/linux/src/network/connecting_context.cpp b/modules/platforms/cpp/network/os/linux/src/network/connecting_context.cpp
new file mode 100644
index 0000000..4e528ff
--- /dev/null
+++ b/modules/platforms/cpp/network/os/linux/src/network/connecting_context.cpp
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+
+#include <cstring>
+#include <iterator>
+
+#include <ignite/common/utils.h>
+#include <ignite/network/utils.h>
+
+#include "network/connecting_context.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ ConnectingContext::ConnectingContext(const TcpRange &range) :
+ range(range),
+ nextPort(range.port),
+ info(0),
+ currentInfo(0)
+ {
+ // No-op.
+ }
+
+ ConnectingContext::~ConnectingContext()
+ {
+ Reset();
+ }
+
+ void ConnectingContext::Reset()
+ {
+ if (info)
+ {
+ freeaddrinfo(info);
+ info = 0;
+ currentInfo = 0;
+ }
+
+ nextPort = range.port;
+ }
+
+ addrinfo *ConnectingContext::Next()
+ {
+ if (currentInfo)
+ currentInfo = currentInfo->ai_next;
+
+ while (!currentInfo)
+ {
+ if (info)
+ {
+ freeaddrinfo(info);
+ info = 0;
+ }
+
+ if (nextPort > range.port + range.range)
+ return 0;
+
+ addrinfo hints;
+ std::memset(&hints, 0, sizeof(hints));
+
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = IPPROTO_TCP;
+
+ std::string strPort = common::LexicalCast<std::string>(nextPort);
+
+ // Resolve the server address and port
+ int res = getaddrinfo(range.host.c_str(), strPort.c_str(), &hints, &info);
+ if (res != 0)
+ return 0;
+
+ currentInfo = info;
+ ++nextPort;
+ }
+
+ return currentInfo;
+ }
+
+ EndPoint ConnectingContext::GetAddress() const
+ {
+ return EndPoint(range.host, nextPort - 1);
+ }
+
+ SP_LinuxAsyncClient ConnectingContext::ToClient(int fd)
+ {
+ return SP_LinuxAsyncClient(new LinuxAsyncClient(fd, GetAddress(), range));
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/os/linux/src/network/connecting_context.h b/modules/platforms/cpp/network/os/linux/src/network/connecting_context.h
new file mode 100644
index 0000000..ee10286
--- /dev/null
+++ b/modules/platforms/cpp/network/os/linux/src/network/connecting_context.h
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_CONNECTING_CONTEXT
+#define _IGNITE_NETWORK_CONNECTING_CONTEXT
+
+#include <netdb.h>
+
+#include <stdint.h>
+#include <memory>
+
+#include <ignite/network/end_point.h>
+#include <ignite/network/tcp_range.h>
+
+#include "network/linux_async_client.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Connecting context.
+ */
+ class ConnectingContext
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ ConnectingContext(const TcpRange& range);
+
+ /**
+ * Destructor.
+ */
+ ~ConnectingContext();
+
+ /**
+ * Reset connection context to it's initial state.
+ */
+ void Reset();
+
+ /**
+ * Next address in range.
+ *
+ * @return Next addrinfo for connection.
+ */
+ addrinfo* Next();
+
+ /**
+ * Get lastaddress.
+ *
+ * @return Address.
+ */
+ EndPoint GetAddress() const;
+
+ /**
+ * Make client.
+ *
+ * @param fd Socket file descriptor.
+ * @return Client instance from current internal state.
+ */
+ SP_LinuxAsyncClient ToClient(int fd);
+
+ private:
+ IGNITE_NO_COPY_ASSIGNMENT(ConnectingContext);
+
+ /** Range. */
+ const TcpRange range;
+
+ /** Next port. */
+ uint16_t nextPort;
+
+ /** Current addrinfo. */
+ addrinfo* info;
+
+ /** Addrinfo which is currently used for connection */
+ addrinfo* currentInfo;
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_CONNECTING_CONTEXT
diff --git a/modules/platforms/cpp/network/os/linux/src/network/linux_async_client.cpp b/modules/platforms/cpp/network/os/linux/src/network/linux_async_client.cpp
new file mode 100644
index 0000000..3b535d9
--- /dev/null
+++ b/modules/platforms/cpp/network/os/linux/src/network/linux_async_client.cpp
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+
+#include <algorithm>
+
+#include <ignite/common/utils.h>
+#include <ignite/network/utils.h>
+
+#include "network/sockets.h"
+#include "network/linux_async_client.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ LinuxAsyncClient::LinuxAsyncClient(int fd, const EndPoint &addr, const TcpRange &range) :
+ state(State::CONNECTED),
+ fd(fd),
+ epoll(-1),
+ id(0),
+ addr(addr),
+ range(range),
+ sendPackets(),
+ sendCs(),
+ recvPacket(),
+ closeErr(IgniteError::IGNITE_SUCCESS)
+ {
+ // No-op.
+ }
+
+ LinuxAsyncClient::~LinuxAsyncClient()
+ {
+ Shutdown(0);
+
+ Close();
+ }
+
+ bool LinuxAsyncClient::Shutdown(const IgniteError* err)
+ {
+ common::concurrent::CsLockGuard lock(sendCs);
+ if (state != State::CONNECTED)
+ return false;
+
+ closeErr = err ? *err : IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Connection closed by application");
+ shutdown(fd, SHUT_RDWR);
+ state = State::SHUTDOWN;
+
+ return true;
+ }
+
+ bool LinuxAsyncClient::Close()
+ {
+ State::Type stateBefore = state;
+
+ if (state != State::CLOSED)
+ {
+ StopMonitoring();
+ close(fd);
+ fd = -1;
+ state = State::CLOSED;
+ }
+
+ return stateBefore == State::CONNECTED;
+ }
+
+ bool LinuxAsyncClient::Send(const DataBuffer& data)
+ {
+ common::concurrent::CsLockGuard lock(sendCs);
+
+ sendPackets.push_back(data);
+
+ if (sendPackets.size() > 1)
+ return true;
+
+ return SendNextPacketLocked();
+ }
+
+ bool LinuxAsyncClient::SendNextPacketLocked()
+ {
+ if (sendPackets.empty())
+ return true;
+
+ DataBuffer& packet = sendPackets.front();
+
+ ssize_t ret = send(fd, packet.GetData(), packet.GetSize(), 0);
+ if (ret < 0)
+ return false;
+
+ packet.Skip(static_cast<int32_t>(ret));
+
+ EnableSendNotifications();
+
+ return true;
+ }
+
+ DataBuffer LinuxAsyncClient::Receive()
+ {
+ using namespace impl::interop;
+
+ if (!recvPacket.IsValid())
+ {
+ recvPacket = SP_InteropMemory(new InteropUnpooledMemory(BUFFER_SIZE));
+ recvPacket.Get()->Length(BUFFER_SIZE);
+ }
+
+ ssize_t res = recv(fd, recvPacket.Get()->Data(), recvPacket.Get()->Length(), 0);
+ if (res < 0)
+ return DataBuffer();
+
+ return DataBuffer(recvPacket, 0, static_cast<int32_t>(res));
+ }
+
+ bool LinuxAsyncClient::StartMonitoring(int epoll0)
+ {
+ if (epoll0 < 0)
+ return false;
+
+ epoll_event event;
+ std::memset(&event, 0, sizeof(event));
+ event.data.ptr = this;
+ event.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
+
+ int res = epoll_ctl(epoll0, EPOLL_CTL_ADD, fd, &event);
+ if (res < 0)
+ return false;
+
+ epoll = epoll0;
+
+ return true;
+ }
+
+ void LinuxAsyncClient::StopMonitoring()
+ {
+ epoll_event event;
+ std::memset(&event, 0, sizeof(event));
+
+ epoll_ctl(epoll, EPOLL_CTL_DEL, fd, &event);
+ }
+
+ void LinuxAsyncClient::EnableSendNotifications()
+ {
+ epoll_event event;
+ std::memset(&event, 0, sizeof(event));
+ event.data.ptr = this;
+ event.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
+
+ epoll_ctl(epoll, EPOLL_CTL_MOD, fd, &event);
+ }
+
+ void LinuxAsyncClient::DisableSendNotifications()
+ {
+ epoll_event event;
+ std::memset(&event, 0, sizeof(event));
+ event.data.ptr = this;
+ event.events = EPOLLIN | EPOLLRDHUP;
+
+ epoll_ctl(epoll, EPOLL_CTL_MOD, fd, &event);
+ }
+
+ bool LinuxAsyncClient::ProcessSent()
+ {
+ common::concurrent::CsLockGuard lock(sendCs);
+
+ if (sendPackets.empty())
+ {
+ DisableSendNotifications();
+
+ return true;
+ }
+
+ DataBuffer& front = sendPackets.front();
+
+ if (front.IsEmpty())
+ sendPackets.pop_front();
+
+ return SendNextPacketLocked();
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/os/linux/src/network/linux_async_client.h b/modules/platforms/cpp/network/os/linux/src/network/linux_async_client.h
new file mode 100644
index 0000000..79977fc
--- /dev/null
+++ b/modules/platforms/cpp/network/os/linux/src/network/linux_async_client.h
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_LINUX_ASYNC_CLIENT
+#define _IGNITE_NETWORK_LINUX_ASYNC_CLIENT
+
+#include "network/sockets.h"
+
+#include <stdint.h>
+#include <deque>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/async_handler.h>
+#include <ignite/network/codec.h>
+#include <ignite/network/end_point.h>
+#include <ignite/network/tcp_range.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Linux-specific implementation of async network client.
+ */
+ class LinuxAsyncClient
+ {
+ /**
+ * State.
+ */
+ struct State
+ {
+ enum Type
+ {
+ CONNECTED,
+
+ SHUTDOWN,
+
+ CLOSED,
+ };
+ };
+
+ public:
+ enum { BUFFER_SIZE = 0x10000 };
+
+ /**
+ * Constructor.
+ *
+ * @param fd Socket file descriptor.
+ * @param addr Address.
+ * @param range Range.
+ */
+ LinuxAsyncClient(int fd, const EndPoint& addr, const TcpRange& range);
+
+ /**
+ * Destructor.
+ *
+ * Should not be destructed from external threads.
+ * Can be destructed from WorkerThread.
+ */
+ ~LinuxAsyncClient();
+
+ /**
+ * Shutdown client.
+ *
+ * Can be called from external threads.
+ * Can be called from WorkerThread.
+ *
+ * @param err Error message. Can be null.
+ * @return @c true if shutdown performed successfully.
+ */
+ bool Shutdown(const IgniteError* err);
+
+ /**
+ * Close client.
+ *
+ * Should not be called from external threads.
+ * Can be called from WorkerThread.
+ *
+ * @return @c true if shutdown performed successfully.
+ */
+ bool Close();
+
+ /**
+ * Send packet using client.
+ *
+ * @param data Data to send.
+ * @return @c true on success.
+ */
+ bool Send(const DataBuffer& data);
+
+ /**
+ * Initiate next receive of data.
+ *
+ * @return @c true on success.
+ */
+ DataBuffer Receive();
+
+ /**
+ * Process sent data.
+ *
+ * @return @c true on success.
+ */
+ bool ProcessSent();
+
+ /**
+ * Start monitoring client.
+ *
+ * @param epoll Epoll file descriptor.
+ * @return @c true on success.
+ */
+ bool StartMonitoring(int epoll);
+
+ /**
+ * Stop monitoring client.
+ */
+ void StopMonitoring();
+
+ /**
+ * Enable epoll notifications.
+ */
+ void EnableSendNotifications();
+
+ /**
+ * Disable epoll notifications.
+ */
+ void DisableSendNotifications();
+
+ /**
+ * Get client ID.
+ *
+ * @return Client ID.
+ */
+ uint64_t GetId() const
+ {
+ return id;
+ }
+
+ /**
+ * Set ID.
+ *
+ * @param id ID to set.
+ */
+ void SetId(uint64_t id)
+ {
+ this->id = id;
+ }
+
+ /**
+ * Get address.
+ *
+ * @return Address.
+ */
+ const EndPoint& GetAddress() const
+ {
+ return addr;
+ }
+
+ /**
+ * Get range.
+ *
+ * @return Range.
+ */
+ const TcpRange& GetRange() const
+ {
+ return range;
+ }
+
+ /**
+ * Check whether client is closed.
+ *
+ * @return @c true if closed.
+ */
+ bool IsClosed() const
+ {
+ return state == State::CLOSED;
+ }
+
+ /**
+ * Get closing error for the connection. Can be IGNITE_SUCCESS.
+ *
+ * @return Connection error.
+ */
+ const IgniteError& GetCloseError() const
+ {
+ return closeErr;
+ }
+
+ private:
+ /**
+ * Send next packet in queue.
+ *
+ * @warning Can only be called when holding sendCs lock.
+ * @return @c true on success.
+ */
+ bool SendNextPacketLocked();
+
+ /** State. */
+ State::Type state;
+
+ /** Socket file descriptor. */
+ int fd;
+
+ /** Epoll file descriptor. */
+ int epoll;
+
+ /** Connection ID. */
+ uint64_t id;
+
+ /** Server end point. */
+ EndPoint addr;
+
+ /** Address range associated with current connection. */
+ TcpRange range;
+
+ /** Packets that should be sent. */
+ std::deque<DataBuffer> sendPackets;
+
+ /** Send critical section. */
+ common::concurrent::CriticalSection sendCs;
+
+ /** Packet that is currently received. */
+ impl::interop::SP_InteropMemory recvPacket;
+
+ /** Closing error. */
+ IgniteError closeErr;
+ };
+
+ /** Shared pointer to async client. */
+ typedef common::concurrent::SharedPointer<LinuxAsyncClient> SP_LinuxAsyncClient;
+ }
+}
+
+#endif //_IGNITE_NETWORK_LINUX_ASYNC_CLIENT
diff --git a/modules/platforms/cpp/network/os/linux/src/network/linux_async_client_pool.cpp b/modules/platforms/cpp/network/os/linux/src/network/linux_async_client_pool.cpp
new file mode 100644
index 0000000..74c4804
--- /dev/null
+++ b/modules/platforms/cpp/network/os/linux/src/network/linux_async_client_pool.cpp
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+
+#include <ignite/common/utils.h>
+#include <ignite/network/utils.h>
+
+#include "network/sockets.h"
+#include "network/linux_async_client_pool.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ LinuxAsyncClientPool::LinuxAsyncClientPool() :
+ stopping(true),
+ asyncHandler(0),
+ workerThread(*this),
+ idGen(0),
+ clientsCs(),
+ clientIdMap()
+ {
+ // No-op.
+ }
+
+ LinuxAsyncClientPool::~LinuxAsyncClientPool()
+ {
+ InternalStop();
+ }
+
+ void LinuxAsyncClientPool::Start(const std::vector<TcpRange> &addrs, uint32_t connLimit)
+ {
+ if (!stopping)
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Client pool is already started");
+
+ idGen = 0;
+ stopping = false;
+
+ try
+ {
+ workerThread.Start0(connLimit, addrs);
+ }
+ catch (...)
+ {
+ Stop();
+
+ throw;
+ }
+ }
+
+ void LinuxAsyncClientPool::Stop()
+ {
+ InternalStop();
+ }
+
+ bool LinuxAsyncClientPool::Send(uint64_t id, const DataBuffer &data)
+ {
+ if (stopping)
+ return false;
+
+ SP_LinuxAsyncClient client = FindClient(id);
+ if (!client.IsValid())
+ return false;
+
+ return client.Get()->Send(data);
+ }
+
+ void LinuxAsyncClientPool::Close(uint64_t id, const IgniteError *err)
+ {
+ if (stopping)
+ return;
+
+ SP_LinuxAsyncClient client = FindClient(id);
+ if (!client.IsValid() || client.Get()->IsClosed())
+ return;
+
+ client.Get()->Shutdown(err);
+ }
+
+ void LinuxAsyncClientPool::CloseAndRelease(uint64_t id, const IgniteError *err)
+ {
+ if (stopping)
+ return;
+
+ SP_LinuxAsyncClient client;
+ {
+ common::concurrent::CsLockGuard lock(clientsCs);
+
+ std::map<uint64_t, SP_LinuxAsyncClient>::iterator it = clientIdMap.find(id);
+ if (it == clientIdMap.end())
+ return;
+
+ client = it->second;
+
+ clientIdMap.erase(it);
+ }
+
+ bool closed = client.Get()->Close();
+ if (closed)
+ HandleConnectionClosed(id, err);
+ }
+
+ bool LinuxAsyncClientPool::AddClient(SP_LinuxAsyncClient &client)
+ {
+ if (stopping)
+ return false;
+
+ LinuxAsyncClient& clientRef = *client.Get();
+ {
+ common::concurrent::CsLockGuard lock(clientsCs);
+
+ uint64_t id = ++idGen;
+ clientRef.SetId(id);
+
+ clientIdMap[id] = client;
+ }
+
+ HandleConnectionSuccess(clientRef.GetAddress(), clientRef.GetId());
+
+ return true;
+ }
+
+ void LinuxAsyncClientPool::HandleConnectionError(const EndPoint &addr, const IgniteError &err)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnConnectionError(addr, err);
+ }
+
+ void LinuxAsyncClientPool::HandleConnectionSuccess(const EndPoint &addr, uint64_t id)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnConnectionSuccess(addr, id);
+ }
+
+ void LinuxAsyncClientPool::HandleConnectionClosed(uint64_t id, const IgniteError *err)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnConnectionClosed(id, err);
+ }
+
+ void LinuxAsyncClientPool::HandleMessageReceived(uint64_t id, const DataBuffer &msg)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnMessageReceived(id, msg);
+ }
+
+ void LinuxAsyncClientPool::HandleMessageSent(uint64_t id)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnMessageSent(id);
+ }
+
+ void LinuxAsyncClientPool::InternalStop()
+ {
+ stopping = true;
+ workerThread.Stop();
+
+ {
+ common::concurrent::CsLockGuard lock(clientsCs);
+
+ std::map<uint64_t, SP_LinuxAsyncClient>::iterator it;
+ for (it = clientIdMap.begin(); it != clientIdMap.end(); ++it)
+ {
+ LinuxAsyncClient& client = *it->second.Get();
+
+ IgniteError err(IgniteError::IGNITE_ERR_GENERIC, "Client stopped");
+ HandleConnectionClosed(client.GetId(), &err);
+ }
+
+ clientIdMap.clear();
+ }
+ }
+
+ SP_LinuxAsyncClient LinuxAsyncClientPool::FindClient(uint64_t id) const
+ {
+ common::concurrent::CsLockGuard lock(clientsCs);
+
+ std::map<uint64_t, SP_LinuxAsyncClient>::const_iterator it = clientIdMap.find(id);
+ if (it == clientIdMap.end())
+ return SP_LinuxAsyncClient();
+
+ return it->second;
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/os/linux/src/network/linux_async_client_pool.h b/modules/platforms/cpp/network/os/linux/src/network/linux_async_client_pool.h
new file mode 100644
index 0000000..ca47894
--- /dev/null
+++ b/modules/platforms/cpp/network/os/linux/src/network/linux_async_client_pool.h
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_LINUX_ASYNC_CLIENT_POOL
+#define _IGNITE_NETWORK_LINUX_ASYNC_CLIENT_POOL
+
+#include <stdint.h>
+#include <map>
+
+#include <ignite/ignite_error.h>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/async_client_pool.h>
+#include <ignite/network/async_handler.h>
+#include <ignite/network/tcp_range.h>
+
+#include "network/linux_async_worker_thread.h"
+#include "network/linux_async_client.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Linux-specific implementation of asynchronous client pool.
+ */
+ class LinuxAsyncClientPool : public AsyncClientPool
+ {
+ public:
+ /**
+ * Constructor
+ *
+ * @param handler Upper level event handler.
+ */
+ LinuxAsyncClientPool();
+
+ /**
+ * Destructor.
+ */
+ virtual ~LinuxAsyncClientPool();
+
+ /**
+ * Start internal thread that establishes connections to provided addresses and asynchronously sends and
+ * receives messages from them. Function returns either when thread is started and first connection is
+ * established or failure happened.
+ *
+ * @param addrs Addresses to connect to.
+ * @param connLimit Connection upper limit. Zero means limit is disabled.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual void Start(const std::vector<TcpRange>& addrs, uint32_t connLimit);
+
+ /**
+ * Close all established connections and stops handling thread.
+ */
+ virtual void Stop();
+
+ /**
+ * Set handler.
+ *
+ * @param handler Handler to set.
+ */
+ virtual void SetHandler(AsyncHandler *handler)
+ {
+ asyncHandler = handler;
+ }
+
+ /**
+ * Send data to specific established connection.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c true if connection is present and @c false otherwise.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual bool Send(uint64_t id, const DataBuffer& data);
+
+ /**
+ * Closes specified connection if it's established. Connection to the specified address is planned for
+ * re-connect. Event is issued to the handler with specified error.
+ *
+ * @param id Client ID.
+ */
+ virtual void Close(uint64_t id, const IgniteError* err);
+
+ /**
+ * Closes and releases memory allocated for client with specified ID.
+ * Error is reported to handler.
+ *
+ * @param id Client ID.
+ * @param err Error to report. May be null.
+ * @return @c true if connection with specified ID was found.
+ */
+ void CloseAndRelease(uint64_t id, const IgniteError* err);
+
+ /**
+ * Add client to connection map. Notify user.
+ *
+ * @param client Client.
+ * @return Client ID.
+ */
+ bool AddClient(SP_LinuxAsyncClient& client);
+
+ /**
+ * Handle error during connection establishment.
+ *
+ * @param addr Connection address.
+ * @param err Error.
+ */
+ void HandleConnectionError(const EndPoint& addr, const IgniteError& err);
+
+ /**
+ * Handle successful connection establishment.
+ *
+ * @param addr Address of the new connection.
+ * @param id Connection ID.
+ */
+ void HandleConnectionSuccess(const EndPoint& addr, uint64_t id);
+
+ /**
+ * Handle error during connection establishment.
+ *
+ * @param id Async client ID.
+ * @param err Error. Can be null if connection closed without error.
+ */
+ void HandleConnectionClosed(uint64_t id, const IgniteError* err);
+
+ /**
+ * Handle new message.
+ *
+ * @param id Async client ID.
+ * @param msg Received message.
+ */
+ void HandleMessageReceived(uint64_t id, const DataBuffer& msg);
+
+ /**
+ * Handle sent message event.
+ *
+ * @param id Async client ID.
+ */
+ void HandleMessageSent(uint64_t id);
+
+ private:
+ /**
+ * Close all established connections and stops handling threads.
+ */
+ void InternalStop();
+
+ /**
+ * Find client by ID.
+ *
+ * @param id Client ID.
+ * @return Client. Null pointer if is not found.
+ */
+ SP_LinuxAsyncClient FindClient(uint64_t id) const;
+
+ /**
+ * Find client by ID.
+ *
+ * @warning Should only be called with clientsCs lock held.
+ * @param id Client ID.
+ * @return Client. Null pointer if is not found.
+ */
+ SP_LinuxAsyncClient FindClientLocked(uint64_t id) const;
+
+ /** Flag indicating that pool is stopping. */
+ volatile bool stopping;
+
+ /** Event handler. */
+ AsyncHandler* asyncHandler;
+
+ /** Worker thread. */
+ LinuxAsyncWorkerThread workerThread;
+
+ /** ID counter. */
+ uint64_t idGen;
+
+ /** Clients critical section. */
+ mutable common::concurrent::CriticalSection clientsCs;
+
+ /** Client mapping ID -> client */
+ std::map<uint64_t, SP_LinuxAsyncClient> clientIdMap;
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_LINUX_ASYNC_CLIENT_POOL
diff --git a/modules/platforms/cpp/network/os/linux/src/network/linux_async_worker_thread.cpp b/modules/platforms/cpp/network/os/linux/src/network/linux_async_worker_thread.cpp
new file mode 100644
index 0000000..9aad7f5
--- /dev/null
+++ b/modules/platforms/cpp/network/os/linux/src/network/linux_async_worker_thread.cpp
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <netdb.h>
+#include <netinet/tcp.h>
+#include <unistd.h>
+
+#include <cstring>
+
+#include <ignite/network/utils.h>
+#include <ignite/common/utils.h>
+
+#include "network/linux_async_worker_thread.h"
+#include "network/linux_async_client_pool.h"
+
+namespace
+{
+ ignite::common::FibonacciSequence<10> fibonacci10;
+}
+
+namespace ignite
+{
+ namespace network
+ {
+ LinuxAsyncWorkerThread::LinuxAsyncWorkerThread(LinuxAsyncClientPool &clientPool) :
+ clientPool(clientPool),
+ stopping(true),
+ epoll(-1),
+ stopEvent(-1),
+ nonConnected(),
+ currentConnection(),
+ currentClient(),
+ failedAttempts(0),
+ lastConnectionTime(),
+ minAddrs(0)
+ {
+ memset(&lastConnectionTime, 0, sizeof(lastConnectionTime));
+ }
+
+ LinuxAsyncWorkerThread::~LinuxAsyncWorkerThread()
+ {
+ Stop();
+ }
+
+ void LinuxAsyncWorkerThread::Start0(size_t limit, const std::vector<TcpRange> &addrs)
+ {
+ epoll = epoll_create(1);
+ if (epoll < 0)
+ ThrowSystemError("Failed to create epoll instance");
+
+ stopEvent = eventfd(0, EFD_NONBLOCK);
+ if (stopEvent < 0)
+ {
+ close(stopEvent);
+ ThrowSystemError("Failed to create stop event instance");
+ }
+
+ epoll_event event;
+ memset(&event, 0, sizeof(event));
+
+ event.events = EPOLLIN;
+
+ int res = epoll_ctl(epoll, EPOLL_CTL_ADD, stopEvent, &event);
+ if (res < 0)
+ {
+ close(stopEvent);
+ close(epoll);
+ ThrowSystemError("Failed to add stop event to epoll");
+ }
+
+ stopping = false;
+ failedAttempts = 0;
+ nonConnected = addrs;
+
+ currentConnection.reset();
+ currentClient = SP_LinuxAsyncClient();
+
+ if (!limit || limit > addrs.size())
+ minAddrs = 0;
+ else
+ minAddrs = addrs.size() - limit;
+
+ Thread::Start();
+ }
+
+ void LinuxAsyncWorkerThread::Stop()
+ {
+ if (stopping)
+ return;
+
+ stopping = true;
+
+ int64_t value = 1;
+ ssize_t res = write(stopEvent, &value, sizeof(value));
+
+ IGNITE_UNUSED(res);
+ assert(res == sizeof(value));
+
+ Thread::Join();
+
+ close(stopEvent);
+ close(epoll);
+
+ nonConnected.clear();
+ currentConnection.reset();
+ }
+
+ void LinuxAsyncWorkerThread::Run()
+ {
+ while (!stopping)
+ {
+ HandleNewConnections();
+
+ if (stopping)
+ break;
+
+ HandleConnectionEvents();
+ }
+ }
+
+ void LinuxAsyncWorkerThread::HandleNewConnections()
+ {
+ if (!ShouldInitiateNewConnection())
+ return;
+
+ if (CalculateConnectionTimeout() > 0)
+ return;
+
+ addrinfo* addr = 0;
+ if (currentConnection.get())
+ addr = currentConnection->Next();
+
+ if (!addr)
+ {
+ size_t idx = rand() % nonConnected.size();
+ const TcpRange& range = nonConnected.at(idx);
+
+ currentConnection.reset(new ConnectingContext(range));
+ addr = currentConnection->Next();
+ if (!addr)
+ {
+ currentConnection.reset();
+ ReportConnectionError(EndPoint(), "Can not resolve a single address from range: " + range.ToString());
+ ++failedAttempts;
+
+ return;
+ }
+ }
+
+ // Create a SOCKET for connecting to server
+ int socketFd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+ if (SOCKET_ERROR == socketFd)
+ {
+ ReportConnectionError(currentConnection->GetAddress(),
+ "Socket creation failed: " + sockets::GetLastSocketErrorMessage());
+
+ return;
+ }
+
+ sockets::TrySetSocketOptions(socketFd, LinuxAsyncClient::BUFFER_SIZE, true, true, true);
+ bool success = sockets::SetNonBlockingMode(socketFd, true);
+ if (!success)
+ {
+ ReportConnectionError(currentConnection->GetAddress(),
+ "Can not make non-blocking socket: " + sockets::GetLastSocketErrorMessage());
+
+ return;
+ }
+
+ currentClient = currentConnection->ToClient(socketFd);
+ bool ok = currentClient.Get()->StartMonitoring(epoll);
+ if (!ok)
+ ThrowSystemError("Can not add file descriptor to epoll");
+
+ // Connect to server.
+ int res = connect(socketFd, addr->ai_addr, addr->ai_addrlen);
+ if (SOCKET_ERROR == res)
+ {
+ int lastError = errno;
+
+ clock_gettime(CLOCK_MONOTONIC, &lastConnectionTime);
+
+ if (lastError != EWOULDBLOCK && lastError != EINPROGRESS)
+ {
+ HandleConnectionFailed("Failed to establish connection with the host: " +
+ sockets::GetSocketErrorMessage(lastError));
+
+ return;
+ }
+ }
+ }
+
+ void LinuxAsyncWorkerThread::HandleConnectionEvents()
+ {
+ enum { MAX_EVENTS = 16 };
+ epoll_event events[MAX_EVENTS];
+
+ int timeout = CalculateConnectionTimeout();
+
+ int res = epoll_wait(epoll, events, MAX_EVENTS, timeout);
+
+ if (res <= 0)
+ return;
+
+ for (int i = 0; i < res; ++i)
+ {
+ epoll_event& currentEvent = events[i];
+ LinuxAsyncClient* client = static_cast<LinuxAsyncClient*>(currentEvent.data.ptr);
+ if (!client)
+ continue;
+
+ if (client == currentClient.Get())
+ {
+ if (currentEvent.events & (EPOLLRDHUP | EPOLLERR))
+ {
+ HandleConnectionFailed("Can not establish connection");
+
+ continue;
+ }
+
+ HandleConnectionSuccess(client);
+ }
+
+ if (currentEvent.events & (EPOLLRDHUP | EPOLLERR))
+ {
+ HandleConnectionClosed(client);
+
+ continue;
+ }
+
+ if (currentEvent.events & EPOLLIN)
+ {
+ DataBuffer msg = client->Receive();
+ if (msg.IsEmpty())
+ {
+ HandleConnectionClosed(client);
+
+ continue;
+ }
+
+ clientPool.HandleMessageReceived(client->GetId(), msg);
+ }
+
+ if (currentEvent.events & EPOLLOUT)
+ {
+ bool ok = client->ProcessSent();
+ if (!ok)
+ {
+ HandleConnectionClosed(client);
+
+ continue;
+ }
+ }
+ }
+ }
+
+ void LinuxAsyncWorkerThread::ReportConnectionError(const EndPoint& addr, const std::string& msg)
+ {
+ IgniteError err(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str());
+ clientPool.HandleConnectionError(addr, err);
+ }
+
+ void LinuxAsyncWorkerThread::HandleConnectionFailed(const std::string& msg)
+ {
+ LinuxAsyncClient* client = currentClient.Get();
+ assert(client != 0);
+
+ client->StopMonitoring();
+ client->Close();
+
+ ReportConnectionError(client->GetAddress(), msg);
+
+ currentClient = SP_LinuxAsyncClient();
+ ++failedAttempts;
+ }
+
+ void LinuxAsyncWorkerThread::HandleConnectionClosed(LinuxAsyncClient *client)
+ {
+ client->StopMonitoring();
+
+ nonConnected.push_back(client->GetRange());
+
+ IgniteError err(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Connection closed");
+ clientPool.CloseAndRelease(client->GetId(), &err);
+ }
+
+ void LinuxAsyncWorkerThread::HandleConnectionSuccess(LinuxAsyncClient* client)
+ {
+ nonConnected.erase(std::find(nonConnected.begin(), nonConnected.end(), client->GetRange()));
+
+ clientPool.AddClient(currentClient);
+
+ currentClient = SP_LinuxAsyncClient();
+ currentConnection.reset();
+
+ failedAttempts = 0;
+
+ clock_gettime(CLOCK_MONOTONIC, &lastConnectionTime);
+ }
+
+ int LinuxAsyncWorkerThread::CalculateConnectionTimeout() const
+ {
+ if (!ShouldInitiateNewConnection())
+ return -1;
+
+ if (lastConnectionTime.tv_sec == 0)
+ return 0;
+
+ int timeout = fibonacci10.GetValue(failedAttempts) * 1000;
+
+ timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
+
+ int passed = (now.tv_sec - lastConnectionTime.tv_sec) * 1000 +
+ (now.tv_nsec - lastConnectionTime.tv_nsec) / 1000000;
+
+ timeout -= passed;
+ if (timeout < 0)
+ timeout = 0;
+
+ return timeout;
+ }
+
+ bool LinuxAsyncWorkerThread::ShouldInitiateNewConnection() const
+ {
+ return !currentClient.Get() && nonConnected.size() > minAddrs;
+ }
+
+ void LinuxAsyncWorkerThread::ThrowSystemError(const std::string &msg)
+ {
+ std::stringstream buf;
+
+ buf << "Linux system error: " << msg << ", system error code: " << errno;
+
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, buf.str().c_str());
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/os/linux/src/network/linux_async_worker_thread.h b/modules/platforms/cpp/network/os/linux/src/network/linux_async_worker_thread.h
new file mode 100644
index 0000000..706a8b9
--- /dev/null
+++ b/modules/platforms/cpp/network/os/linux/src/network/linux_async_worker_thread.h
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_LINUX_ASYNC_WORKER_THREAD
+#define _IGNITE_NETWORK_LINUX_ASYNC_WORKER_THREAD
+
+#include <time.h>
+
+#include <stdint.h>
+#include <memory>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/async_handler.h>
+#include <ignite/network/end_point.h>
+#include <ignite/network/tcp_range.h>
+
+#include "network/linux_async_client.h"
+#include "network/connecting_context.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ class LinuxAsyncClientPool;
+
+ /**
+ * Async pool working thread.
+ */
+ class LinuxAsyncWorkerThread : protected common::concurrent::Thread
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ LinuxAsyncWorkerThread(LinuxAsyncClientPool& clientPool);
+
+ /**
+ * Destructor.
+ */
+ virtual ~LinuxAsyncWorkerThread();
+
+ /**
+ * Start worker thread.
+ *
+ * @param limit Connection limit.
+ * @param addrs Addresses to connect to.
+ */
+ void Start0(size_t limit, const std::vector<TcpRange>& addrs);
+
+ /**
+ * Stop thread.
+ */
+ void Stop();
+
+ private:
+ /**
+ * Run thread.
+ */
+ virtual void Run();
+
+ /**
+ * Initiate new connection process if needed.
+ */
+ void HandleNewConnections();
+
+ /**
+ * Handle epoll events.
+ */
+ void HandleConnectionEvents();
+
+ /**
+ * Add file descriptor to epoll for monitoring.
+ *
+ * @param fd File descriptor to add.
+ * @param client Client. May be null.
+ */
+ void StartMonitoringFd(int fd, LinuxAsyncClient* client);
+
+ /**
+ * Handle network error during connection establishment.
+ *
+ * @param addr End point.
+ * @param msg Error message.
+ */
+ void ReportConnectionError(const EndPoint& addr, const std::string& msg);
+
+ /**
+ * Handle failed connection.
+ *
+ * @param msg Error message.
+ */
+ void HandleConnectionFailed(const std::string& msg);
+
+ /**
+ * Handle network error on established connection.
+ *
+ * @param client Client instance.
+ */
+ void HandleConnectionClosed(LinuxAsyncClient* client);
+
+ /**
+ * Handle successfully established connection.
+ *
+ * @param client Client instance.
+ */
+ void HandleConnectionSuccess(LinuxAsyncClient* client);
+
+ /**
+ * Calculate connection timeout.
+ *
+ * @return Connection timeout.
+ */
+ int CalculateConnectionTimeout() const;
+
+ /**
+ * Check whether new connection should be initiated.
+ *
+ * @return @c true if new connection should be initiated.
+ */
+ bool ShouldInitiateNewConnection() const;
+
+ /**
+ * Throw window specific error with error code.
+ *
+ * @param msg Error message.
+ */
+ void ThrowSystemError(const std::string &msg);
+
+ /** Client pool. */
+ LinuxAsyncClientPool& clientPool;
+
+ /** Flag indicating that thread is stopping. */
+ bool stopping;
+
+ /** Client epoll file descriptor. */
+ int epoll;
+
+ /** Stop event file descriptor. */
+ int stopEvent;
+
+ /** Addresses to use for connection establishment. */
+ std::vector<TcpRange> nonConnected;
+
+ /** Connection which is currently in connecting process. */
+ std::auto_ptr<ConnectingContext> currentConnection;
+
+ /** Currently connected client. */
+ SP_LinuxAsyncClient currentClient;
+
+ /** Failed connection attempts. */
+ size_t failedAttempts;
+
+ /** Last connection time. */
+ timespec lastConnectionTime;
+
+ /** Minimal number of addresses. */
+ size_t minAddrs;
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_LINUX_ASYNC_WORKER_THREAD
diff --git a/modules/platforms/cpp/network/os/linux/src/network/sockets.cpp b/modules/platforms/cpp/network/os/linux/src/network/sockets.cpp
index 08e0b5e..45b5ec0 100644
--- a/modules/platforms/cpp/network/os/linux/src/network/sockets.cpp
+++ b/modules/platforms/cpp/network/os/linux/src/network/sockets.cpp
@@ -16,6 +16,11 @@
*/
#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <fcntl.h>
#include <poll.h>
#include <errno.h>
@@ -110,6 +115,62 @@ namespace ignite
{
return errorCode == EINTR;
}
+
+ void TrySetSocketOptions(int socketFd, int bufSize, bool noDelay, bool outOfBand, bool keepAlive)
+ {
+ setsockopt(socketFd, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&bufSize), sizeof(bufSize));
+ setsockopt(socketFd, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&bufSize), sizeof(bufSize));
+
+ int iNoDelay = noDelay ? 1 : 0;
+ setsockopt(socketFd, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&iNoDelay), sizeof(iNoDelay));
+
+ int iOutOfBand = outOfBand ? 1 : 0;
+ setsockopt(socketFd, SOL_SOCKET, SO_OOBINLINE,
+ reinterpret_cast<char*>(&iOutOfBand), sizeof(iOutOfBand));
+
+ int iKeepAlive = keepAlive ? 1 : 0;
+ int res = setsockopt(socketFd, SOL_SOCKET, SO_KEEPALIVE,
+ reinterpret_cast<char*>(&iKeepAlive), sizeof(iKeepAlive));
+
+ if (SOCKET_ERROR == res)
+ {
+ // There is no sense in configuring keep alive params if we faileed to set up keep alive mode.
+ return;
+ }
+
+ // The time in seconds the connection needs to remain idle before starts sending keepalive probes.
+ enum { KEEP_ALIVE_IDLE_TIME = 60 };
+
+ // The time in seconds between individual keepalive probes.
+ enum { KEEP_ALIVE_PROBES_PERIOD = 1 };
+
+ int idleOpt = KEEP_ALIVE_IDLE_TIME;
+ int idleRetryOpt = KEEP_ALIVE_PROBES_PERIOD;
+#ifdef __APPLE__
+ setsockopt(socketFd, IPPROTO_TCP, TCP_KEEPALIVE, reinterpret_cast<char*>(&idleOpt), sizeof(idleOpt));
+#else
+ setsockopt(socketFd, IPPROTO_TCP, TCP_KEEPIDLE, reinterpret_cast<char*>(&idleOpt), sizeof(idleOpt));
+#endif
+
+ setsockopt(socketFd, IPPROTO_TCP, TCP_KEEPINTVL,
+ reinterpret_cast<char*>(&idleRetryOpt), sizeof(idleRetryOpt));
+ }
+
+ bool SetNonBlockingMode(int socketFd, bool nonBlocking)
+ {
+ int flags = fcntl(socketFd, F_GETFL, 0);
+ if (flags == -1)
+ return false;
+
+ bool currentNonBlocking = flags & O_NONBLOCK;
+ if (nonBlocking == currentNonBlocking)
+ return true;
+
+ flags ^= O_NONBLOCK;
+ int res = fcntl(socketFd, F_SETFL, flags);
+
+ return res != -1;
+ }
}
}
}
diff --git a/modules/platforms/cpp/network/os/linux/src/network/sockets.h b/modules/platforms/cpp/network/os/linux/src/network/sockets.h
index aad7bb2..2cbac8d 100644
--- a/modules/platforms/cpp/network/os/linux/src/network/sockets.h
+++ b/modules/platforms/cpp/network/os/linux/src/network/sockets.h
@@ -76,8 +76,27 @@ namespace ignite
* WaitResult::SUCCESS on success.
*/
int WaitOnSocket(SocketHandle socket, int32_t timeout, bool rd);
+
+ /**
+ * Try and set socket options.
+ *
+ * @param socketFd Socket file descriptor.
+ * @param bufSize Buffer size.
+ * @param noDelay Set no-delay mode.
+ * @param outOfBand Set out-of-Band mode.
+ * @param keepAlive Keep alive mode.
+ */
+ void TrySetSocketOptions(int socketFd, int bufSize, bool noDelay, bool outOfBand, bool keepAlive);
+
+ /**
+ * Set non blocking mode for socket.
+ *
+ * @param socketFd Socket file descriptor.
+ * @param nonBlocking Non-blocking mode.
+ */
+ bool SetNonBlockingMode(int socketFd, bool nonBlocking);
}
}
}
-#endif //_IGNITE_NETWORK_SOCKETS
\ No newline at end of file
+#endif //_IGNITE_NETWORK_SOCKETS
diff --git a/modules/platforms/cpp/network/os/linux/src/network/tcp_socket_client.cpp b/modules/platforms/cpp/network/os/linux/src/network/tcp_socket_client.cpp
index da5a6b0..e3c4770 100644
--- a/modules/platforms/cpp/network/os/linux/src/network/tcp_socket_client.cpp
+++ b/modules/platforms/cpp/network/os/linux/src/network/tcp_socket_client.cpp
@@ -27,6 +27,7 @@
#include <sstream>
#include <ignite/common/concurrent.h>
+#include <ignite/network/utils.h>
#include <ignite/ignite_error.h>
#include "network/tcp_socket_client.h"
@@ -66,7 +67,7 @@ namespace ignite
int res = getaddrinfo(hostname, strPort.c_str(), &hints, &result);
if (res != 0)
- ThrowNetworkError("Can not resolve host: " + std::string(hostname) + ":" + strPort);
+ utils::ThrowNetworkError("Can not resolve host: " + std::string(hostname) + ":" + strPort);
std::string lastErrorMsg = "Failed to resolve host";
bool isTimeout = false;
@@ -87,7 +88,8 @@ namespace ignite
throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, err.c_str());
}
- TrySetOptions();
+ sockets::TrySetSocketOptions(socketHandle, BUFFER_SIZE, true, true, true);
+ blocking = !sockets::SetNonBlockingMode(socketHandle, true);
// Connect to server.
res = connect(socketHandle, it->ai_addr, static_cast<int>(it->ai_addrlen));
@@ -126,7 +128,7 @@ namespace ignite
if (isTimeout)
return false;
- ThrowNetworkError(lastErrorMsg);
+ utils::ThrowNetworkError(lastErrorMsg);
}
return true;
@@ -178,46 +180,6 @@ namespace ignite
return blocking;
}
- void TcpSocketClient::TrySetOptions()
- {
- int trueOpt = 1;
-
- int idleOpt = KEEP_ALIVE_IDLE_TIME;
- int idleRetryOpt = KEEP_ALIVE_PROBES_PERIOD;
- int bufSizeOpt = BUFFER_SIZE;
-
- setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
-
- setsockopt(socketHandle, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
-
- setsockopt(socketHandle, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
-
- setsockopt(socketHandle, SOL_SOCKET, SO_OOBINLINE, reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
-
- blocking = false;
-
- int flags;
- blocking = ((flags = fcntl(socketHandle, F_GETFL, 0)) < 0) ||
- (fcntl(socketHandle, F_SETFL, flags | O_NONBLOCK) < 0);
-
- int res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE,
- reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
-
- if (SOCKET_ERROR == res)
- {
- // There is no sense in configuring keep alive params if we faileed to set up keep alive mode.
- return;
- }
-#ifdef __APPLE__
- setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPALIVE, reinterpret_cast<char*>(&idleOpt), sizeof(idleOpt));
-#else
- setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPIDLE, reinterpret_cast<char*>(&idleOpt), sizeof(idleOpt));
-#endif
-
- setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPINTVL,
- reinterpret_cast<char*>(&idleRetryOpt), sizeof(idleRetryOpt));
- }
-
int TcpSocketClient::WaitOnSocket(int32_t timeout, bool rd)
{
return sockets::WaitOnSocket(socketHandle, timeout, rd);
diff --git a/modules/platforms/cpp/network/os/win/src/network/sockets.cpp b/modules/platforms/cpp/network/os/win/src/network/sockets.cpp
index a6b6bb1..2cdbc7b 100644
--- a/modules/platforms/cpp/network/os/win/src/network/sockets.cpp
+++ b/modules/platforms/cpp/network/os/win/src/network/sockets.cpp
@@ -19,7 +19,10 @@
#include <sstream>
+#include <ignite/common/concurrent.h>
+
#include <ignite/network/socket_client.h>
+#include <ignite/network/utils.h>
namespace ignite
{
@@ -52,7 +55,7 @@ namespace ignite
LPTSTR errorText = NULL;
- DWORD len = FormatMessage(
+ DWORD len = FormatMessageA(
// use system message tables to retrieve error text
FORMAT_MESSAGE_FROM_SYSTEM
// allocate buffer on local heap for error text
@@ -64,7 +67,7 @@ namespace ignite
error,
MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US),
// output
- reinterpret_cast<LPTSTR>(&errorText),
+ reinterpret_cast<LPSTR>(&errorText),
// minimum size for output buffer
0,
// arguments - see note
@@ -132,6 +135,97 @@ namespace ignite
{
return errorCode == WSAEINTR;
}
+
+ void TrySetSocketOptions(SOCKET socket, int bufSize, BOOL noDelay, BOOL outOfBand, BOOL keepAlive)
+ {
+ setsockopt(socket, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&bufSize), sizeof(bufSize));
+ setsockopt(socket, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&bufSize), sizeof(bufSize));
+
+ setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&noDelay), sizeof(noDelay));
+
+ setsockopt(socket, SOL_SOCKET, SO_OOBINLINE, reinterpret_cast<char*>(&outOfBand), sizeof(outOfBand));
+
+ int res = setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE,
+ reinterpret_cast<char*>(&keepAlive), sizeof(keepAlive));
+
+ if (keepAlive)
+ {
+ if (SOCKET_ERROR == res)
+ {
+ // There is no sense in configuring keep alive params if we failed to set up keep alive mode.
+ return;
+ }
+
+ // The time in seconds the connection needs to remain idle before starts sending keepalive probes.
+ enum { KEEP_ALIVE_IDLE_TIME = 60 };
+
+ // The time in seconds between individual keepalive probes.
+ enum { KEEP_ALIVE_PROBES_PERIOD = 1 };
+
+#if defined(TCP_KEEPIDLE) && defined(TCP_KEEPINTVL)
+ // This option is available starting with Windows 10, version 1709.
+ DWORD idleOpt = KEEP_ALIVE_IDLE_TIME;
+ DWORD idleRetryOpt = KEEP_ALIVE_PROBES_PERIOD;
+
+ setsockopt(socket, IPPROTO_TCP, TCP_KEEPIDLE, reinterpret_cast<char*>(&idleOpt), sizeof(idleOpt));
+
+ setsockopt(socket, IPPROTO_TCP, TCP_KEEPINTVL,
+ reinterpret_cast<char*>(&idleRetryOpt), sizeof(idleRetryOpt));
+
+#else // use old hardcore WSAIoctl
+ // WinSock structure for KeepAlive timing settings
+ struct tcp_keepalive settings = { 0 };
+ settings.onoff = 1;
+ settings.keepalivetime = KEEP_ALIVE_IDLE_TIME * 1000;
+ settings.keepaliveinterval = KEEP_ALIVE_PROBES_PERIOD * 1000;
+
+ // pointers for WinSock call
+ DWORD bytesReturned;
+ WSAOVERLAPPED overlapped;
+ overlapped.hEvent = NULL;
+
+ // Set KeepAlive settings
+ WSAIoctl(
+ socket,
+ SIO_KEEPALIVE_VALS,
+ &settings,
+ sizeof(struct tcp_keepalive),
+ NULL,
+ 0,
+ &bytesReturned,
+ &overlapped,
+ NULL
+ );
+#endif
+ }
+ }
+
+ bool SetNonBlockingMode(SOCKET socket, bool nonBlocking)
+ {
+ ULONG uTrueOpt = nonBlocking ? TRUE : FALSE;
+
+ return (ioctlsocket(socket, FIONBIO, &uTrueOpt) != SOCKET_ERROR);
+ }
+
+ void InitWsa()
+ {
+ static common::concurrent::CriticalSection initCs;
+ static bool networkInited = false;
+
+ if (!networkInited)
+ {
+ common::concurrent::CsLockGuard lock(initCs);
+ if (!networkInited)
+ {
+ WSADATA wsaData;
+
+ networkInited = WSAStartup(MAKEWORD(2, 2), &wsaData) == 0;
+
+ if (!networkInited)
+ utils::ThrowNetworkError("Networking initialisation failed: " + sockets::GetLastSocketErrorMessage());
+ }
+ }
+ }
}
}
}
diff --git a/modules/platforms/cpp/network/os/win/src/network/sockets.h b/modules/platforms/cpp/network/os/win/src/network/sockets.h
index 8da1fc0..ab87b93 100644
--- a/modules/platforms/cpp/network/os/win/src/network/sockets.h
+++ b/modules/platforms/cpp/network/os/win/src/network/sockets.h
@@ -82,6 +82,32 @@ namespace ignite
* WaitResult::SUCCESS on success.
*/
int WaitOnSocket(SocketHandle socket, int32_t timeout, bool rd);
+
+ /**
+ * Try and set socket options.
+ *
+ * @param socket Socket.
+ * @param bufSize Buffer size.
+ * @param noDelay Set no-delay mode.
+ * @param outOfBand Set out-of-Band mode.
+ * @param keepAlive Keep alive mode.
+ */
+ void TrySetSocketOptions(SOCKET socket, int bufSize, BOOL noDelay, BOOL outOfBand, BOOL keepAlive);
+
+ /**
+ * Set non blocking mode for socket.
+ *
+ * @param socket Socket.
+ * @param nonBlocking Non-blocking mode.
+ */
+ bool SetNonBlockingMode(SOCKET socket, bool nonBlocking);
+
+ /**
+ * Init windows sockets.
+ *
+ * Thread-safe.
+ */
+ void InitWsa();
}
}
}
diff --git a/modules/platforms/cpp/network/os/win/src/network/tcp_socket_client.cpp b/modules/platforms/cpp/network/os/win/src/network/tcp_socket_client.cpp
index 6c3c9a1..2ac211f 100644
--- a/modules/platforms/cpp/network/os/win/src/network/tcp_socket_client.cpp
+++ b/modules/platforms/cpp/network/os/win/src/network/tcp_socket_client.cpp
@@ -22,6 +22,7 @@
#include <ignite/ignite_error.h>
#include <ignite/common/concurrent.h>
+#include <ignite/network/utils.h>
#include "network/tcp_socket_client.h"
@@ -43,23 +44,7 @@ namespace ignite
bool TcpSocketClient::Connect(const char* hostname, uint16_t port, int32_t timeout)
{
- static common::concurrent::CriticalSection initCs;
- static bool networkInited = false;
-
- // Initing networking if is not inited.
- if (!networkInited)
- {
- common::concurrent::CsLockGuard lock(initCs);
- if (!networkInited)
- {
- WSADATA wsaData;
-
- networkInited = WSAStartup(MAKEWORD(2, 2), &wsaData) == 0;
-
- if (!networkInited)
- ThrowNetworkError("Networking initialisation failed: " + sockets::GetLastSocketErrorMessage());
- }
- }
+ sockets::InitWsa();
InternalClose();
@@ -78,7 +63,7 @@ namespace ignite
int res = getaddrinfo(hostname, strPort.c_str(), &hints, &result);
if (res != 0)
- ThrowNetworkError("Can not resolve host: " + std::string(hostname) + ":" + strPort);
+ utils::ThrowNetworkError("Can not resolve host: " + std::string(hostname) + ":" + strPort);
std::string lastErrorMsg = "Failed to resolve host";
bool isTimeout = false;
@@ -93,9 +78,11 @@ namespace ignite
socketHandle = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
if (socketHandle == INVALID_SOCKET)
- ThrowNetworkError("Socket creation failed: " + sockets::GetLastSocketErrorMessage());
+ utils::ThrowNetworkError("Socket creation failed: " + sockets::GetLastSocketErrorMessage());
+
+ sockets::TrySetSocketOptions(socketHandle, BUFFER_SIZE, TRUE, TRUE, TRUE);
- TrySetOptions();
+ blocking = !sockets::SetNonBlockingMode(socketHandle, true);
// Connect to server.
res = connect(socketHandle, it->ai_addr, static_cast<int>(it->ai_addrlen));
@@ -134,7 +121,7 @@ namespace ignite
if (isTimeout)
return false;
- ThrowNetworkError(lastErrorMsg);
+ utils::ThrowNetworkError(lastErrorMsg);
}
return true;
@@ -186,68 +173,6 @@ namespace ignite
return blocking;
}
- void TcpSocketClient::TrySetOptions()
- {
- BOOL trueOpt = TRUE;
- ULONG uTrueOpt = TRUE;
- int bufSizeOpt = BUFFER_SIZE;
-
- setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
-
- setsockopt(socketHandle, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
-
- setsockopt(socketHandle, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
-
- setsockopt(socketHandle, SOL_SOCKET, SO_OOBINLINE, reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
-
- blocking = ioctlsocket(socketHandle, FIONBIO, &uTrueOpt) == SOCKET_ERROR;
-
- int res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE,
- reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
-
- if (SOCKET_ERROR == res)
- {
- // There is no sense in configuring keep alive params if we faileed to set up keep alive mode.
- return;
- }
-
- // This option is available starting with Windows 10, version 1709.
-#if defined(TCP_KEEPIDLE) && defined(TCP_KEEPINTVL)
- DWORD idleOpt = KEEP_ALIVE_IDLE_TIME;
- DWORD idleRetryOpt = KEEP_ALIVE_PROBES_PERIOD;
-
- setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPIDLE, reinterpret_cast<char*>(&idleOpt), sizeof(idleOpt));
-
- setsockopt(socketHandle, IPPROTO_TCP, TCP_KEEPINTVL,
- reinterpret_cast<char*>(&idleRetryOpt), sizeof(idleRetryOpt));
-#else // use old hardcore WSAIoctl
-
- // WinSock structure for KeepAlive timing settings
- struct tcp_keepalive settings = { 0 };
- settings.onoff = 1;
- settings.keepalivetime = KEEP_ALIVE_IDLE_TIME * 1000;
- settings.keepaliveinterval = KEEP_ALIVE_PROBES_PERIOD * 1000;
-
- // pointers for WinSock call
- DWORD bytesReturned;
- WSAOVERLAPPED overlapped;
- overlapped.hEvent = NULL;
-
- // Set KeepAlive settings
- WSAIoctl(
- socketHandle,
- SIO_KEEPALIVE_VALS,
- &settings,
- sizeof(struct tcp_keepalive),
- NULL,
- 0,
- &bytesReturned,
- &overlapped,
- NULL
- );
-#endif
- }
-
int TcpSocketClient::WaitOnSocket(int32_t timeout, bool rd)
{
return sockets::WaitOnSocket(socketHandle, timeout, rd);
diff --git a/modules/platforms/cpp/network/os/win/src/network/win_async_client.cpp b/modules/platforms/cpp/network/os/win/src/network/win_async_client.cpp
new file mode 100644
index 0000000..e50332a
--- /dev/null
+++ b/modules/platforms/cpp/network/os/win/src/network/win_async_client.cpp
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+
+#include <ignite/network/utils.h>
+
+#include <ignite/impl/binary/binary_utils.h>
+
+#include "network/sockets.h"
+#include "network/win_async_client.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ WinAsyncClient::WinAsyncClient(SOCKET socket, const EndPoint &addr, const TcpRange& range, int32_t bufLen) :
+ bufLen(bufLen),
+ state(State::CONNECTED),
+ socket(socket),
+ id(0),
+ addr(addr),
+ range(range),
+ closeErr(IgniteError::IGNITE_SUCCESS)
+ {
+ memset(¤tSend, 0, sizeof(currentSend));
+ currentSend.kind = IoOperationKind::SEND;
+
+ memset(¤tRecv, 0, sizeof(currentRecv));
+ currentRecv.kind = IoOperationKind::RECEIVE;
+ }
+
+ WinAsyncClient::~WinAsyncClient()
+ {
+ if (State::IN_POOL == state)
+ Shutdown(0);
+
+ if (State::CLOSED != state)
+ Close();
+ }
+
+ bool WinAsyncClient::Shutdown(const IgniteError* err)
+ {
+ common::concurrent::CsLockGuard lock(sendCs);
+
+ if (State::CONNECTED != state && State::IN_POOL != state)
+ return false;
+
+ closeErr = err ? *err : IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Connection closed by application");
+
+ shutdown(socket, SD_BOTH);
+
+ state = State::SHUTDOWN;
+
+ return true;
+ }
+
+ bool WinAsyncClient::Close()
+ {
+ if (State::CLOSED == state)
+ return false;
+
+ closesocket(socket);
+
+ sendPackets.clear();
+ recvPacket = impl::interop::SP_InteropMemory();
+
+ state = State::CLOSED;
+
+ return true;
+ }
+
+ HANDLE WinAsyncClient::AddToIocp(HANDLE iocp)
+ {
+ assert(State::CONNECTED == state);
+
+ HANDLE res = CreateIoCompletionPort((HANDLE)socket, iocp, reinterpret_cast<DWORD_PTR>(this), 0);
+
+ if (!res)
+ return res;
+
+ state = State::IN_POOL;
+
+ return res;
+ }
+
+ bool WinAsyncClient::Send(const DataBuffer& data)
+ {
+ common::concurrent::CsLockGuard lock(sendCs);
+
+ if (State::CONNECTED != state && State::IN_POOL != state)
+ return false;
+
+ sendPackets.push_back(data);
+
+ if (sendPackets.size() > 1)
+ return true;
+
+ return SendNextPacketLocked();
+ }
+
+ bool WinAsyncClient::SendNextPacketLocked()
+ {
+ if (sendPackets.empty())
+ return true;
+
+ const DataBuffer& packet0 = sendPackets.front();
+ DWORD flags = 0;
+
+ WSABUF buffer;
+ buffer.buf = (CHAR*)packet0.GetData();
+ buffer.len = packet0.GetSize();
+
+ int ret = WSASend(socket, &buffer, 1, NULL, flags, ¤tSend.overlapped, NULL);
+
+ return ret != SOCKET_ERROR || WSAGetLastError() == ERROR_IO_PENDING;
+ }
+
+ bool WinAsyncClient::Receive()
+ {
+ // We do not need locking on receive as we're always reading in a single thread at most.
+ // If this ever changes we'd need to add mutex locking here.
+ if (State::CONNECTED != state && State::IN_POOL != state)
+ return false;
+
+ if (!recvPacket.IsValid())
+ ClearReceiveBuffer();
+
+ impl::interop::InteropMemory& packet0 = *recvPacket.Get();
+
+ DWORD flags = 0;
+ WSABUF buffer;
+ buffer.buf = (CHAR*)packet0.Data();
+ buffer.len = (ULONG)packet0.Length();
+
+ int ret = WSARecv(socket, &buffer, 1, NULL, &flags, ¤tRecv.overlapped, NULL);
+
+ return ret != SOCKET_ERROR || WSAGetLastError() == ERROR_IO_PENDING;
+ }
+
+ void WinAsyncClient::ClearReceiveBuffer()
+ {
+ using namespace impl::interop;
+
+ if (!recvPacket.IsValid())
+ {
+ recvPacket = SP_InteropMemory(new InteropUnpooledMemory(bufLen));
+ recvPacket.Get()->Length(bufLen);
+ }
+ }
+
+ DataBuffer WinAsyncClient::ProcessReceived(size_t bytes)
+ {
+ impl::interop::InteropMemory& packet0 = *recvPacket.Get();
+
+ return DataBuffer(recvPacket, 0, static_cast<int32_t>(bytes));
+ }
+
+ bool WinAsyncClient::ProcessSent(size_t bytes)
+ {
+ common::concurrent::CsLockGuard lock(sendCs);
+
+ DataBuffer& front = sendPackets.front();
+
+ front.Skip(static_cast<int32_t>(bytes));
+
+ if (front.IsEmpty())
+ sendPackets.pop_front();
+
+ return SendNextPacketLocked();
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/os/win/src/network/win_async_client.h b/modules/platforms/cpp/network/os/win/src/network/win_async_client.h
new file mode 100644
index 0000000..3e24fd4
--- /dev/null
+++ b/modules/platforms/cpp/network/os/win/src/network/win_async_client.h
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_WIN_ASYNC_CLIENT
+#define _IGNITE_NETWORK_WIN_ASYNC_CLIENT
+
+#include "network/sockets.h"
+
+#include <stdint.h>
+#include <deque>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/async_handler.h>
+#include <ignite/network/codec.h>
+#include <ignite/network/end_point.h>
+#include <ignite/network/tcp_range.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Operation kind.
+ */
+ struct IoOperationKind
+ {
+ enum Type
+ {
+ SEND,
+
+ RECEIVE,
+ };
+ };
+
+ /**
+ * Represents single IO operation.
+ * Needed to be able to distinguish one operation from another.
+ */
+ struct IoOperation
+ {
+ /** Overlapped structure that should be passed to every IO operation. */
+ WSAOVERLAPPED overlapped;
+
+ /** Operation type. */
+ IoOperationKind::Type kind;
+ };
+
+ /**
+ * Windows-specific implementation of async network client.
+ */
+ class WinAsyncClient
+ {
+ public:
+ /**
+ * State.
+ */
+ struct State
+ {
+ enum Type
+ {
+ CONNECTED,
+
+ IN_POOL,
+
+ SHUTDOWN,
+
+ CLOSED,
+ };
+ };
+
+ /**
+ * Constructor.
+ *
+ * @param socket Socket.
+ * @param addr Address.
+ * @param range Range.
+ * @param bufLen Buffer length.
+ */
+ WinAsyncClient(SOCKET socket, const EndPoint& addr, const TcpRange& range, int32_t bufLen);
+
+ /**
+ * Destructor.
+ */
+ ~WinAsyncClient();
+
+ /**
+ * Shutdown client.
+ *
+ * Can be called from external threads.
+ * Can be called from WorkerThread.
+ *
+ * @param err Error message. Can be null.
+ * @return @c true if shutdown performed successfully.
+ */
+ bool Shutdown(const IgniteError* err);
+
+ /**
+ * Close client.
+ *
+ * Should not be called from external threads.
+ * Can be called from WorkerThread.
+ *
+ * @return @c true if shutdown performed successfully.
+ */
+ bool Close();
+
+ /**
+ * Add client to IOCP.
+ *
+ * @return IOCP handle on success and NULL otherwise.
+ */
+ HANDLE AddToIocp(HANDLE iocp);
+
+ /**
+ * Send packet using client.
+ *
+ * @param data Data to send.
+ * @return @c true on success.
+ */
+ bool Send(const DataBuffer& data);
+
+ /**
+ * Initiate next receive of data.
+ *
+ * @return @c true on success.
+ */
+ bool Receive();
+
+ /**
+ * Get client ID.
+ *
+ * @return Client ID.
+ */
+ uint64_t GetId() const
+ {
+ return id;
+ }
+
+ /**
+ * Set ID.
+ *
+ * @param id ID to set.
+ */
+ void SetId(uint64_t id)
+ {
+ this->id = id;
+ }
+
+ /**
+ * Get address.
+ *
+ * @return Address.
+ */
+ const EndPoint& GetAddress() const
+ {
+ return addr;
+ }
+
+ /**
+ * Get range.
+ *
+ * @return Range.
+ */
+ const TcpRange& GetRange() const
+ {
+ return range;
+ }
+
+ /**
+ * Check whether client is closed.
+ *
+ * @return @c true if closed.
+ */
+ bool IsClosed() const
+ {
+ return socket == NULL;
+ }
+
+ /**
+ * Process sent data.
+ *
+ * @param bytes Bytes.
+ * @return @c true on success.
+ */
+ bool ProcessSent(size_t bytes);
+
+ /**
+ * Process received bytes.
+ *
+ * @param bytes Number of received bytes.
+ */
+ DataBuffer ProcessReceived(size_t bytes);
+
+ /**
+ * Get closing error for the connection. Can be IGNITE_SUCCESS.
+ *
+ * @return Connection error.
+ */
+ const IgniteError& GetCloseError() const
+ {
+ return closeErr;
+ }
+
+ private:
+
+ /**
+ * Clears client's receive buffer.
+ *
+ * @return Data received so far.
+ */
+ void ClearReceiveBuffer();
+
+ /**
+ * Send next packet in queue.
+ *
+ * @warning Can only be called when holding sendCs lock.
+ * @return @c true on success.
+ */
+ bool SendNextPacketLocked();
+
+ /** Buffer length. */
+ int32_t bufLen;
+
+ /** Client state. */
+ State::Type state;
+
+ /** Socket. */
+ SOCKET socket;
+
+ /** Connection ID. */
+ uint64_t id;
+
+ /** Server end point. */
+ EndPoint addr;
+
+ /** Address range associated with current connection. */
+ TcpRange range;
+
+ /** Current send operation. */
+ IoOperation currentSend;
+
+ /** Packets that should be sent. */
+ std::deque<DataBuffer> sendPackets;
+
+ /** Send critical section. */
+ common::concurrent::CriticalSection sendCs;
+
+ /** Current receive operation. */
+ IoOperation currentRecv;
+
+ /** Packet that is currently received. */
+ impl::interop::SP_InteropMemory recvPacket;
+
+ /** Closing error. */
+ IgniteError closeErr;
+ };
+
+ /** Shared pointer to async client. */
+ typedef common::concurrent::SharedPointer<WinAsyncClient> SP_WinAsyncClient;
+ }
+}
+
+#endif //_IGNITE_NETWORK_WIN_ASYNC_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/os/win/src/network/win_async_client_pool.cpp b/modules/platforms/cpp/network/os/win/src/network/win_async_client_pool.cpp
new file mode 100644
index 0000000..33eb4df
--- /dev/null
+++ b/modules/platforms/cpp/network/os/win/src/network/win_async_client_pool.cpp
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+
+#include <ignite/common/utils.h>
+#include <ignite/network/utils.h>
+
+#include "network/sockets.h"
+#include "network/win_async_client_pool.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ WinAsyncClientPool::WinAsyncClientPool() :
+ stopping(true),
+ asyncHandler(0),
+ connectingThread(),
+ workerThread(),
+ idGen(0),
+ iocp(NULL),
+ clientsCs(),
+ clientIdMap()
+ {
+ // No-op.
+ }
+
+ WinAsyncClientPool::~WinAsyncClientPool()
+ {
+ InternalStop();
+ }
+
+ void WinAsyncClientPool::Start(const std::vector<TcpRange>& addrs, uint32_t connLimit)
+ {
+ if (!stopping)
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Client pool is already started");
+
+ stopping = false;
+
+ sockets::InitWsa();
+
+ iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ if (iocp == NULL)
+ ThrowSystemError("Failed to create IOCP instance");
+
+ try
+ {
+ connectingThread.Start(*this, connLimit, addrs);
+ workerThread.Start(*this, iocp);
+ }
+ catch (...)
+ {
+ Stop();
+
+ throw;
+ }
+ }
+
+ void WinAsyncClientPool::Stop()
+ {
+ InternalStop();
+ }
+
+ void WinAsyncClientPool::InternalStop()
+ {
+ stopping = true;
+ connectingThread.Stop();
+
+ {
+ common::concurrent::CsLockGuard lock(clientsCs);
+
+ std::map<uint64_t, SP_WinAsyncClient>::iterator it;
+ for (it = clientIdMap.begin(); it != clientIdMap.end(); ++it)
+ {
+ WinAsyncClient& client = *it->second.Get();
+
+ client.Shutdown(0);
+ client.Close();
+ }
+ }
+
+ workerThread.Stop();
+
+ CloseHandle(iocp);
+ iocp = NULL;
+
+ clientIdMap.clear();
+ }
+
+ bool WinAsyncClientPool::AddClient(SP_WinAsyncClient& client)
+ {
+ uint64_t id;
+ {
+ WinAsyncClient& clientRef = *client.Get();
+
+ common::concurrent::CsLockGuard lock(clientsCs);
+
+ if (stopping)
+ return false;
+
+ id = ++idGen;
+ clientRef.SetId(id);
+
+ HANDLE iocp0 = clientRef.AddToIocp(iocp);
+ if (iocp0 == NULL)
+ ThrowSystemError("Can not add socket to IOCP");
+
+ iocp = iocp0;
+
+ clientIdMap[id] = client;
+ }
+
+ PostQueuedCompletionStatus(iocp, 0, reinterpret_cast<ULONG_PTR>(client.Get()), 0);
+
+ return true;
+ }
+
+ void WinAsyncClientPool::HandleConnectionError(const EndPoint &addr, const IgniteError &err)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnConnectionError(addr, err);
+ }
+
+ void WinAsyncClientPool::HandleConnectionSuccess(const EndPoint &addr, uint64_t id)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnConnectionSuccess(addr, id);
+ }
+
+ void WinAsyncClientPool::HandleConnectionClosed(uint64_t id, const IgniteError *err)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnConnectionClosed(id, err);
+ }
+
+ void WinAsyncClientPool::HandleMessageReceived(uint64_t id, const DataBuffer &msg)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnMessageReceived(id, msg);
+ }
+
+ void WinAsyncClientPool::HandleMessageSent(uint64_t id)
+ {
+ AsyncHandler* asyncHandler0 = asyncHandler;
+ if (asyncHandler0)
+ asyncHandler0->OnMessageSent(id);
+ }
+
+ bool WinAsyncClientPool::Send(uint64_t id, const DataBuffer& data)
+ {
+ if (stopping)
+ return false;
+
+ SP_WinAsyncClient client = FindClient(id);
+ if (!client.IsValid())
+ return false;
+
+ return client.Get()->Send(data);
+ }
+
+ void WinAsyncClientPool::CloseAndRelease(uint64_t id, const IgniteError* err)
+ {
+ SP_WinAsyncClient client;
+ {
+ common::concurrent::CsLockGuard lock(clientsCs);
+
+ std::map<uint64_t, SP_WinAsyncClient>::iterator it = clientIdMap.find(id);
+ if (it == clientIdMap.end())
+ return;
+
+ client = it->second;
+
+ clientIdMap.erase(it);
+ }
+
+ bool closed = client.Get()->Close();
+ if (closed)
+ {
+ connectingThread.NotifyFreeAddress(client.Get()->GetRange());
+
+ IgniteError err0(client.Get()->GetCloseError());
+ if (err0.GetCode() == IgniteError::IGNITE_SUCCESS)
+ err0 = IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Connection closed by server");
+
+ if (!err)
+ err = &err0;
+
+ HandleConnectionClosed(id, err);
+ }
+ }
+
+ void WinAsyncClientPool::Close(uint64_t id, const IgniteError* err)
+ {
+ SP_WinAsyncClient client = FindClient(id);
+ if (client.IsValid() && client.Get()->IsClosed())
+ client.Get()->Shutdown(err);
+ }
+
+ void WinAsyncClientPool::ThrowSystemError(const std::string& msg)
+ {
+ std::stringstream buf;
+
+ buf << "Windows system error: " << msg << ", system error code: " << GetLastError();
+
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, buf.str().c_str());
+ }
+
+ SP_WinAsyncClient WinAsyncClientPool::FindClient(uint64_t id) const
+ {
+ common::concurrent::CsLockGuard lock(clientsCs);
+
+ return FindClientLocked(id);
+ }
+
+ SP_WinAsyncClient WinAsyncClientPool::FindClientLocked(uint64_t id) const
+ {
+ std::map<uint64_t, SP_WinAsyncClient>::const_iterator it = clientIdMap.find(id);
+ if (it == clientIdMap.end())
+ return SP_WinAsyncClient();
+
+ return it->second;
+ }
+
+ void WinAsyncClientPool::SetHandler(AsyncHandler *handler)
+ {
+ asyncHandler = handler;
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/os/win/src/network/win_async_client_pool.h b/modules/platforms/cpp/network/os/win/src/network/win_async_client_pool.h
new file mode 100644
index 0000000..bfd005b
--- /dev/null
+++ b/modules/platforms/cpp/network/os/win/src/network/win_async_client_pool.h
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_WIN_ASYNC_CLIENT_POOL
+#define _IGNITE_NETWORK_WIN_ASYNC_CLIENT_POOL
+
+#include <stdint.h>
+
+#include <ignite/ignite_error.h>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/async_client_pool.h>
+#include <ignite/network/async_handler.h>
+#include <ignite/network/tcp_range.h>
+
+#include "network/win_async_client.h"
+#include "network/win_async_connecting_thread.h"
+#include "network/win_async_worker_thread.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Windows-specific implementation of asynchronous client pool.
+ */
+ class WinAsyncClientPool : public AsyncClientPool
+ {
+ public:
+ /**
+ * Constructor
+ *
+ * @param handler Upper level event handler.
+ */
+ WinAsyncClientPool();
+
+ /**
+ * Destructor.
+ */
+ virtual ~WinAsyncClientPool();
+
+ /**
+ * Start internal thread that establishes connections to provided addresses and asynchronously sends and
+ * receives messages from them. Function returns either when thread is started and first connection is
+ * established or failure happened.
+ *
+ * @param addrs Addresses to connect to.
+ * @param connLimit Connection upper limit. Zero means limit is disabled.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual void Start(const std::vector<TcpRange>& addrs, uint32_t connLimit);
+
+ /**
+ * Close all established connections and stops handling thread.
+ */
+ virtual void Stop();
+
+ /**
+ * Set handler.
+ *
+ * @param handler Handler to set.
+ */
+ virtual void SetHandler(AsyncHandler *handler);
+
+ /**
+ * Send data to specific established connection.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c true if connection is present and @c false otherwise.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual bool Send(uint64_t id, const DataBuffer& data);
+
+ /**
+ * Closes specified connection if it's established. Connection to the specified address is planned for
+ * re-connect. Event is issued to the handler with specified error.
+ *
+ * @param id Client ID.
+ */
+ virtual void Close(uint64_t id, const IgniteError* err);
+
+ /**
+ * Closes and releases memory allocated for client with specified ID.
+ * Error is reported to handler.
+ *
+ * @param id Client ID.
+ * @param err Error to report. May be null.
+ * @return @c true if connection with specified ID was found.
+ */
+ void CloseAndRelease(uint64_t id, const IgniteError* err);
+
+ /**
+ * Add client to connection map. Notify user.
+ *
+ * @param client Client.
+ * @return Client ID.
+ */
+ bool AddClient(SP_WinAsyncClient& client);
+
+ /**
+ * Handle error during connection establishment.
+ *
+ * @param addr Connection address.
+ * @param err Error.
+ */
+ void HandleConnectionError(const EndPoint& addr, const IgniteError& err);
+
+ /**
+ * Handle successful connection establishment.
+ *
+ * @param addr Address of the new connection.
+ * @param id Connection ID.
+ */
+ void HandleConnectionSuccess(const EndPoint& addr, uint64_t id);
+
+ /**
+ * Handle error during connection establishment.
+ *
+ * @param id Async client ID.
+ * @param err Error. Can be null if connection closed without error.
+ */
+ void HandleConnectionClosed(uint64_t id, const IgniteError* err);
+
+ /**
+ * Handle new message.
+ *
+ * @param id Async client ID.
+ * @param msg Received message.
+ */
+ void HandleMessageReceived(uint64_t id, const DataBuffer& msg);
+
+ /**
+ * Handle sent message event.
+ *
+ * @param id Async client ID.
+ */
+ void HandleMessageSent(uint64_t id);
+
+ private:
+ /**
+ * Close all established connections and stops handling threads.
+ */
+ void InternalStop();
+
+ /**
+ * Find client by ID.
+ *
+ * @param id Client ID.
+ * @return Client. Null pointer if is not found.
+ */
+ SP_WinAsyncClient FindClient(uint64_t id) const;
+
+ /**
+ * Find client by ID.
+ *
+ * @warning Should only be called with clientsCs lock held.
+ * @param id Client ID.
+ * @return Client. Null pointer if is not found.
+ */
+ SP_WinAsyncClient FindClientLocked(uint64_t id) const;
+
+ /**
+ * Throw window specific error with error code.
+ *
+ * @param msg Error message.
+ */
+ static void ThrowSystemError(const std::string& msg);
+
+ /** Flag indicating that pool is stopping. */
+ volatile bool stopping;
+
+ /** Event handler. */
+ AsyncHandler* asyncHandler;
+
+ /** Connecting thread. */
+ WinAsyncConnectingThread connectingThread;
+
+ /** Internal thread. */
+ WinAsyncWorkerThread workerThread;
+
+ /** ID counter. */
+ uint64_t idGen;
+
+ /** IO Completion Port. Windows-specific primitive for asynchronous IO. */
+ HANDLE iocp;
+
+ /** Clients critical section. */
+ mutable common::concurrent::CriticalSection clientsCs;
+
+ /** Client mapping ID -> client */
+ std::map<uint64_t, SP_WinAsyncClient> clientIdMap;
+ };
+
+ // Type alias
+ typedef common::concurrent::SharedPointer<network::AsyncClientPool> SP_AsyncClientPool;
+ }
+}
+
+#endif //_IGNITE_NETWORK_WIN_ASYNC_CLIENT_POOL
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/os/win/src/network/win_async_connecting_thread.cpp b/modules/platforms/cpp/network/os/win/src/network/win_async_connecting_thread.cpp
new file mode 100644
index 0000000..4ceb4a6
--- /dev/null
+++ b/modules/platforms/cpp/network/os/win/src/network/win_async_connecting_thread.cpp
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/common/utils.h>
+#include <ignite/network/utils.h>
+
+#include "network/sockets.h"
+#include "network/win_async_client_pool.h"
+#include "network/win_async_connecting_thread.h"
+
+namespace
+{
+ ignite::common::FibonacciSequence<10> fibonacci10;
+}
+
+namespace ignite
+{
+ namespace network
+ {
+ WinAsyncConnectingThread::WinAsyncConnectingThread() :
+ clientPool(0),
+ stopping(false),
+ failedAttempts(0),
+ minAddrs(0),
+ addrsCs(),
+ connectNeeded(),
+ nonConnected()
+ {
+ // No-op.
+ }
+
+ void WinAsyncConnectingThread::Run()
+ {
+ assert(clientPool != 0);
+
+ while (!stopping)
+ {
+ TcpRange range = GetRandomAddress();
+
+ if (stopping || range.IsEmpty())
+ break;
+
+ SP_WinAsyncClient client = TryConnect(range);
+
+ if (!client.IsValid())
+ {
+ ++failedAttempts;
+
+ DWORD msToWait = static_cast<DWORD>(1000 * fibonacci10.GetValue(failedAttempts));
+ if (msToWait)
+ Sleep(msToWait);
+
+ continue;
+ }
+
+ failedAttempts = 0;
+
+ if (stopping)
+ {
+ client.Get()->Close();
+
+ return;
+ }
+
+ try
+ {
+ bool added = clientPool->AddClient(client);
+
+ if (!added)
+ {
+ client.Get()->Close();
+
+ continue;
+ }
+
+ common::concurrent::CsLockGuard lock(addrsCs);
+ std::vector<TcpRange>::iterator it = std::find(nonConnected.begin(), nonConnected.end(), range);
+ if (it != nonConnected.end())
+ nonConnected.erase(it);
+ }
+ catch (const IgniteError& err)
+ {
+ client.Get()->Close();
+
+ clientPool->HandleConnectionError(client.Get()->GetAddress(), err);
+
+ continue;
+ }
+ }
+ }
+
+ void WinAsyncConnectingThread::NotifyFreeAddress(const TcpRange &range)
+ {
+ common::concurrent::CsLockGuard lock(addrsCs);
+
+ nonConnected.push_back(range);
+ connectNeeded.NotifyOne();
+ }
+
+ void WinAsyncConnectingThread::Start(
+ WinAsyncClientPool& clientPool0,
+ size_t limit,
+ const std::vector<TcpRange>& addrs)
+ {
+ stopping = false;
+ clientPool = &clientPool0;
+ failedAttempts = 0;
+ nonConnected = addrs;
+
+ if (!limit || limit > addrs.size())
+ minAddrs = 0;
+ else
+ minAddrs = addrs.size() - limit;
+
+ Thread::Start();
+ }
+
+ void WinAsyncConnectingThread::Stop()
+ {
+ stopping = true;
+
+ {
+ common::concurrent::CsLockGuard lock(addrsCs);
+ connectNeeded.NotifyOne();
+ }
+
+ Join();
+ nonConnected.clear();
+ }
+
+ SP_WinAsyncClient WinAsyncConnectingThread::TryConnect(const TcpRange& range)
+ {
+ for (uint16_t port = range.port; port <= (range.port + range.range); ++port)
+ {
+ EndPoint addr(range.host, port);
+ try
+ {
+ SOCKET socket = TryConnect(addr);
+
+ return SP_WinAsyncClient(new WinAsyncClient(socket, addr, range, BUFFER_SIZE));
+ }
+ catch (const IgniteError&)
+ {
+ // No-op.
+ }
+ }
+
+ return SP_WinAsyncClient();
+ }
+
+ SOCKET WinAsyncConnectingThread::TryConnect(const EndPoint& addr)
+ {
+ addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = IPPROTO_TCP;
+
+ std::stringstream converter;
+ converter << addr.port;
+ std::string strPort = converter.str();
+
+ // Resolve the server address and port
+ addrinfo *result = NULL;
+ int res = getaddrinfo(addr.host.c_str(), strPort.c_str(), &hints, &result);
+
+ if (res != 0)
+ utils::ThrowNetworkError("Can not resolve host: " + addr.host + ":" + strPort);
+
+ std::string lastErrorMsg = "Failed to resolve host";
+
+ SOCKET socket = INVALID_SOCKET;
+
+ // Attempt to connect to an address until one succeeds
+ for (addrinfo *it = result; it != NULL; it = it->ai_next)
+ {
+ lastErrorMsg = "Failed to establish connection with the host";
+
+ socket = WSASocket(it->ai_family, it->ai_socktype, it->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
+
+ if (socket == INVALID_SOCKET)
+ utils::ThrowNetworkError("Socket creation failed: " + sockets::GetLastSocketErrorMessage());
+
+ sockets::TrySetSocketOptions(socket, BUFFER_SIZE, TRUE, TRUE, TRUE);
+
+ // Connect to server.
+ res = WSAConnect(socket, it->ai_addr, static_cast<int>(it->ai_addrlen), NULL, NULL, NULL, NULL);
+ if (SOCKET_ERROR == res)
+ {
+ closesocket(socket);
+ socket = INVALID_SOCKET;
+
+ int lastError = WSAGetLastError();
+
+ if (lastError != WSAEWOULDBLOCK)
+ {
+ lastErrorMsg.append(": ").append(sockets::GetSocketErrorMessage(lastError));
+
+ continue;
+ }
+ }
+
+ break;
+ }
+
+ freeaddrinfo(result);
+
+ if (socket == INVALID_SOCKET)
+ utils::ThrowNetworkError(lastErrorMsg);
+
+ return socket;
+ }
+
+ TcpRange WinAsyncConnectingThread::GetRandomAddress() const
+ {
+ common::concurrent::CsLockGuard lock(addrsCs);
+
+ if (stopping)
+ return TcpRange();
+
+ while (nonConnected.size() <= minAddrs)
+ {
+ connectNeeded.Wait(addrsCs);
+
+ if (stopping)
+ return TcpRange();
+ }
+
+ size_t idx = rand() % nonConnected.size();
+ TcpRange range = nonConnected.at(idx);
+
+ lock.Reset();
+
+ return range;
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/os/win/src/network/win_async_connecting_thread.h b/modules/platforms/cpp/network/os/win/src/network/win_async_connecting_thread.h
new file mode 100644
index 0000000..90dbbc7
--- /dev/null
+++ b/modules/platforms/cpp/network/os/win/src/network/win_async_connecting_thread.h
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_WIN_ASYNC_CONNECTING_THREAD
+#define _IGNITE_NETWORK_WIN_ASYNC_CONNECTING_THREAD
+
+#include <stdint.h>
+
+#include <ignite/ignite_error.h>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/impl/interop/interop_memory.h>
+
+#include <ignite/network/async_client_pool.h>
+#include <ignite/network/async_handler.h>
+#include <ignite/network/tcp_range.h>
+
+#include "network/win_async_client.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ class WinAsyncClientPool;
+
+ /**
+ * Async pool connecting thread.
+ */
+ class WinAsyncConnectingThread : protected common::concurrent::Thread
+ {
+ /** Send and receive buffers size. */
+ enum { BUFFER_SIZE = 0x10000 };
+
+ public:
+ /**
+ * Constructor.
+ */
+ explicit WinAsyncConnectingThread();
+
+ /**
+ * Start thread.
+ *
+ * @param clientPool Client pool.
+ * @param limit Connection limit.
+ * @param addrs Addresses.
+ */
+ void Start(WinAsyncClientPool& clientPool, size_t limit, const std::vector<TcpRange>& addrs);
+
+ /**
+ * Stop thread.
+ */
+ void Stop();
+
+ /**
+ * Notify about new address available for connection.
+ *
+ * @param range Address range.
+ */
+ void NotifyFreeAddress(const TcpRange &range);
+
+ private:
+ /**
+ * Run thread.
+ */
+ virtual void Run();
+
+ /**
+ * Try establish connection to address in the range.
+ * @param range TCP range.
+ * @return New client.
+ */
+ static SP_WinAsyncClient TryConnect(const TcpRange& range);
+
+ /**
+ * Try establish connection to address.
+ * @param addr Address.
+ * @return Socket.
+ */
+ static SOCKET TryConnect(const EndPoint& addr);
+
+ /**
+ * Get random address.
+ *
+ * @warning Will block if no addresses are available for connect.
+ * @return @c true if a new connection should be established.
+ */
+ TcpRange GetRandomAddress() const;
+
+ /** Client pool. */
+ WinAsyncClientPool* clientPool;
+
+ /** Flag to signal that thread is stopping. */
+ volatile bool stopping;
+
+ /** Failed connection attempts. */
+ size_t failedAttempts;
+
+ /** Minimal number of addresses. */
+ size_t minAddrs;
+
+ /** Addresses critical section. */
+ mutable common::concurrent::CriticalSection addrsCs;
+
+ /** Condition variable, which signalled when new connect is needed. */
+ mutable common::concurrent::ConditionVariable connectNeeded;
+
+ /** Addresses to use for connection establishment. */
+ std::vector<TcpRange> nonConnected;
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_WIN_ASYNC_CONNECTING_THREAD
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/os/win/src/network/win_async_worker_thread.cpp b/modules/platforms/cpp/network/os/win/src/network/win_async_worker_thread.cpp
new file mode 100644
index 0000000..58c7bd2
--- /dev/null
+++ b/modules/platforms/cpp/network/os/win/src/network/win_async_worker_thread.cpp
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+
+#include <ignite/common/utils.h>
+#include <ignite/network/utils.h>
+
+#include "network/sockets.h"
+#include "network/win_async_client.h"
+#include "network/win_async_client_pool.h"
+#include "network/win_async_worker_thread.h"
+
+namespace
+{
+ ignite::common::FibonacciSequence<10> fibonacci10;
+}
+
+namespace ignite
+{
+ namespace network
+ {
+ WinAsyncWorkerThread::WinAsyncWorkerThread() :
+ stopping(false),
+ clientPool(0),
+ iocp(NULL)
+ {
+ // No-op.
+ }
+
+ void WinAsyncWorkerThread::Start(WinAsyncClientPool& clientPool0, HANDLE iocp0)
+ {
+ assert(iocp0 != NULL);
+ iocp = iocp0;
+ clientPool = &clientPool0;
+
+ Thread::Start();
+ }
+
+ void WinAsyncWorkerThread::Run()
+ {
+ assert(clientPool != 0);
+
+ while (!stopping)
+ {
+ DWORD bytesTransferred = 0;
+ ULONG_PTR key = NULL;
+ LPOVERLAPPED overlapped = NULL;
+
+ BOOL ok = GetQueuedCompletionStatus(iocp, &bytesTransferred, &key, &overlapped, INFINITE);
+
+ if (stopping)
+ break;
+
+ if (!key)
+ continue;
+
+ WinAsyncClient* client = reinterpret_cast<WinAsyncClient*>(key);
+
+ if (!ok || (0 != overlapped && 0 == bytesTransferred))
+ {
+ clientPool->CloseAndRelease(client->GetId(), 0);
+
+ continue;
+ }
+
+ if (!overlapped)
+ {
+ // This mean new client is connected.
+ clientPool->HandleConnectionSuccess(client->GetAddress(), client->GetId());
+
+ bool success = client->Receive();
+ if (!success)
+ clientPool->CloseAndRelease(client->GetId(), 0);
+
+ continue;
+ }
+
+ try
+ {
+ IoOperation* operation = reinterpret_cast<IoOperation*>(overlapped);
+ switch (operation->kind)
+ {
+ case IoOperationKind::SEND:
+ {
+ bool success = client->ProcessSent(bytesTransferred);
+
+ if (!success)
+ clientPool->CloseAndRelease(client->GetId(), 0);
+
+ clientPool->HandleMessageSent(client->GetId());
+
+ break;
+ }
+
+ case IoOperationKind::RECEIVE:
+ {
+ DataBuffer data = client->ProcessReceived(bytesTransferred);
+
+ if (!data.IsEmpty())
+ clientPool->HandleMessageReceived(client->GetId(), data);
+
+ client->Receive();
+
+ break;
+ }
+
+ default:
+ break;
+ }
+ }
+ catch (const IgniteError& err)
+ {
+ clientPool->CloseAndRelease(client->GetId(), &err);
+ }
+ }
+ }
+
+ void WinAsyncWorkerThread::Stop()
+ {
+ stopping = true;
+
+ PostQueuedCompletionStatus(iocp, 0, 0, 0);
+
+ Join();
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/os/win/src/network/win_async_worker_thread.h b/modules/platforms/cpp/network/os/win/src/network/win_async_worker_thread.h
new file mode 100644
index 0000000..c4bc575
--- /dev/null
+++ b/modules/platforms/cpp/network/os/win/src/network/win_async_worker_thread.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_WIN_ASYNC_WORKER_THREAD
+#define _IGNITE_NETWORK_WIN_ASYNC_WORKER_THREAD
+
+#include <stdint.h>
+
+#include <ignite/ignite_error.h>
+
+#include <ignite/common/concurrent.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /** Windows async client pool. */
+ class WinAsyncClientPool;
+
+ /**
+ * Async pool worker thread.
+ */
+ class WinAsyncWorkerThread : protected common::concurrent::Thread
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ explicit WinAsyncWorkerThread();
+
+ /**
+ * Start thread.
+ *
+ * @param clientPool Client pool.
+ * @param iocp Valid IOCP instance handle.
+ */
+ void Start(WinAsyncClientPool& clientPool, HANDLE iocp);
+
+ /**
+ * Stop thread.
+ */
+ void Stop();
+
+ private:
+ /**
+ * Run thread.
+ */
+ virtual void Run();
+
+ /** Flag to signal that thread should stop. */
+ volatile bool stopping;
+
+ /** Client pool. */
+ WinAsyncClientPool* clientPool;
+
+ /** IO Completion Port. Windows-specific primitive for asynchronous IO. */
+ HANDLE iocp;
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_WIN_ASYNC_WORKER_THREAD
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/src/network/async_client_pool_adapter.cpp b/modules/platforms/cpp/network/src/network/async_client_pool_adapter.cpp
new file mode 100644
index 0000000..06bec2a
--- /dev/null
+++ b/modules/platforms/cpp/network/src/network/async_client_pool_adapter.cpp
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "network/async_client_pool_adapter.h"
+#include "network/error_handling_filter.h"
+
+namespace ignite
+{
+ namespace network
+ {
+ AsyncClientPoolAdapter::AsyncClientPoolAdapter(
+ const std::vector<SP_DataFilter> &filters0,
+ const SP_AsyncClientPool& pool0
+ ) :
+ filters(filters0),
+ pool(pool0),
+ sink(pool.Get()),
+ handler(0)
+ {
+ filters.insert(filters.begin(), SP_DataFilter(new ErrorHandlingFilter()));
+
+ for (std::vector<SP_DataFilter>::iterator it = filters.begin(); it != filters.end(); ++it)
+ {
+ it->Get()->SetSink(sink);
+ sink = it->Get();
+ }
+ }
+
+ void AsyncClientPoolAdapter::Start(const std::vector<TcpRange>& addrs, uint32_t connLimit)
+ {
+ pool.Get()->Start(addrs, connLimit);
+ }
+
+ void AsyncClientPoolAdapter::Stop()
+ {
+ pool.Get()->Stop();
+ }
+
+ void AsyncClientPoolAdapter::SetHandler(AsyncHandler* handler0)
+ {
+ handler = handler0;
+ for (std::vector<SP_DataFilter>::reverse_iterator it = filters.rbegin(); it != filters.rend(); ++it)
+ {
+ it->Get()->SetHandler(handler);
+ handler = it->Get();
+ }
+
+ pool.Get()->SetHandler(handler);
+ }
+
+ bool AsyncClientPoolAdapter::Send(uint64_t id, const DataBuffer& data)
+ {
+ return sink->Send(id, data);
+ }
+
+ void AsyncClientPoolAdapter::Close(uint64_t id, const IgniteError* err)
+ {
+ sink->Close(id, err);
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/src/network/async_client_pool_adapter.h b/modules/platforms/cpp/network/src/network/async_client_pool_adapter.h
new file mode 100644
index 0000000..48ac025
--- /dev/null
+++ b/modules/platforms/cpp/network/src/network/async_client_pool_adapter.h
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_ASYNC_CLIENT_POOL_ADAPTER
+#define _IGNITE_NETWORK_ASYNC_CLIENT_POOL_ADAPTER
+
+#include <ignite/network/async_client_pool.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Asynchronous client pool adapter.
+ */
+ class AsyncClientPoolAdapter : public AsyncClientPool
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param filters Filters.
+ * @param pool Client pool.
+ */
+ AsyncClientPoolAdapter(const std::vector<SP_DataFilter>& filters, const SP_AsyncClientPool& pool);
+
+ /**
+ * Destructor.
+ */
+ virtual ~AsyncClientPoolAdapter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Start internal thread that establishes connections to provided addresses and asynchronously sends and
+ * receives messages from them. Function returns either when thread is started and first connection is
+ * established or failure happened.
+ *
+ * @param addrs Addresses to connect to.
+ * @param connLimit Connection upper limit. Zero means limit is disabled.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual void Start(const std::vector<TcpRange>& addrs, uint32_t connLimit);
+
+ /**
+ * Close all established connections and stops handling threads.
+ */
+ virtual void Stop();
+
+ /**
+ * Set handler.
+ *
+ * @param handler Handler to set.
+ */
+ virtual void SetHandler(AsyncHandler *handler);
+
+ /**
+ * Send data to specific established connection.
+ *
+ * @param id Client ID.
+ * @param data Data to be sent.
+ * @return @c true if connection is present and @c false otherwise.
+ *
+ * @throw IgniteError on error.
+ */
+ virtual bool Send(uint64_t id, const DataBuffer& data);
+
+ /**
+ * Closes specified connection if it's established. Connection to the specified address is planned for
+ * re-connect. Error is reported to handler.
+ *
+ * @param id Client ID.
+ */
+ virtual void Close(uint64_t id, const IgniteError* err);
+
+ private:
+ /** Filters. */
+ std::vector<SP_DataFilter> filters;
+
+ /** Underlying pool. */
+ SP_AsyncClientPool pool;
+
+ /** Lower level data sink. */
+ DataSink* sink;
+
+ /** Upper level event handler. */
+ AsyncHandler* handler;
+ };
+
+ // Type alias
+ typedef common::concurrent::SharedPointer<AsyncClientPoolAdapter> SP_AsyncClientPoolAdapter;
+ }
+}
+
+#endif //_IGNITE_NETWORK_ASYNC_CLIENT_POOL_ADAPTER
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/src/network/codec_data_filter.cpp b/modules/platforms/cpp/network/src/network/codec_data_filter.cpp
new file mode 100644
index 0000000..d5d6c42
--- /dev/null
+++ b/modules/platforms/cpp/network/src/network/codec_data_filter.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/network/codec_data_filter.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ CodecDataFilter::CodecDataFilter(const SP_CodecFactory &factory) :
+ codecFactory(factory),
+ codecs(new CodecMap()),
+ codecsCs()
+ {
+ // No-op.
+ }
+
+ CodecDataFilter::~CodecDataFilter()
+ {
+ delete codecs;
+ }
+
+ bool CodecDataFilter::Send(uint64_t id, const DataBuffer &data)
+ {
+ SP_Codec codec = FindCodec(id);
+ if (!codec.IsValid())
+ return false;
+
+ DataBuffer data0(data);
+ while (true)
+ {
+ DataBuffer out = codec.Get()->Encode(data0);
+
+ if (out.IsEmpty())
+ break;
+
+ DataFilterAdapter::Send(id, out);
+ }
+
+ return true;
+ }
+
+ void CodecDataFilter::OnConnectionSuccess(const EndPoint &addr, uint64_t id)
+ {
+ {
+ common::concurrent::CsLockGuard lock(codecsCs);
+
+ codecs->insert(std::make_pair(id, codecFactory.Get()->Build()));
+ }
+
+ DataFilterAdapter::OnConnectionSuccess(addr, id);
+ }
+
+ void CodecDataFilter::OnConnectionClosed(uint64_t id, const IgniteError *err)
+ {
+ {
+ common::concurrent::CsLockGuard lock(codecsCs);
+
+ codecs->erase(id);
+ }
+
+ DataFilterAdapter::OnConnectionClosed(id, err);
+ }
+
+ void CodecDataFilter::OnMessageReceived(uint64_t id, const DataBuffer &msg)
+ {
+ SP_Codec codec = FindCodec(id);
+ if (!codec.IsValid())
+ return;
+
+ DataBuffer msg0(msg);
+ while (true)
+ {
+ DataBuffer out = codec.Get()->Decode(msg0);
+
+ if (out.IsEmpty())
+ break;
+
+ DataFilterAdapter::OnMessageReceived(id, out);
+ }
+ }
+
+ SP_Codec CodecDataFilter::FindCodec(uint64_t id)
+ {
+ common::concurrent::CsLockGuard lock(codecsCs);
+
+ std::map<uint64_t, SP_Codec>::iterator it = codecs->find(id);
+ if (it == codecs->end())
+ return SP_Codec();
+
+ return it->second;
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/src/network/data_buffer.cpp b/modules/platforms/cpp/network/src/network/data_buffer.cpp
new file mode 100644
index 0000000..9caf5ca
--- /dev/null
+++ b/modules/platforms/cpp/network/src/network/data_buffer.cpp
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstring>
+
+#include <ignite/ignite_error.h>
+#include <ignite/network/data_buffer.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ DataBuffer::DataBuffer() :
+ position(0),
+ length(0),
+ data()
+ {
+ // No-op.
+ }
+
+ DataBuffer::DataBuffer(const impl::interop::SP_ConstInteropMemory& data0) :
+ position(0),
+ length(data0.Get()->Length()),
+ data(data0)
+ {
+ // No-op.
+ }
+
+ DataBuffer::DataBuffer(const impl::interop::SP_ConstInteropMemory& data0, int32_t pos, int32_t len) :
+ position(pos),
+ length(len),
+ data(data0)
+ {
+ // No-op.
+ }
+
+ void DataBuffer::Consume(int8_t *dst, int32_t size)
+ {
+ if (!size)
+ return;
+
+ if (size < 0)
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "Codec error: Can not read negative number of bytes");
+
+ if (GetSize() < size)
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "Codec error: Not enough data to read data from buffer");
+
+ std::memcpy(dst, data.Get()->Data() + position, size);
+ Advance(size);
+ }
+
+ const int8_t *DataBuffer::GetData() const
+ {
+ return data.Get()->Data() + position;
+ }
+
+ int32_t DataBuffer::GetSize() const
+ {
+ if (!data.IsValid())
+ return 0;
+
+ return length - position;
+ }
+
+ bool DataBuffer::IsEmpty() const
+ {
+ return GetSize() <= 0;
+ }
+
+ DataBuffer DataBuffer::ConsumeEntirely()
+ {
+ DataBuffer res(*this);
+ Advance(GetSize());
+
+ return res;
+ }
+
+ void DataBuffer::Advance(int32_t val)
+ {
+ position += val;
+ }
+
+ impl::interop::InteropInputStream DataBuffer::GetInputStream() const
+ {
+ impl::interop::InteropInputStream stream = impl::interop::InteropInputStream(data.Get(), length);
+ stream.Position(position);
+
+ return stream;
+ }
+
+ DataBuffer DataBuffer::Clone() const
+ {
+ if (IsEmpty())
+ return DataBuffer();
+
+ impl::interop::SP_InteropMemory mem(new impl::interop::InteropUnpooledMemory(length));
+ mem.Get()->Length(length);
+ std::memcpy(mem.Get()->Data(), data.Get()->Data() + position, length);
+
+ return DataBuffer(mem, 0, length);
+ }
+
+ void DataBuffer::Skip(int32_t bytes)
+ {
+ int32_t toSkip = bytes < GetSize() ? bytes : GetSize();
+
+ Advance(toSkip);
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/src/network/error_handling_filter.cpp b/modules/platforms/cpp/network/src/network/error_handling_filter.cpp
new file mode 100644
index 0000000..fe14e40
--- /dev/null
+++ b/modules/platforms/cpp/network/src/network/error_handling_filter.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "network/error_handling_filter.h"
+
+#define CLOSE_CONNECTION_ON_EXCEPTION(...) \
+ try \
+ { \
+ __VA_ARGS__; \
+ } \
+ catch (const IgniteError& err) \
+ { \
+ DataFilterAdapter::Close(id, &err); \
+ } \
+ catch (std::exception& err) \
+ { \
+ std::string msg("Standard library exception is thrown: "); \
+ msg += err.what(); \
+ IgniteError err0(IgniteError::IGNITE_ERR_GENERIC, msg.c_str()); \
+ DataFilterAdapter::Close(id, &err0); \
+ } \
+ catch (...) \
+ { \
+ IgniteError err0(IgniteError::IGNITE_ERR_UNKNOWN, \
+ "Unknown error is encountered when processing network event"); \
+ DataFilterAdapter::Close(id, &err0); \
+ }
+
+namespace ignite
+{
+ namespace network
+ {
+ void ErrorHandlingFilter::OnConnectionSuccess(const EndPoint &addr, uint64_t id)
+ {
+ CLOSE_CONNECTION_ON_EXCEPTION(DataFilterAdapter::OnConnectionSuccess(addr, id))
+ }
+
+ void ErrorHandlingFilter::OnConnectionError(const EndPoint &addr, const IgniteError &err)
+ {
+ try
+ {
+ DataFilterAdapter::OnConnectionError(addr, err);
+ }
+ catch (...)
+ {
+ // No-op.
+ }
+ }
+
+ void ErrorHandlingFilter::OnConnectionClosed(uint64_t id, const IgniteError *err)
+ {
+ try
+ {
+ DataFilterAdapter::OnConnectionClosed(id, err);
+ }
+ catch (...)
+ {
+ // No-op.
+ }
+ }
+
+ void ErrorHandlingFilter::OnMessageReceived(uint64_t id, const DataBuffer &data)
+ {
+ CLOSE_CONNECTION_ON_EXCEPTION(DataFilterAdapter::OnMessageReceived(id, data))
+ }
+
+ void ErrorHandlingFilter::OnMessageSent(uint64_t id)
+ {
+ CLOSE_CONNECTION_ON_EXCEPTION(DataFilterAdapter::OnMessageSent(id))
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/src/network/error_handling_filter.h b/modules/platforms/cpp/network/src/network/error_handling_filter.h
new file mode 100644
index 0000000..5b74d77
--- /dev/null
+++ b/modules/platforms/cpp/network/src/network/error_handling_filter.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_NETWORK_ERROR_HANDLING_FILTER
+#define _IGNITE_NETWORK_ERROR_HANDLING_FILTER
+
+#include <ignite/network/data_filter_adapter.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ /**
+ * Filter that handles exceptions thrown by upper level handlers.
+ */
+ class IGNITE_IMPORT_EXPORT ErrorHandlingFilter : public DataFilterAdapter
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ErrorHandlingFilter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Callback that called on successful connection establishment.
+ *
+ * @param addr Address of the new connection.
+ * @param id Connection ID.
+ */
+ virtual void OnConnectionSuccess(const EndPoint& addr, uint64_t id);
+
+ /**
+ * Callback that called on error during connection establishment.
+ *
+ * @param addr Connection address.
+ * @param err Error.
+ */
+ virtual void OnConnectionError(const EndPoint& addr, const IgniteError& err);
+
+ /**
+ * Callback that called on error during connection establishment.
+ *
+ * @param id Async client ID.
+ * @param err Error. Can be null if connection closed without error.
+ */
+ virtual void OnConnectionClosed(uint64_t id, const IgniteError* err);
+
+ /**
+ * Callback that called when new message is received.
+ *
+ * @param id Async client ID.
+ * @param msg Received message.
+ */
+ virtual void OnMessageReceived(uint64_t id, const DataBuffer &msg);
+
+ /**
+ * Callback that called when message is sent.
+ *
+ * @param id Async client ID.
+ */
+ virtual void OnMessageSent(uint64_t id);
+ };
+ }
+}
+
+#endif //_IGNITE_NETWORK_ERROR_HANDLING_FILTER
\ No newline at end of file
diff --git a/modules/platforms/cpp/network/src/network/length_prefix_codec.cpp b/modules/platforms/cpp/network/src/network/length_prefix_codec.cpp
new file mode 100644
index 0000000..b1333fd
--- /dev/null
+++ b/modules/platforms/cpp/network/src/network/length_prefix_codec.cpp
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/binary/binary_utils.h>
+
+#include <ignite/network/length_prefix_codec.h>
+
+namespace ignite
+{
+ namespace network
+ {
+ using impl::interop::SP_InteropMemory;
+
+ LengthPrefixCodec::LengthPrefixCodec() :
+ packetSize(-1)
+ {
+ // No-op.
+ }
+
+ LengthPrefixCodec::~LengthPrefixCodec()
+ {
+ // No-op.
+ }
+
+ DataBuffer LengthPrefixCodec::Encode(DataBuffer& data)
+ {
+ // Just pass data as is, because we encode message size in
+ // the application to avoid unnecessary re-allocations and copying.
+ return data.ConsumeEntirely();
+ }
+
+ DataBuffer LengthPrefixCodec::Decode(DataBuffer& data)
+ {
+ if (packet.IsValid() && packet.Get()->Length() == (PACKET_HEADER_SIZE + packetSize))
+ {
+ packetSize = -1;
+ packet.Get()->Length(0);
+ }
+
+ if (packetSize < 0)
+ {
+ Consume(data, PACKET_HEADER_SIZE);
+
+ if (packet.Get()->Length() < PACKET_HEADER_SIZE)
+ return DataBuffer();
+
+ packetSize = impl::binary::BinaryUtils::ReadInt32(*packet.Get(), 0);
+ }
+
+ Consume(data, PACKET_HEADER_SIZE + packetSize);
+
+ if (packet.Get()->Length() == (PACKET_HEADER_SIZE + packetSize))
+ return DataBuffer(packet, 0, PACKET_HEADER_SIZE + packetSize);
+
+ return DataBuffer();
+ }
+
+ void LengthPrefixCodec::Consume(DataBuffer &data, int32_t desired)
+ {
+ if (!packet.IsValid())
+ packet = impl::interop::SP_InteropMemory(new impl::interop::InteropUnpooledMemory(desired));
+
+ impl::interop::InteropMemory& packet0 = *packet.Get();
+
+ if (packet0.Capacity() < desired)
+ packet0.Reallocate(desired);
+
+ int32_t toCopy = desired - packet0.Length();
+ if (toCopy <= 0)
+ return;
+
+ if (data.GetSize() < toCopy)
+ toCopy = data.GetSize();
+
+ int8_t* dst = packet0.Data() + packet0.Length();
+ packet0.Length(packet0.Length() + toCopy);
+ data.Consume(dst, toCopy);
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/src/network/network.cpp b/modules/platforms/cpp/network/src/network/network.cpp
index ab1b295..f0d46d4 100644
--- a/modules/platforms/cpp/network/src/network/network.cpp
+++ b/modules/platforms/cpp/network/src/network/network.cpp
@@ -17,8 +17,15 @@
#include "network/sockets.h"
+#ifdef WIN32
+# include "network/win_async_client_pool.h"
+#else // Other. Assume Linux
+# include "network/linux_async_client_pool.h"
+#endif
+
#include <ignite/network/network.h>
+#include "network/async_client_pool_adapter.h"
#include "network/ssl/ssl_gateway.h"
#include "network/ssl/secure_socket_client.h"
#include "network/tcp_socket_client.h"
@@ -34,11 +41,6 @@ namespace ignite
SslGateway::GetInstance().LoadAll();
}
- IGNITE_IMPORT_EXPORT SocketClient* MakeTcpSocketClient()
- {
- return new TcpSocketClient;
- }
-
IGNITE_IMPORT_EXPORT SocketClient* MakeSecureSocketClient(const std::string& certPath,
const std::string& keyPath, const std::string& caPath)
{
@@ -47,5 +49,23 @@ namespace ignite
return new SecureSocketClient(certPath, keyPath, caPath);
}
}
+
+ IGNITE_IMPORT_EXPORT SocketClient* MakeTcpSocketClient()
+ {
+ return new TcpSocketClient;
+ }
+
+ IGNITE_IMPORT_EXPORT SP_AsyncClientPool MakeAsyncClientPool(const std::vector<SP_DataFilter>& filters)
+ {
+ SP_AsyncClientPool platformPool = SP_AsyncClientPool(
+#ifdef WIN32
+ new WinAsyncClientPool()
+#else // Other. Assume Linux
+ new LinuxAsyncClientPool()
+#endif
+ );
+
+ return SP_AsyncClientPool(new AsyncClientPoolAdapter(filters, platformPool));
+ }
}
}
diff --git a/modules/platforms/cpp/network/src/network/ssl/secure_data_filter.cpp b/modules/platforms/cpp/network/src/network/ssl/secure_data_filter.cpp
new file mode 100644
index 0000000..02b1c50
--- /dev/null
+++ b/modules/platforms/cpp/network/src/network/ssl/secure_data_filter.cpp
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include <ignite/common/utils.h>
+
+#include <ignite/network/network.h>
+#include <ignite/network/ssl/secure_data_filter.h>
+
+#include "network/ssl/ssl_gateway.h"
+
+enum
+{
+ SSL_OPERATION_SUCCESS = 1,
+};
+
+namespace
+{
+ void FreeContext(SSL_CTX* ctx)
+ {
+ using namespace ignite::network::ssl;
+
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ assert(sslGateway.Loaded());
+
+ sslGateway.SSL_CTX_free_(ctx);
+ }
+
+ bool IsActualError(int err)
+ {
+ switch (err)
+ {
+ case SSL_ERROR_NONE:
+ case SSL_ERROR_WANT_READ:
+ case SSL_ERROR_WANT_WRITE:
+ case SSL_ERROR_WANT_X509_LOOKUP:
+ case SSL_ERROR_WANT_CONNECT:
+ case SSL_ERROR_WANT_ACCEPT:
+ return false;
+
+ default:
+ return true;
+ }
+ }
+
+ std::string GetSslError(void* ssl, int ret)
+ {
+ using namespace ignite::network::ssl;
+
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ assert(sslGateway.Loaded());
+
+ SSL* ssl0 = reinterpret_cast<SSL*>(ssl);
+
+ int sslError = sslGateway.SSL_get_error_(ssl0, ret);
+
+ switch (sslError)
+ {
+ case SSL_ERROR_NONE:
+ break;
+
+ case SSL_ERROR_WANT_WRITE:
+ return std::string("SSL_connect wants write");
+
+ case SSL_ERROR_WANT_READ:
+ return std::string("SSL_connect wants read");
+
+ default:
+ return std::string("SSL error: ") + ignite::common::LexicalCast<std::string>(sslError);
+ }
+
+ unsigned long error = sslGateway.ERR_get_error_();
+
+ char errBuf[1024] = { 0 };
+
+ sslGateway.ERR_error_string_n_(error, errBuf, sizeof(errBuf));
+
+ return std::string(errBuf);
+ }
+}
+
+namespace ignite
+{
+ namespace network
+ {
+ namespace ssl
+ {
+ SecureDataFilter::SecureDataFilter(const SecureConfiguration &cfg) :
+ cfg(cfg),
+ contexts(0),
+ contextCs()
+ {
+ EnsureSslLoaded();
+
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ const SSL_METHOD* method = sslGateway.SSLv23_client_method_();
+ if (!method)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE, "Can not get SSL method");
+
+ SSL_CTX* sslContext0 = sslGateway.SSL_CTX_new_(method);
+ if (!sslContext)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE,
+ "Can not create new SSL context");
+
+ common::DeinitGuard<SSL_CTX> guard(sslContext0, &FreeContext);
+
+ sslGateway.SSL_CTX_set_verify_(sslContext0, SSL_VERIFY_PEER, 0);
+
+ sslGateway.SSL_CTX_set_verify_depth_(sslContext0, 8);
+
+ sslGateway.SSL_CTX_set_options_(sslContext0, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION);
+
+ const char* cCaPath = cfg.caPath.empty() ? 0 : cfg.caPath.c_str();
+
+ long res = sslGateway.SSL_CTX_load_verify_locations_(sslContext0, cCaPath, 0);
+ if (res != SSL_OPERATION_SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE,
+ "Can not set Certificate Authority path for secure connection");
+
+ res = sslGateway.SSL_CTX_use_certificate_chain_file_(sslContext0, cfg.certPath.c_str());
+ if (res != SSL_OPERATION_SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE,
+ "Can not set client certificate file for secure connection");
+
+ res = sslGateway.SSL_CTX_use_RSAPrivateKey_file_(sslContext0, cfg.keyPath.c_str(), SSL_FILETYPE_PEM);
+ if (res != SSL_OPERATION_SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE,
+ "Can not set private key file for secure connection");
+
+ const char* const PREFERRED_CIPHERS = "HIGH:!aNULL:!kRSA:!PSK:!SRP:!MD5:!RC4";
+ res = sslGateway.SSL_CTX_set_cipher_list_(sslContext0, PREFERRED_CIPHERS);
+ if (res != SSL_OPERATION_SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE,
+ "Can not set ciphers list for secure connection");
+
+ guard.Release();
+ sslContext = sslContext0;
+ contexts = new ContextMap;
+ }
+
+ SecureDataFilter::~SecureDataFilter()
+ {
+ delete contexts;
+ FreeContext(static_cast<SSL_CTX*>(sslContext));
+ }
+
+ bool SecureDataFilter::Send(uint64_t id, const DataBuffer &data)
+ {
+ SP_SecureConnectionContext context = FindContext(id);
+ if (!context.IsValid())
+ return false;
+
+ return context.Get()->Send(data);
+ }
+
+ void SecureDataFilter::OnConnectionSuccess(const EndPoint &addr, uint64_t id)
+ {
+ SP_SecureConnectionContext context(new SecureConnectionContext(id, addr, *this));
+
+ {
+ common::concurrent::CsLockGuard lock(contextCs);
+
+ contexts->insert(std::make_pair(id, context));
+ }
+
+ context.Get()->DoConnect();
+ }
+
+ void SecureDataFilter::OnConnectionClosed(uint64_t id, const IgniteError *err)
+ {
+ SP_SecureConnectionContext context = FindContext(id);
+ if (!context.IsValid())
+ return;
+
+ if (context.Get()->IsConnected())
+ DataFilterAdapter::OnConnectionClosed(id, err);
+ else
+ {
+ IgniteError err0(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE,
+ "Connection closed during SSL/TLS handshake");
+
+ DataFilterAdapter::OnConnectionError(context.Get()->GetAddress(), err0);
+ }
+
+ {
+ common::concurrent::CsLockGuard lock(contextCs);
+
+ contexts->erase(id);
+ }
+ }
+
+ void SecureDataFilter::OnMessageReceived(uint64_t id, const DataBuffer &msg)
+ {
+ SP_SecureConnectionContext context = FindContext(id);
+ if (!context.IsValid())
+ return;
+
+ SecureConnectionContext& context0 = *context.Get();
+
+ DataBuffer in(msg);
+
+ while (!in.IsEmpty())
+ {
+ bool connectionHappened = context0.ProcessData(in);
+
+ if (connectionHappened)
+ DataFilterAdapter::OnConnectionSuccess(context0.GetAddress(), id);
+
+ if (context0.IsConnected())
+ {
+ DataBuffer data = context0.GetPendingDecryptedData();
+ while (!data.IsEmpty())
+ {
+ DataFilterAdapter::OnMessageReceived(id, data);
+ data = context0.GetPendingDecryptedData();
+ }
+ }
+ }
+ }
+
+ SecureDataFilter::SP_SecureConnectionContext SecureDataFilter::FindContext(uint64_t id)
+ {
+ common::concurrent::CsLockGuard lock(contextCs);
+
+ std::map<uint64_t, SP_SecureConnectionContext>::iterator it = contexts->find(id);
+ if (it == contexts->end())
+ return SP_SecureConnectionContext();
+
+ return it->second;
+ }
+
+ bool SecureDataFilter::SendInternal(uint64_t id, const DataBuffer& data)
+ {
+ return DataFilterAdapter::Send(id, data);
+ }
+
+ SecureDataFilter::SecureConnectionContext::SecureConnectionContext(
+ uint64_t id,
+ const EndPoint &addr,
+ SecureDataFilter& filter
+ ) :
+ connected(false),
+ id(id),
+ addr(addr),
+ filter(filter),
+ ssl(0),
+ bioIn(0),
+ bioOut(0)
+ {
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ ssl = sslGateway.SSL_new_(static_cast<SSL_CTX*>(filter.sslContext));
+ if (!ssl)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE,
+ "Can not create secure connection");
+
+ bioIn = sslGateway.BIO_new_(sslGateway.BIO_s_mem_());
+ if (!bioIn)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE, "Can not create input BIO");
+
+ bioOut = sslGateway.BIO_new_(sslGateway.BIO_s_mem_());
+ if (!bioOut)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE, "Can not create output BIO");
+
+ sslGateway.SSL_set_bio_(static_cast<SSL*>(ssl), static_cast<BIO*>(bioIn), static_cast<BIO*>(bioOut));
+ sslGateway.SSL_set_connect_state_(static_cast<SSL*>(ssl));
+ }
+
+ SecureDataFilter::SecureConnectionContext::~SecureConnectionContext()
+ {
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ if (ssl)
+ sslGateway.SSL_free_(static_cast<SSL*>(ssl));
+ else
+ {
+ if (bioIn)
+ sslGateway.BIO_free_all_(static_cast<BIO*>(bioIn));
+
+ if (bioOut)
+ sslGateway.BIO_free_all_(static_cast<BIO*>(bioOut));
+ }
+ }
+
+ void SecureDataFilter::SecureConnectionContext::DoConnect()
+ {
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ SSL* ssl0 = static_cast<SSL*>(ssl);
+ int res = sslGateway.SSL_connect_(ssl0);
+
+ if (res != SSL_OPERATION_SUCCESS)
+ {
+ int sslError = sslGateway.SSL_get_error_(ssl0, res);
+ if (IsActualError(sslError))
+ {
+ std::string msg = "Can not establish secure connection: " + GetSslError(ssl0, res);
+
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE, msg.c_str());
+ }
+ }
+
+ SendPendingData();
+ }
+
+ bool SecureDataFilter::SecureConnectionContext::SendPendingData()
+ {
+ DataBuffer data(GetPendingData(bioOut));
+
+ if (data.IsEmpty())
+ return false;
+
+ return filter.SendInternal(id, data);
+ }
+
+ bool SecureDataFilter::SecureConnectionContext::Send(const DataBuffer& data)
+ {
+ if (!connected)
+ return false;
+
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ int res = sslGateway.SSL_write_(static_cast<SSL*>(ssl), data.GetData(), data.GetSize());
+ if (res <= 0)
+ return false;
+
+ return SendPendingData();
+ }
+
+ bool SecureDataFilter::SecureConnectionContext::ProcessData(DataBuffer& data)
+ {
+ SslGateway &sslGateway = SslGateway::GetInstance();
+ int res = sslGateway.BIO_write_(static_cast<BIO*>(bioIn), data.GetData(), data.GetSize());
+ if (res <= 0)
+ throw IgniteError(IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE, "Failed to process SSL data");
+
+ data.Skip(res);
+
+ SendPendingData();
+
+ if (connected)
+ return false;
+
+ if (!sslGateway.SSL_is_init_finished_(static_cast<SSL*>(ssl)))
+ {
+ DoConnect();
+
+ SendPendingData();
+
+ if (!sslGateway.SSL_is_init_finished_(static_cast<SSL*>(ssl)))
+ return false;
+ }
+
+ connected = true;
+
+ recvBuffer = impl::interop::SP_InteropMemory(
+ new impl::interop::InteropUnpooledMemory(RECEIVE_BUFFER_SIZE));
+
+ return true;
+ }
+
+ DataBuffer SecureDataFilter::SecureConnectionContext::GetPendingData(void* bio)
+ {
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ BIO *bio0 = static_cast<BIO*>(bio);
+ int available = sslGateway.BIO_pending_(bio0);
+
+ impl::interop::SP_InteropMemory buf(new impl::interop::InteropUnpooledMemory(available));
+ buf.Get()->Length(available);
+
+ int res = sslGateway.BIO_read_(bio0, buf.Get()->Data(), buf.Get()->Length());
+ if (res <= 0)
+ return DataBuffer();
+
+ return DataBuffer(buf);
+ }
+
+ DataBuffer SecureDataFilter::SecureConnectionContext::GetPendingDecryptedData()
+ {
+ SslGateway &sslGateway = SslGateway::GetInstance();
+
+ SSL *ssl0 = static_cast<SSL*>(ssl);
+ int res = sslGateway.SSL_read_(ssl0, recvBuffer.Get()->Data(), recvBuffer.Get()->Capacity());
+ if (res <= 0)
+ return DataBuffer();
+
+ recvBuffer.Get()->Length(res);
+ return DataBuffer(recvBuffer);
+ }
+ }
+ }
+}
diff --git a/modules/platforms/cpp/network/src/network/ssl/secure_socket_client.cpp b/modules/platforms/cpp/network/src/network/ssl/secure_socket_client.cpp
index 9e4143d..1ed329c 100644
--- a/modules/platforms/cpp/network/src/network/ssl/secure_socket_client.cpp
+++ b/modules/platforms/cpp/network/src/network/ssl/secure_socket_client.cpp
@@ -23,6 +23,7 @@
#include <ignite/common/utils.h>
#include <ignite/common/concurrent.h>
#include <ignite/ignite_error.h>
+#include <ignite/network/utils.h>
#include "network/ssl/secure_socket_client.h"
#include "network/ssl/ssl_gateway.h"
@@ -120,8 +121,8 @@ namespace ignite
else
ThrowSecureError("Remote host did not provide certificate: " + GetSslError(ssl0, res));
- // Verify the result of chain verification
- // Verification performed according to RFC 4158
+ // Verify the result of chain verification.
+ // Verification performed according to RFC 4158.
res = sslGateway.SSL_get_verify_result_(ssl0);
if (X509_V_OK != res)
ThrowSecureError("Certificate chain verification failed: " + GetSslError(ssl0, res));
@@ -143,7 +144,7 @@ namespace ignite
assert(sslGateway.Loaded());
if (!ssl)
- ThrowNetworkError("Trying to send data using closed connection");
+ utils::ThrowNetworkError("Trying to send data using closed connection");
SSL* ssl0 = reinterpret_cast<SSL*>(ssl);
@@ -157,7 +158,7 @@ namespace ignite
assert(sslGateway.Loaded());
if (!ssl)
- ThrowNetworkError("Trying to receive data using closed connection");
+ utils::ThrowNetworkError("Trying to receive data using closed connection");
SSL* ssl0 = reinterpret_cast<SSL*>(ssl);
@@ -392,7 +393,7 @@ namespace ignite
ss << "Can not get file descriptor from the SSL socket: " << fd << ", " << GetSslError(ssl, fd);
- ThrowNetworkError(ss.str());
+ utils::ThrowNetworkError(ss.str());
}
return sockets::WaitOnSocket(fd, timeout, rd);
diff --git a/modules/platforms/cpp/network/src/network/ssl/ssl_gateway.cpp b/modules/platforms/cpp/network/src/network/ssl/ssl_gateway.cpp
index b2afb0f..33377b9 100644
--- a/modules/platforms/cpp/network/src/network/ssl/ssl_gateway.cpp
+++ b/modules/platforms/cpp/network/src/network/ssl/ssl_gateway.cpp
@@ -172,14 +172,25 @@ namespace ignite
functions.fpSSL_set_connect_state = LoadSslMethod("SSL_set_connect_state");
functions.fpSSL_connect = LoadSslMethod("SSL_connect");
+ functions.fpSSL_set_bio = LoadSslMethod("SSL_set_bio");
functions.fpSSL_get_error = LoadSslMethod("SSL_get_error");
functions.fpSSL_want = LoadSslMethod("SSL_want");
functions.fpSSL_write = LoadSslMethod("SSL_write");
functions.fpSSL_read = LoadSslMethod("SSL_read");
functions.fpSSL_pending = LoadSslMethod("SSL_pending");
+ functions.fpSSL_get_state = TryLoadSslMethod("SSL_get_state");
+ if (!functions.fpSSL_get_state)
+ functions.fpSSL_get_state = LoadSslMethod("SSL_state");
+
functions.fpSSL_get_fd = LoadSslMethod("SSL_get_fd");
+ functions.fpSSL_new = LoadSslMethod("SSL_new");
functions.fpSSL_free = LoadSslMethod("SSL_free");
+ functions.fpBIO_new = LoadSslMethod("BIO_new");
functions.fpBIO_new_ssl_connect = LoadSslMethod("BIO_new_ssl_connect");
+ functions.fpBIO_s_mem = LoadSslMethod("BIO_s_mem");
+ functions.fpBIO_read = LoadSslMethod("BIO_read");
+ functions.fpBIO_write = LoadSslMethod("BIO_write");
+
functions.fpOPENSSL_config = LoadSslMethod("OPENSSL_config");
functions.fpX509_free = LoadSslMethod("X509_free");
@@ -495,6 +506,17 @@ namespace ignite
return fp(s);
}
+ void SslGateway::SSL_set_bio_(SSL* s, BIO* rbio, BIO* wbio)
+ {
+ assert(functions.fpSSL_set_bio != 0);
+
+ typedef void (FuncType)(SSL*, BIO*, BIO*);
+
+ FuncType* fp = reinterpret_cast<FuncType*>(functions.fpSSL_set_bio);
+
+ fp(s, rbio, wbio);
+ }
+
int SslGateway::SSL_get_error_(const SSL* s, int ret)
{
assert(functions.fpSSL_get_error != 0);
@@ -550,6 +572,26 @@ namespace ignite
return fp(ssl);
}
+ int SslGateway::SSL_is_init_finished_(const SSL* ssl)
+ {
+ assert(functions.fpSSL_get_state != 0);
+
+ typedef int (FuncType)(const SSL*);
+
+ FuncType* fp = reinterpret_cast<FuncType*>(functions.fpSSL_get_state);
+
+ enum {
+ IGNITE_SSL_STATE_OK =
+#ifdef SSL_ST_OK
+ SSL_ST_OK
+#else
+ TLS_ST_OK
+#endif
+ };
+
+ return fp(ssl) == IGNITE_SSL_STATE_OK ? 1 : 0;
+ }
+
int SslGateway::SSL_get_fd_(const SSL* ssl)
{
assert(functions.fpSSL_get_fd != 0);
@@ -561,6 +603,17 @@ namespace ignite
return fp(ssl);
}
+ SSL* SslGateway::SSL_new_(SSL_CTX* ctx)
+ {
+ assert(functions.fpSSL_new != 0);
+
+ typedef SSL* (FuncType)(SSL_CTX*);
+
+ FuncType* fp = reinterpret_cast<FuncType*>(functions.fpSSL_new);
+
+ return fp(ctx);
+ }
+
void SslGateway::SSL_free_(SSL* ssl)
{
assert(functions.fpSSL_free != 0);
@@ -619,6 +672,17 @@ namespace ignite
fp(a);
}
+ BIO* SslGateway::BIO_new_(const BIO_METHOD* method)
+ {
+ assert(functions.fpBIO_new != 0);
+
+ typedef BIO*(FuncType)(const BIO_METHOD*);
+
+ FuncType* fp = reinterpret_cast<FuncType*>(functions.fpBIO_new);
+
+ return fp(method);
+ }
+
BIO* SslGateway::BIO_new_ssl_connect_(SSL_CTX* ctx)
{
assert(functions.fpBIO_new_ssl_connect != 0);
@@ -641,6 +705,44 @@ namespace ignite
fp(a);
}
+ const BIO_METHOD* SslGateway::BIO_s_mem_()
+ {
+ assert(functions.fpBIO_s_mem != 0);
+
+ typedef const BIO_METHOD* (FuncType)();
+
+ FuncType* fp = reinterpret_cast<FuncType*>(functions.fpBIO_s_mem);
+
+ return fp();
+ }
+
+ int SslGateway::BIO_read_(BIO* b, void* data, int len)
+ {
+ assert(functions.fpBIO_read != 0);
+
+ typedef int (FuncType)(BIO*, void*, int);
+
+ FuncType* fp = reinterpret_cast<FuncType*>(functions.fpBIO_read);
+
+ return fp(b, data, len);
+ }
+
+ int SslGateway::BIO_write_(BIO* b, const void *data, int len)
+ {
+ assert(functions.fpBIO_write != 0);
+
+ typedef int (FuncType)(BIO*, const void*, int);
+
+ FuncType* fp = reinterpret_cast<FuncType*>(functions.fpBIO_write);
+
+ return fp(b, data, len);
+ }
+
+ int SslGateway::BIO_pending_(BIO* b)
+ {
+ return BIO_ctrl_(b, BIO_CTRL_PENDING, 0, NULL);
+ }
+
long SslGateway::BIO_ctrl_(BIO* bp, int cmd, long larg, void* parg)
{
assert(functions.fpBIO_ctrl != 0);
diff --git a/modules/platforms/cpp/network/src/network/ssl/ssl_gateway.h b/modules/platforms/cpp/network/src/network/ssl/ssl_gateway.h
index 30134f3..13326b5 100644
--- a/modules/platforms/cpp/network/src/network/ssl/ssl_gateway.h
+++ b/modules/platforms/cpp/network/src/network/ssl/ssl_gateway.h
@@ -54,18 +54,25 @@ namespace ignite
void *fpSSL_ctrl;
void *fpSSLv23_client_method;
void *fpSSL_set_connect_state;
+ void *fpSSL_set_bio;
void *fpSSL_connect;
void *fpSSL_get_error;
void *fpSSL_want;
void *fpSSL_write;
void *fpSSL_read;
void *fpSSL_pending;
+ void *fpSSL_get_state;
void *fpSSL_get_fd;
+ void *fpSSL_new;
void *fpSSL_free;
void *fpOPENSSL_config;
void *fpX509_free;
+ void *fpBIO_new;
void *fpBIO_new_ssl_connect;
void *fpBIO_free_all;
+ void *fpBIO_s_mem;
+ void *fpBIO_read;
+ void *fpBIO_write;
void *fpBIO_ctrl;
void *fpERR_get_error;
void *fpERR_error_string_n;
@@ -154,6 +161,8 @@ namespace ignite
int SSL_connect_(SSL* s);
+ void SSL_set_bio_(SSL* s, BIO* rbio, BIO* wbio);
+
int SSL_get_error_(const SSL* s, int ret);
int SSL_want_(const SSL* s);
@@ -164,8 +173,12 @@ namespace ignite
int SSL_pending_(const SSL* ssl);
+ int SSL_is_init_finished_(const SSL* ssl);
+
int SSL_get_fd_(const SSL* ssl);
+ SSL* SSL_new_(SSL_CTX* ctx);
+
void SSL_free_(SSL* ssl);
const SSL_METHOD* SSLv23_client_method_();
@@ -176,10 +189,20 @@ namespace ignite
void X509_free_(X509* a);
+ BIO* BIO_new_(const BIO_METHOD* method);
+
BIO* BIO_new_ssl_connect_(SSL_CTX* ctx);
void BIO_free_all_(BIO* a);
+ const BIO_METHOD* BIO_s_mem_();
+
+ int BIO_read_(BIO* b, void* data, int len);
+
+ int BIO_write_(BIO* b, const void *data, int len);
+
+ int BIO_pending_(BIO* b);
+
long BIO_ctrl_(BIO* bp, int cmd, long larg, void* parg);
long BIO_get_fd_(BIO* bp, int* fd);
@@ -274,4 +297,4 @@ namespace ignite
}
}
-#endif //_IGNITE_NETWORK_SSL_SSL_GATEWAY
\ No newline at end of file
+#endif //_IGNITE_NETWORK_SSL_SSL_GATEWAY
diff --git a/modules/platforms/cpp/network/src/network/tcp_socket_client.h b/modules/platforms/cpp/network/src/network/tcp_socket_client.h
index 8627d99..c034b76 100644
--- a/modules/platforms/cpp/network/src/network/tcp_socket_client.h
+++ b/modules/platforms/cpp/network/src/network/tcp_socket_client.h
@@ -39,12 +39,6 @@ namespace ignite
/** Buffers size */
enum { BUFFER_SIZE = 0x10000 };
- /** The time in seconds the connection needs to remain idle before starts sending keepalive probes. */
- enum { KEEP_ALIVE_IDLE_TIME = 60 };
-
- /** The time in seconds between individual keepalive probes. */
- enum { KEEP_ALIVE_PROBES_PERIOD = 1 };
-
/**
* Constructor.
*/
@@ -105,11 +99,6 @@ namespace ignite
void InternalClose();
/**
- * Tries set socket options.
- */
- void TrySetOptions();
-
- /**
* Wait on the socket for any event for specified time.
* This function uses poll to achive timeout functionality
* for every separate socket operation.
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
index b670ee0..1e13348 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
@@ -99,7 +99,7 @@ namespace ignite
*
* @param cfg Configuration.
*/
- void Establish(const config::Configuration cfg);
+ void Establish(const config::Configuration& cfg);
/**
* Release established connection.
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/utility.h b/modules/platforms/cpp/odbc/include/ignite/odbc/utility.h
index 152da66..44977bc 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/utility.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/utility.h
@@ -101,14 +101,6 @@ namespace ignite
* @return Standard string containing the same data.
*/
std::string SqlStringToString(const unsigned char* sqlStr, int32_t sqlStrLen);
-
- /**
- * Convert binary data to hex dump form
- * @param data pointer to data
- * @param count data length
- * @return standard string containing the formated hex dump
- */
- std::string HexDump(const void* data, size_t count);
}
}
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp
index ba94e0b..dd1779c 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -136,7 +136,7 @@ namespace ignite
return InternalEstablish(config);
}
- void Connection::Establish(const config::Configuration cfg)
+ void Connection::Establish(const config::Configuration& cfg)
{
IGNITE_ODBC_API_CALL(InternalEstablish(cfg));
}
@@ -147,7 +147,7 @@ namespace ignite
if (sslMode == ssl::SslMode::DISABLE)
{
- socket.reset(network::ssl::MakeTcpSocketClient());
+ socket.reset(network::MakeTcpSocketClient());
return SqlResult::AI_SUCCESS;
}
@@ -288,7 +288,7 @@ namespace ignite
throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not send message due to connection failure");
#ifdef PER_BYTE_DEBUG
- LOG_MSG("message sent: (" << msg.GetSize() << " bytes)" << utility::HexDump(msg.GetData(), msg.GetSize()));
+ LOG_MSG("message sent: (" << msg.GetSize() << " bytes)" << common::HexDump(msg.GetData(), msg.GetSize()));
#endif //PER_BYTE_DEBUG
return true;
@@ -357,7 +357,7 @@ namespace ignite
throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not receive message body");
#ifdef PER_BYTE_DEBUG
- LOG_MSG("Message received: " << utility::HexDump(&msg[0], msg.size()));
+ LOG_MSG("Message received: " << common::HexDump(&msg[0], msg.size()));
#endif //PER_BYTE_DEBUG
return true;
diff --git a/modules/platforms/cpp/odbc/src/utility.cpp b/modules/platforms/cpp/odbc/src/utility.cpp
index c060a0a..6f34d10 100644
--- a/modules/platforms/cpp/odbc/src/utility.cpp
+++ b/modules/platforms/cpp/odbc/src/utility.cpp
@@ -157,21 +157,6 @@ namespace ignite
else
res.clear();
}
-
- std::string HexDump(const void* data, size_t count)
- {
- std::stringstream dump;
- size_t cnt = 0;
- for(const uint8_t* p = (const uint8_t*)data, *e = (const uint8_t*)data + count; p != e; ++p)
- {
- if (cnt++ % 16 == 0)
- {
- dump << std::endl;
- }
- dump << std::hex << std::setfill('0') << std::setw(2) << (int)*p << " ";
- }
- return dump.str();
- }
}
}
diff --git a/modules/platforms/cpp/thin-client-test/src/cache_client_test.cpp b/modules/platforms/cpp/thin-client-test/src/cache_client_test.cpp
index 9bd4d89..3840b87 100644
--- a/modules/platforms/cpp/thin-client-test/src/cache_client_test.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/cache_client_test.cpp
@@ -85,14 +85,14 @@ public:
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<KeyType, int64_t> cache =
client.GetCache<KeyType, int64_t>("partitioned");
@@ -465,14 +465,14 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsString)
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<std::string, int64_t> cache =
client.GetCache<std::string, int64_t>("partitioned");
@@ -495,14 +495,14 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsGuid)
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<ignite::Guid, int64_t> cache =
client.GetCache<ignite::Guid, int64_t>("partitioned");
@@ -525,14 +525,14 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsComplexType)
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<ignite::ComplexType, int64_t> cache =
client.GetCache<ignite::ComplexType, int64_t>("partitioned");
@@ -571,14 +571,14 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsDate)
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<ignite::Date, int64_t> cache =
client.GetCache<ignite::Date, int64_t>("partitioned");
@@ -615,14 +615,14 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsTime)
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<ignite::Time, int64_t> cache =
client.GetCache<ignite::Time, int64_t>("partitioned");
@@ -653,14 +653,14 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsTimestamp)
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<ignite::Timestamp, int64_t> cache =
client.GetCache<ignite::Timestamp, int64_t>("partitioned");
@@ -845,14 +845,14 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsDefaultDynamicCacheThreeNodes)
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<std::string, int64_t> cache =
client.CreateCache<std::string, int64_t>("defaultdynamic2");
@@ -877,14 +877,14 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsRebalance)
StartNode("node2");
StartNode("node3");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112,127.0.0.1:11113");
cfg.SetPartitionAwareness(true);
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<std::string, int64_t> cache =
client.CreateCache<std::string, int64_t>("defaultdynamic3");
@@ -920,13 +920,13 @@ BOOST_AUTO_TEST_CASE(CacheClientPartitionsDisabledThreeNodes)
StartNode("node1");
StartNode("node2");
- boost::this_thread::sleep_for(boost::chrono::seconds(2));
-
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
IgniteClient client = IgniteClient::Start(cfg);
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
cache::CacheClient<std::string, int64_t> cache =
client.CreateCache<std::string, int64_t>("defaultdynamic4");
diff --git a/modules/platforms/cpp/thin-client-test/src/ignite_client_test.cpp b/modules/platforms/cpp/thin-client-test/src/ignite_client_test.cpp
index 7476305..786b883 100644
--- a/modules/platforms/cpp/thin-client-test/src/ignite_client_test.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/ignite_client_test.cpp
@@ -66,6 +66,9 @@ public:
IgniteClient client = IgniteClient::Start(cfg);
BOOST_CHECK(WaitForConnections(expect));
+
+ boost::this_thread::sleep_for(boost::chrono::seconds(2));
+
BOOST_CHECK_EQUAL(GetActiveConnections(), expect);
}
@@ -109,30 +112,34 @@ BOOST_FIXTURE_TEST_SUITE(IgniteClientTestSuite, IgniteClientTestSuiteFixture)
BOOST_AUTO_TEST_CASE(IgniteClientConnection)
{
- ignite::Ignite serverNode = ignite_test::StartCrossPlatformServerNode("cache.xml", "ServerNode");
+ ignite::Ignite serverNode = StartNodeWithLog("0");
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110");
- IgniteClient::Start(cfg);
+ IgniteClient client = IgniteClient::Start(cfg);
+
+ BOOST_CHECK(WaitForConnections(1));
+ BOOST_CHECK_EQUAL(GetActiveConnections(), 1);
}
BOOST_AUTO_TEST_CASE(IgniteClientConnectionFailover)
{
- ignite::Ignite serverNode = ignite_test::StartCrossPlatformServerNode("cache.xml", "ServerNode");
+ ignite::Ignite serverNode = StartNodeWithLog("0");
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11109..11111");
- IgniteClient::Start(cfg);
+ IgniteClient client = IgniteClient::Start(cfg);
+
+ BOOST_CHECK(WaitForConnections(1));
+ BOOST_CHECK_EQUAL(GetActiveConnections(), 1);
}
BOOST_AUTO_TEST_CASE(IgniteClientConnectionLimit)
{
- ignite::common::DeletePath("logs");
-
ignite::Ignite serverNode0 = StartNodeWithLog("0");
ignite::Ignite serverNode1 = StartNodeWithLog("1");
ignite::Ignite serverNode2 = StartNodeWithLog("2");
@@ -149,4 +156,41 @@ BOOST_AUTO_TEST_CASE(IgniteClientConnectionLimit)
CheckConnectionsNum(cfg, 100500, 3);
}
+BOOST_AUTO_TEST_CASE(IgniteClientReconnect)
+{
+ ignite::Ignite serverNode0 = StartNodeWithLog("0");
+ ignite::Ignite serverNode1 = StartNodeWithLog("1");
+
+ IgniteClientConfiguration cfg;
+
+ cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111,127.0.0.1:11112");
+
+ IgniteClient client = IgniteClient::Start(cfg);
+
+ BOOST_CHECK(WaitForConnections(2));
+ BOOST_CHECK_EQUAL(GetActiveConnections(), 2);
+
+ ignite::Ignite serverNode2 = StartNodeWithLog("2");
+
+ BOOST_CHECK(WaitForConnections(3));
+ BOOST_CHECK_EQUAL(GetActiveConnections(), 3);
+
+ ignite::Ignition::Stop(serverNode1.GetName(), true);
+
+ BOOST_CHECK(WaitForConnections(2));
+ BOOST_CHECK_EQUAL(GetActiveConnections(), 2);
+
+ serverNode1 = StartNodeWithLog("1");
+
+ BOOST_CHECK(WaitForConnections(3, 20000));
+ BOOST_CHECK_EQUAL(GetActiveConnections(), 3);
+
+ ignite::Ignition::StopAll(true);
+
+ BOOST_CHECK(WaitForConnections(0));
+ BOOST_CHECK_EQUAL(GetActiveConnections(), 0);
+
+ BOOST_REQUIRE_THROW((client.GetOrCreateCache<int, int>("test")), ignite::IgniteError);
+}
+
BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client-test/src/ssl_test.cpp b/modules/platforms/cpp/thin-client-test/src/ssl_test.cpp
index 845360b..94031eb 100644
--- a/modules/platforms/cpp/thin-client-test/src/ssl_test.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/ssl_test.cpp
@@ -17,6 +17,8 @@
#include <boost/test/unit_test.hpp>
+#include <ignite/common/utils.h>
+
#include <ignite/ignition.h>
#include <ignite/thin/ignite_client_configuration.h>
@@ -30,14 +32,14 @@ using namespace boost::unit_test;
class SslTestSuiteFixture
{
public:
- ignite::Ignite StartSslNode()
+ ignite::Ignite StartSslNode(const std::string& name = "ServerNode")
{
- return ignite_test::StartCrossPlatformServerNode("ssl.xml", "ServerNode");
+ return ignite_test::StartCrossPlatformServerNode("ssl.xml", name.c_str());
}
- ignite::Ignite StartNonSslNode()
+ ignite::Ignite StartNonSslNode(const std::string& name = "ServerNode")
{
- return ignite_test::StartCrossPlatformServerNode("non-ssl.xml", "ServerNode");
+ return ignite_test::StartCrossPlatformServerNode("non-ssl.xml", name.c_str());
}
SslTestSuiteFixture()
@@ -124,4 +126,74 @@ BOOST_AUTO_TEST_CASE(SslConnectionTimeout)
BOOST_CHECK_THROW(IgniteClient::Start(cfg), ignite::IgniteError);
}
+BOOST_AUTO_TEST_CASE(SslCacheClientPutAllGetAll)
+{
+ StartSslNode("node1");
+ StartSslNode("node2");
+
+ IgniteClientConfiguration cfg;
+ cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11110");
+
+ cfg.SetSslMode(SslMode::REQUIRE);
+ cfg.SetSslCertFile(GetConfigFile("client_full.pem"));
+ cfg.SetSslKeyFile(GetConfigFile("client_full.pem"));
+ cfg.SetSslCaFile(GetConfigFile("ca.pem"));
+
+ IgniteClient client = IgniteClient::Start(cfg);
+
+ cache::CacheClient<int32_t, std::string> cache =
+ client.CreateCache<int32_t, std::string>("test");
+
+ enum { BATCH_SIZE = 20000 };
+
+ std::map<int32_t, std::string> values;
+ std::set<int32_t> keys;
+
+ for (int32_t j = 0; j < BATCH_SIZE; ++j)
+ {
+ int32_t key = BATCH_SIZE + j;
+
+ values[key] = "value_" + ignite::common::LexicalCast<std::string>(key);
+ keys.insert(key);
+ }
+
+ cache.PutAll(values);
+
+ std::map<int32_t, std::string> retrieved;
+ cache.GetAll(keys, retrieved);
+
+ BOOST_REQUIRE(values == retrieved);
+}
+
+BOOST_AUTO_TEST_CASE(SslCacheClientPutGet)
+{
+ StartSslNode("node1");
+ StartSslNode("node2");
+
+ IgniteClientConfiguration cfg;
+ cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11110");
+
+ cfg.SetSslMode(SslMode::REQUIRE);
+ cfg.SetSslCertFile(GetConfigFile("client_full.pem"));
+ cfg.SetSslKeyFile(GetConfigFile("client_full.pem"));
+ cfg.SetSslCaFile(GetConfigFile("ca.pem"));
+
+ IgniteClient client = IgniteClient::Start(cfg);
+
+ cache::CacheClient<int32_t, std::string> cache =
+ client.CreateCache<int32_t, std::string>("test");
+
+ enum { OPS_NUM = 100 };
+ for (int32_t j = 0; j < OPS_NUM; ++j)
+ {
+ int32_t key = OPS_NUM + j;
+ std::string value = "value_" + ignite::common::LexicalCast<std::string>(key);
+
+ cache.Put(key, value);
+ std::string retrieved = cache.Get(key);
+
+ BOOST_REQUIRE_EQUAL(value, retrieved);
+ }
+}
+
BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client-test/src/test_utils.cpp b/modules/platforms/cpp/thin-client-test/src/test_utils.cpp
index fd179d2..0c56d70 100644
--- a/modules/platforms/cpp/thin-client-test/src/test_utils.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/test_utils.cpp
@@ -71,7 +71,7 @@ namespace ignite_test
cfg.jvmOpts.push_back("-DIGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP=false");
cfg.jvmOpts.push_back("-Duser.language=en");
// Un-comment to debug SSL
- //cfg.jvmOpts.push_back("-Djavax.net.debug=ssl");
+ // cfg.jvmOpts.push_back("-Djavax.net.debug=ssl");
cfg.igniteHome = jni::ResolveIgniteHome();
cfg.jvmClassPath = jni::CreateIgniteHomeClasspath(cfg.igniteHome, true);
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client_configuration.h b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client_configuration.h
index acb8a53..68750a2 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client_configuration.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client_configuration.h
@@ -39,6 +39,9 @@ namespace ignite
class IgniteClientConfiguration
{
public:
+ /** Connection operation timeout in milliseconds. */
+ enum { DEFAULT_CONNECTION_TIMEOUT = 20000 };
+
/**
* Default constructor.
*
@@ -47,7 +50,8 @@ namespace ignite
IgniteClientConfiguration() :
sslMode(SslMode::DISABLE),
partitionAwareness(true),
- connectionsLimit(0)
+ connectionsLimit(0),
+ connectionTimeout(DEFAULT_CONNECTION_TIMEOUT)
{
// No-op.
}
@@ -70,7 +74,7 @@ namespace ignite
*
* For example: "localhost,example.com:12345,127.0.0.1:10800..10900,192.168.3.80:5893".
*
- * @param endPoints Addressess of the remote servers to connect.
+ * @param endPoints Addresses of the remote servers to connect.
*/
void SetEndPoints(const std::string& endPoints)
{
@@ -262,6 +266,34 @@ namespace ignite
connectionsLimit = limit;
}
+ /**
+ * Get connection timeout.
+ *
+ * Used as a timeout for any operation performed over TCP sockets.
+ *
+ * Zero value means that there is no timeout.
+ *
+ * The default value is @c DEFAULT_CONNECTION_TIMEOUT.
+ *
+ * @return Connection timeout in milliseconds.
+ */
+ int32_t GetConnectionTimeout() const
+ {
+ return connectionTimeout;
+ }
+
+ /**
+ * Set connection timeout.
+ *
+ * @see GetConnectionTimeout for details.
+ *
+ * @param timeout Connection timeout in milliseconds to set.
+ */
+ void SetConnectionTimeout(int32_t timeout)
+ {
+ connectionTimeout = timeout;
+ }
+
private:
/** Connection end points */
std::string endPoints;
@@ -289,6 +321,9 @@ namespace ignite
/** Active connections limit. */
uint32_t connectionsLimit;
+
+ /** Connection timeout in milliseconds. */
+ int32_t connectionTimeout;
};
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
index 6be7064..74219e9 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
@@ -51,11 +51,13 @@ namespace ignite
CacheClientImpl::~CacheClientImpl()
{
- // No-op.
+ DataRouter* router0 = router.Get();
+ if (router0)
+ router0->Close();
}
template<typename ReqT, typename RspT>
- void CacheClientImpl::SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp)
+ void CacheClientImpl::SyncCacheKeyMessage(const WritableKey& key, ReqT& req, RspT& rsp)
{
DataRouter& router0 = *router.Get();
@@ -91,7 +93,7 @@ namespace ignite
}
template<typename ReqT, typename RspT>
- SP_DataChannel CacheClientImpl::SyncMessage(const ReqT& req, RspT& rsp)
+ SP_DataChannel CacheClientImpl::SyncMessage(ReqT& req, RspT& rsp)
{
SP_DataChannel channel = router.Get()->SyncMessage(req, rsp);
@@ -102,7 +104,7 @@ namespace ignite
}
template<typename ReqT, typename RspT>
- SP_DataChannel CacheClientImpl::SyncMessageSql(const ReqT& req, RspT& rsp)
+ SP_DataChannel CacheClientImpl::SyncMessageSql(ReqT& req, RspT& rsp)
{
SP_DataChannel channel;
try {
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
index a7ab0be..0df1ae3 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
@@ -308,7 +308,7 @@ namespace ignite
* @throw IgniteError on error.
*/
template<typename ReqT, typename RspT>
- void SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp);
+ void SyncCacheKeyMessage(const WritableKey& key, ReqT& req, RspT& rsp);
/**
* Synchronously send message and receive response.
@@ -319,7 +319,7 @@ namespace ignite
* @throw IgniteError on error.
*/
template<typename ReqT, typename RspT>
- SP_DataChannel SyncMessage(const ReqT& req, RspT& rsp);
+ SP_DataChannel SyncMessage(ReqT& req, RspT& rsp);
/**
* Synchronously send message and receive response.
@@ -331,7 +331,7 @@ namespace ignite
* @throw IgniteError on error.
*/
template<typename ReqT, typename RspT>
- SP_DataChannel SyncMessageSql(const ReqT& req, RspT& rsp);
+ SP_DataChannel SyncMessageSql(ReqT& req, RspT& rsp);
/**
* Synchronously send request message and receive response taking in account that it can be
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/query/cursor_page.h b/modules/platforms/cpp/thin-client/src/impl/cache/query/cursor_page.h
index 34be0a7..f23aa09 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/query/cursor_page.h
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/query/cursor_page.h
@@ -67,11 +67,12 @@ namespace ignite
startPos = stream->Position();
interop::InteropUnpooledMemory* streamMem =
- static_cast<interop::InteropUnpooledMemory*>(stream->GetMemory());
+ const_cast<interop::InteropUnpooledMemory*>(
+ static_cast<const interop::InteropUnpooledMemory*>(stream->GetMemory()));
bool gotOwnership = streamMem->TryGetOwnership(mem);
- (void) gotOwnership;
+ IGNITE_UNUSED(gotOwnership);
assert(gotOwnership);
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
similarity index 50%
copy from modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
copy to modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
index b42fab3..14b4712 100644
--- a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-#include "impl/compute/compute_client_impl.h"
-#include "impl/message.h"
+#ifndef _IGNITE_IMPL_THIN_CHANNEL_STATE_HANDLER
+#define _IGNITE_IMPL_THIN_CHANNEL_STATE_HANDLER
-using namespace ignite::common::concurrent;
+#include <stdint.h>
namespace ignite
{
@@ -26,22 +26,35 @@ namespace ignite
{
namespace thin
{
- namespace compute
+ /** Channel state handler. */
+ class ChannelStateHandler
{
- void ComputeClientImpl::ExecuteJavaTask(int8_t flags, int64_t timeout, const std::string& taskName,
- Writable& wrArg, Readable& res)
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ChannelStateHandler()
{
- ComputeTaskExecuteRequest req(flags, timeout, taskName, wrArg);
- ComputeTaskFinishedNotification notification(res);
-
- router.Get()->SyncMessageWithNotification(req, notification);
+ // No-op.
+ }
- if (notification.IsFailure())
- throw IgniteError(IgniteError::IGNITE_ERR_COMPUTE_TASK_CANCELLED,
- notification.GetErrorMessage().c_str());
+ /**
+ * Channel handshake completion callback.
+ *
+ * @param id Channel ID.
+ */
+ virtual void OnHandshakeSuccess(uint64_t id) = 0;
- }
- }
+ /**
+ * Channel handshake error callback.
+ *
+ * @param id Channel ID.
+ * @param err Error.
+ */
+ virtual void OnHandshakeError(uint64_t id, const IgniteError& err) = 0;
+ };
}
}
}
+
+#endif //_IGNITE_IMPL_THIN_CHANNEL_STATE_HANDLER
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
index b42fab3..c7904ed 100644
--- a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
@@ -15,11 +15,80 @@
* limitations under the License.
*/
+#include <ignite/common/promise.h>
+
#include "impl/compute/compute_client_impl.h"
#include "impl/message.h"
using namespace ignite::common::concurrent;
+namespace
+{
+ using namespace ignite;
+ using namespace impl;
+ using namespace impl::thin;
+
+ /**
+ * Handler for java task notification.
+ */
+ class JavaTaskNotificationHandler : public NotificationHandler
+ {
+ public:
+ /**
+ * Constructor.
+ * @param channel Channel.
+ * @param res Result.
+ */
+ JavaTaskNotificationHandler(DataChannel& channel, Readable& res) :
+ channel(channel),
+ res(res)
+ {
+ // No-op.
+ }
+
+ virtual ~JavaTaskNotificationHandler()
+ {
+ // No-op.
+ }
+
+ virtual bool OnNotification(const network::DataBuffer& msg)
+ {
+ ComputeTaskFinishedNotification notification(res);
+ channel.DeserializeMessage(msg, notification);
+
+ if (notification.IsFailure())
+ {
+ promise.SetError(IgniteError(IgniteError::IGNITE_ERR_COMPUTE_EXECUTION_REJECTED,
+ notification.GetErrorMessage().c_str()));
+ }
+ else
+ promise.SetValue();
+
+ return true;
+ }
+
+ /**
+ * Get future result.
+ *
+ * @return Future.
+ */
+ ignite::Future<void> GetFuture() const
+ {
+ return promise.GetFuture();
+ }
+
+ private:
+ /** Channel. */
+ DataChannel& channel;
+
+ /** Result. */
+ Readable& res;
+
+ /** Completion promise. */
+ ignite::common::Promise<void> promise;
+ };
+}
+
namespace ignite
{
namespace impl
@@ -32,14 +101,16 @@ namespace ignite
Writable& wrArg, Readable& res)
{
ComputeTaskExecuteRequest req(flags, timeout, taskName, wrArg);
- ComputeTaskFinishedNotification notification(res);
+ ComputeTaskExecuteResponse rsp;
+
+ SP_DataChannel channel = router.Get()->SyncMessage(req, rsp);
- router.Get()->SyncMessageWithNotification(req, notification);
+ common::concurrent::SharedPointer<JavaTaskNotificationHandler> handler(
+ new JavaTaskNotificationHandler(*channel.Get(), res));
- if (notification.IsFailure())
- throw IgniteError(IgniteError::IGNITE_ERR_COMPUTE_TASK_CANCELLED,
- notification.GetErrorMessage().c_str());
+ channel.Get()->RegisterNotificationHandler(rsp.GetNotificationId(), handler);
+ handler.Get()->GetFuture().GetValue();
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
index df219d0..2bdd6dc 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
@@ -18,10 +18,11 @@
#include <cstddef>
#include <ignite/common/fixed_size_array.h>
+#include <ignite/common/promise.h>
+
#include <ignite/network/network.h>
#include "impl/message.h"
-#include "impl/remote_type_updater.h"
#include "impl/data_channel.h"
namespace ignite
@@ -50,15 +51,24 @@ namespace ignite
const DataChannel::VersionSet DataChannel::supportedVersions(supportedArray,
supportedArray + (sizeof(supportedArray) / sizeof(supportedArray[0])));
- DataChannel::DataChannel(const ignite::thin::IgniteClientConfiguration& cfg,
- binary::BinaryTypeManager& typeMgr) :
- ioMutex(),
- node(),
+ DataChannel::DataChannel(
+ uint64_t id,
+ const network::EndPoint& addr,
+ const ignite::network::SP_AsyncClientPool& asyncPool,
+ const ignite::thin::IgniteClientConfiguration& cfg,
+ binary::BinaryTypeManager& typeMgr,
+ ChannelStateHandler& stateHandler
+ ) :
+ stateHandler(stateHandler),
+ handshakePerformed(false),
+ id(id),
+ asyncPool(asyncPool),
+ node(addr),
config(cfg),
typeMgr(typeMgr),
currentVersion(VERSION_DEFAULT),
reqIdCounter(0),
- socket()
+ responseMutex()
{
// No-op.
}
@@ -68,224 +78,166 @@ namespace ignite
Close();
}
- bool DataChannel::Connect(const std::string& host, uint16_t port, int32_t timeout)
+ void DataChannel::StartHandshake()
{
- using ignite::thin::SslMode;
-
- if (socket.get() != 0)
- throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "Already connected.");
-
- if (config.GetEndPoints().empty())
- throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_ARGUMENT, "No valid address to connect.");
-
- SslMode::Type sslMode = config.GetSslMode();
-
- if (sslMode != SslMode::DISABLE)
- {
- socket.reset(network::ssl::MakeSecureSocketClient(
- config.GetSslCertFile(), config.GetSslKeyFile(), config.GetSslCaFile()));
- }
- else
- socket.reset(network::ssl::MakeTcpSocketClient());
-
- node = IgniteNode(host, port);
-
- return TryRestoreConnection(timeout);
+ DoHandshake(VERSION_DEFAULT);
}
void DataChannel::Close()
{
- if (socket.get() != 0)
- {
- socket->Close();
-
- socket.reset();
- }
+ asyncPool.Get()->Close(id, 0);
+ handlerMap.clear();
}
- void DataChannel::InternalSyncMessage(interop::InteropUnpooledMemory& mem, int32_t timeout)
+ void DataChannel::SyncMessage(Request &req, Response &rsp, int32_t timeout)
{
- common::concurrent::CsLockGuard lock(ioMutex);
+ Future<network::DataBuffer> rspFut = AsyncMessage(req);
- InternalSyncMessageUnguarded(mem, timeout);
- }
-
- void DataChannel::InternalSyncMessageUnguarded(interop::InteropUnpooledMemory& mem, int32_t timeout)
- {
- bool success = Send(mem.Data(), mem.Length(), timeout);
+ bool success = true;
+ if (timeout)
+ success = rspFut.WaitFor(timeout);
+ else
+ rspFut.Wait();
if (!success)
{
- success = TryRestoreConnection(timeout);
+ common::concurrent::CsLockGuard lock(responseMutex);
- if (!success)
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Can not send message to remote host: timeout");
+ responseMap.erase(req.GetId());
- success = Send(mem.Data(), mem.Length(), timeout);
+ std::string msg = "Can not send message to remote host " +
+ node.GetEndPoint().ToString() + " within timeout.";
- if (!success)
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Can not send message to remote host: timeout");
+ throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str());
}
- success = Receive(mem, timeout);
-
- if (!success)
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Can not receive message response from the remote host: timeout");
+ DeserializeMessage(rspFut.GetValue(), rsp);
}
- bool DataChannel::Send(const int8_t* data, size_t len, int32_t timeout)
+ int64_t DataChannel::GenerateRequestMessage(Request &req, interop::InteropMemory &mem)
{
- if (socket.get() == 0)
- throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "Connection is not established");
-
- OperationResult::T res = SendAll(data, len, timeout);
-
- if (res == OperationResult::TIMEOUT)
- return false;
-
- if (res == OperationResult::FAIL)
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Can not send message due to connection failure");
-
- return true;
- }
-
- DataChannel::OperationResult::T DataChannel::SendAll(const int8_t* data, size_t len, int32_t timeout)
- {
- int sent = 0;
+ interop::InteropOutputStream outStream(&mem);
+ binary::BinaryWriterImpl writer(&outStream, &typeMgr);
- while (sent != static_cast<int64_t>(len))
- {
- int res = socket->Send(data + sent, len - sent, timeout);
+ // Space for RequestSize + OperationCode + RequestID.
+ outStream.Reserve(4 + 2 + 8);
- if (res < 0 || res == network::SocketClient::WaitResult::TIMEOUT)
- {
- Close();
+ req.Write(writer, currentVersion);
- return res < 0 ? OperationResult::FAIL : OperationResult::TIMEOUT;
- }
+ int64_t reqId = GenerateRequestId();
+ req.SetId(reqId);
- sent += res;
- }
+ outStream.WriteInt32(0, outStream.Position() - 4);
+ outStream.WriteInt16(4, req.GetOperationCode());
+ outStream.WriteInt64(6, reqId);
- assert(static_cast<size_t>(sent) == len);
+ outStream.Synchronize();
- return OperationResult::SUCCESS;
+ return reqId;
}
- bool DataChannel::Receive(interop::InteropMemory& msg, int32_t timeout)
+ Future<network::DataBuffer> DataChannel::AsyncMessage(Request &req)
{
- assert(msg.Capacity() > 4);
+ // Allocating 64 KB to decrease number of re-allocations.
+ enum { BUFFER_SIZE = 1024 * 64 };
- if (socket.get() == 0)
- throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "DataChannel is not established");
+ interop::SP_InteropMemory mem(new interop::InteropUnpooledMemory(BUFFER_SIZE));
- // Message size
- msg.Length(4);
+ int64_t reqId = GenerateRequestMessage(req, *mem.Get());
- OperationResult::T res = ReceiveAll(msg.Data(), static_cast<size_t>(msg.Length()), timeout);
+ common::concurrent::CsLockGuard lock1(responseMutex);
+ SP_PromiseDataBuffer& sp = responseMap[reqId];
+ if (!sp.IsValid())
+ sp = SP_PromiseDataBuffer(new common::Promise<network::DataBuffer>());
- if (res == OperationResult::TIMEOUT)
- return false;
+ Future<network::DataBuffer> future = sp.Get()->GetFuture();
+ lock1.Reset();
- if (res == OperationResult::FAIL)
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Can not receive message header");
+ network::DataBuffer buffer(mem);
+ bool success = asyncPool.Get()->Send(id, buffer);
- interop::InteropInputStream inStream(&msg);
+ if (!success)
+ {
+ common::concurrent::CsLockGuard lock2(responseMutex);
- int32_t msgLen = inStream.ReadInt32();
+ responseMap.erase(reqId);
- if (msgLen < 0)
- {
- Close();
+ std::string msg = "Can not send message to remote host " + node.GetEndPoint().ToString();
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Protocol error: Message length is negative");
+ throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str());
}
- if (msgLen == 0)
- return true;
-
- if (msg.Capacity() < msgLen + 4)
- msg.Reallocate(msgLen + 4);
+ return future;
+ }
- msg.Length(4 + msgLen);
+ void DataChannel::ProcessMessage(const network::DataBuffer& msg)
+ {
+ if (!handshakePerformed)
+ {
+ OnHandshakeResponse(msg);
+ return;
+ }
- res = ReceiveAll(msg.Data() + 4, msgLen, timeout);
+ interop::InteropInputStream inStream(msg.GetInputStream());
- if (res == OperationResult::TIMEOUT)
- return false;
+ inStream.Ignore(4);
- if (res == OperationResult::FAIL)
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Connection failure: Can not receive message body");
+ int64_t rspId = inStream.ReadInt64();
+ int16_t flags = inStream.ReadInt16();
- return true;
- }
+ if (flags & Flag::NOTIFICATION)
+ {
+ common::concurrent::CsLockGuard lock(handlerMutex);
- DataChannel::OperationResult::T DataChannel::ReceiveAll(void* dst, size_t len, int32_t timeout)
- {
- size_t remain = len;
- int8_t* buffer = reinterpret_cast<int8_t*>(dst);
+ NotificationHandlerHolder& holder = handlerMap[rspId];
+ holder.ProcessNotification(msg);
- while (remain)
+ if (holder.IsProcessingComplete())
+ handlerMap.erase(rspId);
+ }
+ else
{
- size_t received = len - remain;
-
- int res = socket->Receive(buffer + received, remain, timeout);
+ common::concurrent::CsLockGuard lock(responseMutex);
- if (res < 0 || res == network::SocketClient::WaitResult::TIMEOUT)
+ ResponseMap::iterator it = responseMap.find(rspId);
+ if (it != responseMap.end())
{
- Close();
+ common::Promise<network::DataBuffer>& rsp = *it->second.Get();
- return res < 0 ? OperationResult::FAIL : OperationResult::TIMEOUT;
- }
+ rsp.SetValue(std::auto_ptr<network::DataBuffer>(new network::DataBuffer(msg.Clone())));
- remain -= static_cast<size_t>(res);
+ responseMap.erase(rspId);
+ }
}
-
- return OperationResult::SUCCESS;
}
- bool DataChannel::MakeRequestHandshake(const ProtocolVersion& propVer,
- ProtocolVersion& resVer, int32_t timeout)
+ void DataChannel::RegisterNotificationHandler(int64_t notId, const SP_NotificationHandler& handler)
{
- currentVersion = propVer;
+ common::concurrent::CsLockGuard lock(handlerMutex);
- resVer = ProtocolVersion();
- bool accepted = false;
+ NotificationHandlerHolder& holder = handlerMap[notId];
+ holder.SetHandler(handler);
- try
- {
- // Workaround for some Linux systems that report connection on non-blocking
- // sockets as successful but fail to establish real connection.
- accepted = Handshake(propVer, resVer, timeout);
- }
- catch (const IgniteError&)
- {
- return false;
- }
-
- if (!accepted)
- return false;
+ if (holder.IsProcessingComplete())
+ handlerMap.erase(notId);
+ }
- resVer = propVer;
+ bool DataChannel::DoHandshake(const ProtocolVersion& propVer)
+ {
+ currentVersion = propVer;
- return true;
+ return Handshake(propVer);
}
- bool DataChannel::Handshake(const ProtocolVersion& propVer, ProtocolVersion& resVer, int32_t timeout)
+ bool DataChannel::Handshake(const ProtocolVersion& propVer)
{
- // Allocating 4KB just in case.
- enum { BUFFER_SIZE = 1024 * 4 };
+ // Allocating 4 KB just in case.
+ enum {
+ BUFFER_SIZE = 1024 * 4
+ };
- common::concurrent::CsLockGuard lock(ioMutex);
-
- interop::InteropUnpooledMemory mem(BUFFER_SIZE);
- interop::InteropOutputStream outStream(&mem);
+ interop::SP_InteropMemory mem(new interop::InteropUnpooledMemory(BUFFER_SIZE));
+ interop::InteropOutputStream outStream(mem.Get());
binary::BinaryWriterImpl writer(&outStream, 0);
int32_t lenPos = outStream.Reserve(4);
@@ -297,10 +249,9 @@ namespace ignite
writer.WriteInt8(ClientType::THIN_CLIENT);
- if (propVer >= VERSION_1_7_0)
- {
+ if (propVer >= VERSION_1_7_0) {
// Use features for any new changes in protocol.
- int8_t features[] = { 0 };
+ int8_t features[] = {0};
writer.WriteInt8Array(features, 0);
}
@@ -310,19 +261,17 @@ namespace ignite
outStream.WriteInt32(lenPos, outStream.Position() - 4);
- bool success = Send(mem.Data(), outStream.Position(), timeout);
+ outStream.Synchronize();
- if (!success)
- return false;
-
- success = Receive(mem, timeout);
-
- if (!success)
- return false;
+ network::DataBuffer buffer(mem);
+ return asyncPool.Get()->Send(id, buffer);
+ }
- interop::InteropInputStream inStream(&mem);
+ void DataChannel::OnHandshakeResponse(const network::DataBuffer& msg)
+ {
+ interop::InteropInputStream inStream(msg.GetInputStream());
- inStream.Position(4);
+ inStream.Ignore(4);
binary::BinaryReaderImpl reader(&inStream);
@@ -334,17 +283,33 @@ namespace ignite
int16_t minor = reader.ReadInt16();
int16_t maintenance = reader.ReadInt16();
- resVer = ProtocolVersion(major, minor, maintenance);
+ ProtocolVersion resVer(major, minor, maintenance);
std::string error;
reader.ReadString(error);
- reader.ReadInt32();
+ int32_t errorCode = reader.ReadInt32();
+
+ bool shouldRetry = IsVersionSupported(resVer) && resVer != currentVersion;
+ if (shouldRetry)
+ shouldRetry = DoHandshake(resVer);
+
+ if (!shouldRetry)
+ {
+ std::stringstream ss;
+ ss << errorCode << ": " << error;
+ std::string newMsg = ss.str();
+
+ IgniteError err(IgniteError::IGNITE_ERR_GENERIC, newMsg.c_str());
+
+ if (!handshakePerformed)
+ stateHandler.OnHandshakeError(id, err);
+ }
- return false;
+ return;
}
- if (propVer >= VERSION_1_7_0)
+ if (currentVersion >= VERSION_1_7_0)
{
int32_t len = reader.ReadInt8Array(0, 0);
std::vector<int8_t> features;
@@ -356,74 +321,39 @@ namespace ignite
}
}
- if (propVer >= VERSION_1_4_0)
+ if (currentVersion >= VERSION_1_4_0)
{
Guid nodeGuid = reader.ReadGuid();
node.SetGuid(nodeGuid);
}
- return true;
+ handshakePerformed = true;
+ stateHandler.OnHandshakeSuccess(id);
}
- bool DataChannel::EnsureConnected(int32_t timeout)
- {
- if (socket.get() != 0)
- return true;
-
- return TryRestoreConnection(timeout);
- }
-
- bool DataChannel::NegotiateProtocolVersion(int32_t timeout)
+ bool DataChannel::IsVersionSupported(const ProtocolVersion& ver)
{
- ProtocolVersion resVer;
- ProtocolVersion propVer = VERSION_DEFAULT;
-
- bool success = MakeRequestHandshake(propVer, resVer, timeout);
-
- while (!success)
- {
- if (resVer == propVer || !IsVersionSupported(resVer))
- return false;
-
- propVer = resVer;
-
- success = MakeRequestHandshake(propVer, resVer, timeout);
- }
-
- return true;
+ return supportedVersions.find(ver) != supportedVersions.end();
}
- bool DataChannel::TryRestoreConnection(int32_t timeout)
+ void DataChannel::FailPendingRequests(const IgniteError* err)
{
- bool connected = false;
-
- const network::EndPoint& endPoint = node.GetEndPoint();
-
- connected = socket->Connect(endPoint.host.c_str(), endPoint.port, timeout);
+ IgniteError defaultErr(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Connection was closed");
+ if (!err)
+ err = &defaultErr;
- if (!connected)
{
- Close();
-
- return false;
- }
-
- connected = NegotiateProtocolVersion(timeout);
+ common::concurrent::CsLockGuard lock(responseMutex);
- if (!connected)
- {
- Close();
+ for (ResponseMap::iterator it = responseMap.begin(); it != responseMap.end(); ++it)
+ it->second.Get()->SetError(*err);
- return false;
+ responseMap.clear();
}
- return true;
- }
-
- bool DataChannel::IsVersionSupported(const ProtocolVersion& ver)
- {
- return supportedVersions.find(ver) != supportedVersions.end();
+ if (!handshakePerformed)
+ stateHandler.OnHandshakeError(id, *err);
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.h b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
index 38f0180..aef02d0 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
@@ -22,10 +22,12 @@
#include <memory>
+#include <ignite/future.h>
#include <ignite/thin/ignite_client_configuration.h>
#include <ignite/common/concurrent.h>
#include <ignite/network/socket_client.h>
+#include <ignite/network/async_client_pool.h>
#include <ignite/impl/interop/interop_output_stream.h>
#include <ignite/impl/binary/binary_writer_impl.h>
@@ -33,6 +35,8 @@
#include "impl/protocol_version.h"
#include "impl/ignite_node.h"
#include "impl/response_status.h"
+#include "impl/channel_state_handler.h"
+#include "impl/notification_handler.h"
namespace ignite
{
@@ -46,6 +50,12 @@ namespace ignite
namespace thin
{
+ // Forward declaration.
+ class Request;
+
+ // Forward declaration.
+ class Response;
+
/**
* Data router.
*
@@ -58,12 +68,21 @@ namespace ignite
/** Version set type. */
typedef std::set<ProtocolVersion> VersionSet;
+ /** Shared pointer to DataBuffer Promise. */
+ typedef common::concurrent::SharedPointer<common::Promise<network::DataBuffer> > SP_PromiseDataBuffer;
+
+ /** Response map. */
+ typedef std::map< int64_t, SP_PromiseDataBuffer> ResponseMap;
+
+ /** Notification handler map. */
+ typedef std::map< int64_t, NotificationHandlerHolder > NotificationHandlerMap;
+
/** Version 1.2.0. */
static const ProtocolVersion VERSION_1_2_0;
/** Version 1.3.0. */
static const ProtocolVersion VERSION_1_3_0;
-
+
/** Version 1.4.0. Added: Partition awareness support, IEP-23. */
static const ProtocolVersion VERSION_1_4_0;
@@ -73,32 +92,28 @@ namespace ignite
/** Version 1.6.0. Expiration Policy Configuration. */
static const ProtocolVersion VERSION_1_6_0;
- /** Version 1.6.0. Features introduced. */
+ /** Version 1.7.0. Features introduced. */
static const ProtocolVersion VERSION_1_7_0;
/** Current version. */
static const ProtocolVersion VERSION_DEFAULT;
/**
- * Operation with timeout result.
- */
- struct OperationResult
- {
- enum T
- {
- SUCCESS,
- FAIL,
- TIMEOUT
- };
- };
-
- /**
* Constructor.
*
+ * @param id Connection ID.
+ * @param addr Address.
+ * @param asyncPool Async pool for connection.
* @param cfg Configuration.
* @param typeMgr Type manager.
+ * @param stateHandler State handler.
*/
- DataChannel(const ignite::thin::IgniteClientConfiguration& cfg, binary::BinaryTypeManager& typeMgr);
+ DataChannel(uint64_t id,
+ const network::EndPoint& addr,
+ const ignite::network::SP_AsyncClientPool& asyncPool,
+ const ignite::thin::IgniteClientConfiguration& cfg,
+ binary::BinaryTypeManager& typeMgr,
+ ChannelStateHandler& stateHandler);
/**
* Destructor.
@@ -106,14 +121,11 @@ namespace ignite
~DataChannel();
/**
- * Establish connection to cluster.
+ * Perform handshake.
*
- * @param host Host.
- * @param port Port.
- * @param timeout Timeout.
* @return @c true on success.
*/
- bool Connect(const std::string& host, uint16_t port, int32_t timeout);
+ void StartHandshake();
/**
* Close connection.
@@ -121,122 +133,28 @@ namespace ignite
void Close();
/**
- * Synchronously send request message and receive response.
- * Uses provided timeout. Does not try to restore connection on
- * fail.
+ * Synchronously send request message and receive response. Uses provided timeout.
*
* @param req Request message.
* @param rsp Response message.
* @param timeout Timeout.
* @throw IgniteError on error.
*/
- template<typename ReqT, typename RspT>
- void SyncMessage(const ReqT& req, RspT& rsp, int32_t timeout)
- {
- // Allocating 64KB to lessen number of re-allocations.
- enum { BUFFER_SIZE = 1024 * 64 };
-
- interop::InteropUnpooledMemory mem(BUFFER_SIZE);
-
- int64_t id = GenerateRequestMessage(req, mem);
-
- InternalSyncMessage(mem, timeout);
-
- interop::InteropInputStream inStream(&mem);
-
- inStream.Position(4);
-
- int64_t rspId = inStream.ReadInt64();
-
- if (id != rspId)
- throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
- "Protocol error: Response message ID does not equal Request ID");
-
- binary::BinaryReaderImpl reader(&inStream);
-
- rsp.Read(reader, currentVersion);
- }
+ void SyncMessage(Request& req, Response& rsp, int32_t timeout);
/**
- * Synchronously send request message, receive response and get a notification.
+ * Process received message.
*
- * @param req Request message.
- * @param notification Notification message.
- * @param timeout Timeout.
- * @return Channel that was used for request.
- * @throw IgniteError on error.
+ * @param msg Message.
*/
- template<typename ReqT, typename NotT>
- void SyncMessageWithNotification(const ReqT& req, NotT& notification, int32_t timeout)
- {
- // Allocating 64KB to lessen number of re-allocations.
- enum { BUFFER_SIZE = 1024 * 64 };
-
- interop::InteropUnpooledMemory mem(BUFFER_SIZE);
-
- int64_t id = GenerateRequestMessage(req, mem);
-
- common::concurrent::CsLockGuard lock(ioMutex);
-
- InternalSyncMessageUnguarded(mem, timeout);
-
- interop::InteropInputStream inStream(&mem);
-
- inStream.Position(4);
-
- int64_t rspId = inStream.ReadInt64();
-
- if (id != rspId)
- throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
- "Protocol error: Response message ID does not equal Request ID");
-
- binary::BinaryReaderImpl reader(&inStream);
-
- typedef typename NotT::ResponseType RspT;
- RspT rsp;
-
- rsp.Read(reader, currentVersion);
-
- if (rsp.GetStatus() != ResponseStatus::SUCCESS)
- throw IgniteError(IgniteError::IGNITE_ERR_COMPUTE_EXECUTION_REJECTED, rsp.GetError().c_str());
-
- bool success = Receive(mem, 0);
-
- if (!success)
- throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
- "Can not receive message response from the remote host: timeout");
-
- inStream.Position(4);
- inStream.Synchronize();
-
- int64_t notificationId = inStream.ReadInt64();
-
- if (notificationId != rsp.GetNotificationId())
- {
- IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification ID",
- "expected", rsp.GetNotificationId(), "actual", notificationId)
- }
-
- notification.Read(reader, currentVersion);
- }
-
- /**
- * Send message stored in memory and synchronously receives
- * response and stores it in the same memory.
- *
- * @param mem Memory.
- * @param timeout Operation timeout.
- */
- void InternalSyncMessage(interop::InteropUnpooledMemory& mem, int32_t timeout);
+ void ProcessMessage(const network::DataBuffer& msg);
/**
- * Send message stored in memory and synchronously receives
- * response and stores it in the same memory.
- *
- * @param mem Memory.
- * @param timeout Operation timeout.
+ * Register handler for the notification.
+ * @param notId Notification ID.
+ * @param handler Handler.
*/
- void InternalSyncMessageUnguarded(interop::InteropUnpooledMemory& mem, int32_t timeout);
+ void RegisterNotificationHandler(int64_t notId, const SP_NotificationHandler& handler);
/**
* Get remote node.
@@ -248,15 +166,40 @@ namespace ignite
}
/**
- * Check if the connection established.
- *
- * @return @true if connected.
+ * Get connection ID.
+ * @return Connection ID.
*/
- bool IsConnected() const
+ uint64_t GetId() const
{
- return socket.get() != 0;
+ return id;
}
+ /**
+ * Deserialize message received by this channel.
+ * @tparam T Message type.
+ * @param data Data.
+ * @param msg Message.
+ */
+ template<typename T>
+ void DeserializeMessage(const network::DataBuffer& data, T& msg)
+ {
+ interop::InteropInputStream inStream(data.GetInputStream());
+
+ // Skipping size (4 bytes) and reqId (8 bytes)
+ inStream.Ignore(12);
+
+ binary::BinaryReaderImpl reader(&inStream);
+
+ msg.Read(reader, currentVersion);
+ }
+
+ /**
+ * Fail all pending requests.
+ *
+ * @param err Error.
+ */
+ void FailPendingRequests(const IgniteError* err);
+
private:
IGNITE_NO_COPY_ASSIGNMENT(DataChannel);
@@ -279,115 +222,40 @@ namespace ignite
* @param mem Memory to write request to.
* @return Message ID.
*/
- template<typename ReqT>
- int64_t GenerateRequestMessage(const ReqT& req, interop::InteropUnpooledMemory& mem)
- {
- interop::InteropOutputStream outStream(&mem);
- binary::BinaryWriterImpl writer(&outStream, &typeMgr);
-
- // Space for RequestSize + OperationCode + RequestID.
- outStream.Reserve(4 + 2 + 8);
-
- req.Write(writer, currentVersion);
-
- int64_t id = GenerateRequestId();
-
- outStream.WriteInt32(0, outStream.Position() - 4);
- outStream.WriteInt16(4, ReqT::GetOperationCode());
- outStream.WriteInt64(6, id);
-
- outStream.Synchronize();
-
- return id;
- }
-
- /**
- * Send data by established connection.
- *
- * @param data Data buffer.
- * @param len Data length.
- * @param timeout Timeout.
- * @return @c true on success, @c false on timeout.
- * @throw IgniteError on error.
- */
- bool Send(const int8_t* data, size_t len, int32_t timeout);
+ int64_t GenerateRequestMessage(Request& req, interop::InteropMemory& mem);
/**
- * Receive next message.
+ * Asynchronously send request message and get a future for the response.
*
- * @param msg Buffer for message.
- * @param timeout Timeout.
- * @return @c true on success, @c false on timeout.
+ * @param req Request message.
* @throw IgniteError on error.
*/
- bool Receive(interop::InteropMemory& msg, int32_t timeout);
-
- /**
- * Receive specified number of bytes.
- *
- * @param dst Buffer for data.
- * @param len Number of bytes to receive.
- * @param timeout Timeout.
- * @return Operation result.
- */
- OperationResult::T ReceiveAll(void* dst, size_t len, int32_t timeout);
-
- /**
- * Send specified number of bytes.
- *
- * @param data Data buffer.
- * @param len Data length.
- * @param timeout Timeout.
- * @return Operation result.
- */
- OperationResult::T SendAll(const int8_t* data, size_t len, int32_t timeout);
+ Future<network::DataBuffer> AsyncMessage(Request &req);
/**
* Perform handshake request.
*
* @param propVer Proposed protocol version.
- * @param resVer Resulted version.
- * @param timeout Timeout.
* @return @c true on success and @c false otherwise.
*/
- bool MakeRequestHandshake(const ProtocolVersion& propVer, ProtocolVersion& resVer, int32_t timeout);
+ bool DoHandshake(const ProtocolVersion& propVer);
/**
- * Synchronously send handshake request message and receive
- * handshake response. Uses provided timeout. Does not try to
- * restore connection on fail.
+ * Synchronously send handshake request message and receive handshake response. Uses provided timeout.
+ * Does not try to restore connection on fail.
*
* @param propVer Proposed protocol version.
- * @param resVer Resulted version.
- * @param timeout Timeout.
* @return @c true if accepted.
* @throw IgniteError on error.
*/
- bool Handshake(const ProtocolVersion& propVer, ProtocolVersion& resVer, int32_t timeout);
+ bool Handshake(const ProtocolVersion& propVer);
/**
- * Ensure there is a connection to the cluster.
+ * Handle handshake response.
*
- * @param timeout Timeout.
- * @return @c false on error.
+ * @param msg Message.
*/
- bool EnsureConnected(int32_t timeout);
-
- /**
- * Negotiate protocol version with current host
- *
- * @param timeout Timeout.
- * @return @c true on success and @c false otherwise.
- */
- bool NegotiateProtocolVersion(int32_t timeout);
-
- /**
- * Try to restore connection to the cluster.
- *
- * @param timeout Timeout.
- * @return @c true on success and @c false otherwise.
- */
- bool TryRestoreConnection(int32_t timeout);
+ void OnHandshakeResponse(const network::DataBuffer& msg);
/**
* Check if the version is supported.
@@ -400,8 +268,17 @@ namespace ignite
/** Set of supported versions. */
const static VersionSet supportedVersions;
- /** Sync IO mutex. */
- common::concurrent::CriticalSection ioMutex;
+ /** State handler. */
+ ChannelStateHandler& stateHandler;
+
+ /** Indicates whether handshake has been performed. */
+ bool handshakePerformed;
+
+ /** Connection ID */
+ uint64_t id;
+
+ /** Async pool. */
+ ignite::network::SP_AsyncClientPool asyncPool;
/** Remote node data. */
IgniteNode node;
@@ -418,8 +295,17 @@ namespace ignite
/** Request ID counter. */
int64_t reqIdCounter;
- /** Client Socket. */
- std::auto_ptr<network::SocketClient> socket;
+ /** Response map mutex. */
+ common::concurrent::CriticalSection responseMutex;
+
+ /** Responses. */
+ ResponseMap responseMap;
+
+ /** Notification handlers mutex. */
+ common::concurrent::CriticalSection handlerMutex;
+
+ /** Notification handlers. */
+ NotificationHandlerMap handlerMap;
};
/** Shared pointer type. */
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
index 0943afa..ea1d36b 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
@@ -19,11 +19,14 @@
#include <cstddef>
#include <cstdlib>
-#include <sstream>
#include <iterator>
#include <algorithm>
+#include <ignite/network/codec_data_filter.h>
+#include <ignite/network/length_prefix_codec.h>
+#include <ignite/network/network.h>
#include <ignite/network/utils.h>
+#include <ignite/network/ssl/secure_data_filter.h>
#include "impl/utility.h"
#include "impl/data_router.h"
@@ -38,8 +41,6 @@ namespace ignite
namespace thin
{
DataRouter::DataRouter(const ignite::thin::IgniteClientConfiguration& cfg) :
- ioTimeout(DEFAULT_IO_TIMEOUT),
- connectionTimeout(DEFAULT_CONNECT_TIMEOUT),
config(cfg)
{
srand(common::GetRandSeed());
@@ -53,7 +54,7 @@ namespace ignite
DataRouter::~DataRouter()
{
- // No-op.
+ Close();
}
void DataRouter::Connect()
@@ -63,80 +64,213 @@ namespace ignite
if (ranges.empty())
throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_ARGUMENT, "No valid address to connect.");
- ChannelsVector newLegacyChannels;
- newLegacyChannels.reserve(ranges.size());
-
- for (std::vector<network::TcpRange>::iterator it = ranges.begin(); it != ranges.end(); ++it)
+ if (!asyncPool.IsValid())
{
- network::TcpRange& range = *it;
+ std::vector<network::SP_DataFilter> filters;
- for (uint16_t port = range.port; port <= range.port + range.range; ++port)
+ if (config.GetSslMode() == SslMode::REQUIRE)
{
- SP_DataChannel channel(new DataChannel(config, typeMgr));
-
- bool connected = false;
-
- try
- {
- connected = channel.Get()->Connect(range.host, port, connectionTimeout);
- }
- catch (const IgniteError&)
- {
- // No-op.
- }
-
- if (connected)
- {
- const IgniteNode& newNode = channel.Get()->GetNode();
-
- if (newNode.IsLegacy())
- {
- newLegacyChannels.push_back(channel);
- }
- else
- {
- common::concurrent::CsLockGuard lock(channelsMutex);
-
- // Insertion takes place if no channel with the GUID is already present.
- std::pair<ChannelsGuidMap::iterator, bool> res =
- channels.insert(std::make_pair(newNode.GetGuid(), channel));
-
- bool inserted = res.second;
- SP_DataChannel& oldChannel = res.first->second;
-
- if (!inserted && !oldChannel.Get()->IsConnected())
- oldChannel.Swap(channel);
- }
-
- break;
- }
+ network::ssl::EnsureSslLoaded();
+
+ network::ssl::SecureConfiguration sslCfg;
+ sslCfg.caPath = config.GetSslCaFile();
+ sslCfg.keyPath = config.GetSslKeyFile();
+ sslCfg.certPath = config.GetSslCertFile();
+
+ network::ssl::SP_SecureDataFilter secureFilter(new network::ssl::SecureDataFilter(sslCfg));
+ filters.push_back(secureFilter);
}
- if (config.GetConnectionsLimit())
- {
- common::concurrent::CsLockGuard lock(channelsMutex);
+ network::SP_CodecFactory codecFactory(new network::LengthPrefixCodecFactory());
+ network::SP_CodecDataFilter codecFilter(new network::CodecDataFilter(codecFactory));
+ filters.push_back(codecFilter);
- size_t connectionsNum = newLegacyChannels.size() + channels.size();
+ asyncPool = network::MakeAsyncClientPool(filters);
- if (connectionsNum >= config.GetConnectionsLimit())
- break;
- }
+ if (!asyncPool.IsValid())
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Can not create async connection pool");
+
+ asyncPool.Get()->SetHandler(this);
}
- common::concurrent::CsLockGuard lock(channelsMutex);
+ asyncPool.Get()->Start(ranges, config.GetConnectionsLimit());
- legacyChannels.swap(newLegacyChannels);
+ bool connected = EnsureConnected(config.GetConnectionTimeout());
- if (channels.empty() && legacyChannels.empty())
- throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Failed to establish connection with any host.");
+ if (!connected)
+ throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
+ "Failed to establish connection with any host.");
}
void DataRouter::Close()
{
+ if (asyncPool.IsValid())
+ {
+ asyncPool.Get()->SetHandler(0);
+ asyncPool.Get()->Stop();
+ }
+ }
+
+ bool DataRouter::EnsureConnected(int32_t timeout)
+ {
+ common::concurrent::CsLockGuard lock(channelsMutex);
+
+ if (!connectedChannels.empty())
+ return true;
+
+ CheckHandshakeErrorLocked();
+
+ channelsWaitPoint.WaitFor(channelsMutex, timeout);
+
+ CheckHandshakeErrorLocked();
+
+ return !connectedChannels.empty();
+ }
+
+ void DataRouter::CheckHandshakeErrorLocked()
+ {
+ if (!lastHandshakeError.get())
+ return;
+
+ IgniteError err = *lastHandshakeError;
+ lastHandshakeError.reset();
+
+ throw IgniteError(err);
+ }
+
+ void DataRouter::OnConnectionSuccess(const network::EndPoint& addr, uint64_t id)
+ {
+ SP_DataChannel channel(new DataChannel(id, addr, asyncPool, config, typeMgr, *this));
+
+ {
+ common::concurrent::CsLockGuard lock(channelsMutex);
+
+ channels[id] = channel;
+ }
+
+ channel.Get()->StartHandshake();
+ }
+
+ void DataRouter::OnConnectionError(const network::EndPoint& addr, const IgniteError& err)
+ {
+ IGNITE_UNUSED(addr);
+
+ if (!connectedChannels.empty())
+ return;
+
+ if (err.GetCode() != IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE)
+ return;
+
+ common::concurrent::CsLockGuard lock(channelsMutex);
+
+ lastHandshakeError.reset(new IgniteError(err));
+ channelsWaitPoint.NotifyAll();
+ }
+
+ void DataRouter::OnConnectionClosed(uint64_t id, const IgniteError* err)
+ {
+ SP_DataChannel channel;
+ {
+ common::concurrent::CsLockGuard lock(channelsMutex);
+
+ connectedChannels.erase(id);
+
+ ChannelsIdMap::iterator it = channels.find(id);
+ if (it == channels.end())
+ return;
+
+ channel = it->second;
+ InvalidateChannelLocked(channel);
+ }
+
+ channel.Get()->FailPendingRequests(err);
+ }
+
+ void DataRouter::OnMessageReceived(uint64_t id, const network::DataBuffer& msg)
+ {
+ SP_DataChannel channel;
+ {
+ common::concurrent::CsLockGuard lock(channelsMutex);
+
+ ChannelsIdMap::iterator it = channels.find(id);
+ if (it != channels.end())
+ channel = it->second;
+ }
+
+ if (channel.IsValid())
+ channel.Get()->ProcessMessage(msg);
+ }
+
+ void DataRouter::OnMessageSent(uint64_t id)
+ {
+ IGNITE_UNUSED(id);
+ // No-op.
+ }
+
+ void DataRouter::OnHandshakeSuccess(uint64_t id)
+ {
+ common::concurrent::CsLockGuard lock(channelsMutex);
+
+ connectedChannels.insert(id);
+ channelsWaitPoint.NotifyAll();
+
+ SP_DataChannel channel;
+
+ ChannelsIdMap::iterator it = channels.find(id);
+ if (it != channels.end())
+ channel = it->second;
+
+ if (channel.IsValid())
+ {
+ const IgniteNode& node = channel.Get()->GetNode();
+ if (!node.IsLegacy())
+ partChannels[node.GetGuid()] = channel;
+ }
+ }
+
... 1045 lines suppressed ...