You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2017/11/20 17:22:50 UTC
ignite git commit: IGNITE-6876: Added in ODBC support for
SQL_ATTR_CONNECTION_TIMEOUT
Repository: ignite
Updated Branches:
refs/heads/master 5fb04be39 -> db343b649
IGNITE-6876: Added in ODBC support for SQL_ATTR_CONNECTION_TIMEOUT
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db343b64
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db343b64
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db343b64
Branch: refs/heads/master
Commit: db343b649e4289ac28b769a741eee7ea77db8018
Parents: 5fb04be
Author: Igor Sapego <is...@gridgain.com>
Authored: Mon Nov 20 20:15:37 2017 +0300
Committer: Igor Sapego <is...@gridgain.com>
Committed: Mon Nov 20 20:15:37 2017 +0300
----------------------------------------------------------------------
.../cpp/odbc-test/src/attributes_test.cpp | 22 +++
.../cpp/odbc-test/src/queries_test.cpp | 113 +++++++++++++++
.../cpp/odbc/include/ignite/odbc/common_types.h | 3 +
.../cpp/odbc/include/ignite/odbc/connection.h | 81 +++++++++--
.../include/ignite/odbc/system/socket_client.h | 55 +++++++-
.../odbc/os/linux/src/system/socket_client.cpp | 141 +++++++++++++++++--
.../odbc/os/win/src/system/socket_client.cpp | 134 ++++++++++++++++--
modules/platforms/cpp/odbc/src/connection.cpp | 141 ++++++++++++++-----
.../odbc/src/diagnostic/diagnostic_record.cpp | 6 +
.../cpp/odbc/src/query/batch_query.cpp | 12 +-
.../platforms/cpp/odbc/src/query/data_query.cpp | 12 +-
11 files changed, 644 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc-test/src/attributes_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/attributes_test.cpp b/modules/platforms/cpp/odbc-test/src/attributes_test.cpp
index b87f4b9..c4c2433 100644
--- a/modules/platforms/cpp/odbc-test/src/attributes_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/attributes_test.cpp
@@ -227,4 +227,26 @@ BOOST_AUTO_TEST_CASE(StatementAttributeQueryTimeout)
BOOST_REQUIRE_EQUAL(timeout, 7);
}
+BOOST_AUTO_TEST_CASE(ConnectionAttributeConnectionTimeout)
+{
+ Connect("DRIVER={Apache Ignite};address=127.0.0.1:11110;schema=cache");
+
+ SQLUINTEGER timeout = -1;
+ SQLRETURN ret = SQLGetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, &timeout, 0, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+ BOOST_REQUIRE_EQUAL(timeout, 0);
+
+ ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(42), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+ timeout = -1;
+
+ ret = SQLGetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, &timeout, 0, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+ BOOST_REQUIRE_EQUAL(timeout, 42);
+}
+
BOOST_AUTO_TEST_SUITE_END()
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc-test/src/queries_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
index 6fcf7c9..dafab1a 100644
--- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
@@ -2404,5 +2404,118 @@ BOOST_AUTO_TEST_CASE(TestCloseAfterEmptyUpdate)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
}
+BOOST_AUTO_TEST_CASE(TestConnectionTimeoutQuery)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+ InsertTestStrings(10, false);
+}
+
+BOOST_AUTO_TEST_CASE(TestConnectionTimeoutBatch)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+ InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestConnectionTimeoutBoth)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+ InsertTestStrings(10, false);
+ InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryTimeoutQuery)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+ InsertTestStrings(10, false);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryTimeoutBatch)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+ InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryTimeoutBoth)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+ InsertTestStrings(10, false);
+ InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutQuery)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+ ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(3), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+ InsertTestStrings(10, false);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutBatch)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+ ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(3), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+ InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutBoth)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+ SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+ ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(3), 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+ InsertTestStrings(10, false);
+ InsertTestBatch(11, 20, 9);
+}
BOOST_AUTO_TEST_SUITE_END()
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h b/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h
index 349147f..9c8c433 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h
@@ -190,6 +190,9 @@ namespace ignite
*/
SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
+ /** The timeout period expired before the data source responded to the request. */
+ SHYT00_TIMEOUT_EXPIRED,
+
/**
* The connection timeout period expired before the data source
* responded to the request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
index 34fed5f..1577ee7 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
@@ -27,6 +27,7 @@
#include "ignite/odbc/config/connection_info.h"
#include "ignite/odbc/config/configuration.h"
#include "ignite/odbc/diagnostic/diagnosable_adapter.h"
+#include "ignite/odbc/odbc_error.h"
namespace ignite
{
@@ -42,6 +43,19 @@ namespace ignite
friend class Environment;
public:
/**
+ * Operation with timeout result.
+ */
+ struct OperationResult
+ {
+ enum T
+ {
+ SUCCESS,
+ FAIL,
+ TIMEOUT
+ };
+ };
+
+ /**
* Destructor.
*/
~Connection();
@@ -96,15 +110,21 @@ namespace ignite
*
* @param data Data buffer.
* @param len Data length.
+ * @param timeout Timeout.
+ * @return @c true on success, @c false on timeout.
+ * @throw OdbcError on error.
*/
- void Send(const int8_t* data, size_t len);
+ bool Send(const int8_t* data, size_t len, int32_t timeout);
/**
* Receive next message.
*
* @param msg Buffer for message.
+ * @param timeout Timeout.
+ * @return @c true on success, @c false on timeout.
+ * @throw OdbcError on error.
*/
- void Receive(std::vector<int8_t>& msg);
+ bool Receive(std::vector<int8_t>& msg, int32_t timeout);
/**
* Get name of the assotiated schema.
@@ -134,9 +154,43 @@ namespace ignite
/**
* Synchronously send request message and receive response.
+ * Uses provided timeout.
*
* @param req Request message.
* @param rsp Response message.
+ * @param timeout Timeout.
+ * @return @c true on success, @c false on timeout.
+ * @throw OdbcError on error.
+ */
+ template<typename ReqT, typename RspT>
+ bool SyncMessage(const ReqT& req, RspT& rsp, int32_t timeout)
+ {
+ std::vector<int8_t> tempBuffer;
+
+ parser.Encode(req, tempBuffer);
+
+ bool success = Send(tempBuffer.data(), tempBuffer.size(), timeout);
+
+ if (!success)
+ return false;
+
+ success = Receive(tempBuffer, timeout);
+
+ if (!success)
+ return false;
+
+ parser.Decode(rsp, tempBuffer);
+
+ return true;
+ }
+
+ /**
+ * Synchronously send request message and receive response.
+ * Uses connection timeout.
+ *
+ * @param req Request message.
+ * @param rsp Response message.
+ * @throw OdbcError on error.
*/
template<typename ReqT, typename RspT>
void SyncMessage(const ReqT& req, RspT& rsp)
@@ -145,9 +199,15 @@ namespace ignite
parser.Encode(req, tempBuffer);
- Send(tempBuffer.data(), tempBuffer.size());
+ bool success = Send(tempBuffer.data(), tempBuffer.size(), timeout);
+
+ if (!success)
+ throw OdbcError(SqlState::SHYT01_CONNECTION_TIMEOUT, "Send operation timed out");
+
+ success = Receive(tempBuffer, timeout);
- Receive(tempBuffer);
+ if (!success)
+ throw OdbcError(SqlState::SHYT01_CONNECTION_TIMEOUT, "Receive operation timed out");
parser.Decode(rsp, tempBuffer);
}
@@ -280,18 +340,20 @@ namespace ignite
*
* @param dst Buffer for data.
* @param len Number of bytes to receive.
- * @return Number of successfully received bytes.
+ * @param timeout Timeout.
+ * @return Operation result.
*/
- size_t ReceiveAll(void* dst, size_t len);
+ OperationResult::T ReceiveAll(void* dst, size_t len, int32_t timeout);
/**
* Send specified number of bytes.
*
* @param data Data buffer.
* @param len Data length.
- * @return Number of successfully sent bytes.
+ * @param timeout Timeout.
+ * @return Operation result.
*/
- size_t SendAll(const int8_t* data, size_t len);
+ OperationResult::T SendAll(const int8_t* data, size_t len, int32_t timeout);
/**
* Perform handshake request.
@@ -311,6 +373,9 @@ namespace ignite
/** State flag. */
bool connected;
+ /** Connection timeout in seconds. */
+ int32_t timeout;
+
/** Message parser. */
Parser parser;
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h b/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
index 946605e..2a3cfa3 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
@@ -44,6 +44,22 @@ namespace ignite
/** The time in seconds between individual keepalive probes. */
enum { KEEP_ALIVE_PROBES_PERIOD = 1 };
+ /** Connection establishment timeout in seconds. */
+ enum { CONNECT_TIMEOUT = 5 };
+
+ /**
+ * Non-negative timeout operation result.
+ */
+ struct WaitResult
+ {
+ enum T
+ {
+ TIMEOUT = 0,
+
+ SUCCESS = 1
+ };
+ };
+
/**
* Constructor.
*/
@@ -76,20 +92,31 @@ namespace ignite
*
* @param data Pointer to data to be sent.
* @param size Size of the data in bytes.
- * @return Number of bytes that have been sent on success and negative
- * value on failure.
+ * @param timeout Timeout.
+ * @return Number of bytes that have been sent on success,
+ * WaitResult::TIMEOUT on timeout and -errno on failure.
*/
- int Send(const int8_t* data, size_t size);
+ int Send(const int8_t* data, size_t size, int32_t timeout);
/**
* Receive data from established connection.
*
* @param buffer Pointer to data buffer.
* @param size Size of the buffer in bytes.
- * @return Number of bytes that have been received on success and negative
- * value on failure.
+ * @param timeout Timeout.
+ * @return Number of bytes that have been received on success,
+ * WaitResult::TIMEOUT on timeout and -errno on failure.
*/
- int Receive(int8_t* buffer, size_t size);
+ int Receive(int8_t* buffer, size_t size, int32_t timeout);
+
+ /**
+ * Check if the socket is blocking or not.
+ * @return @c true if the socket is blocking and false otherwise.
+ */
+ bool IsBlocking() const
+ {
+ return blocking;
+ }
private:
/**
@@ -97,8 +124,24 @@ namespace ignite
*/
void TrySetOptions(diagnostic::Diagnosable& diag);
+ /**
+ * Wait on the socket for any event for specified time.
+ * This function uses select to achieve timeout functionality
+ * for every separate socket operation.
+ *
+ * @param timeout Timeout.
+ * @param rd Wait for read if @c true, or for write if @c false.
+ * @return -errno on error, WaitResult::TIMEOUT on timeout and
+ * WaitResult::SUCCESS on success.
+ */
+ int WaitOnSocket(int32_t timeout, bool rd);
+
+ /** Handle. */
intptr_t socketHandle;
+ /** Blocking flag. */
+ bool blocking;
+
IGNITE_NO_COPY_ASSIGNMENT(SocketClient)
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
index 5a9b03a..a6d6151 100644
--- a/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
+++ b/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
@@ -20,6 +20,7 @@
#include <netinet/tcp.h>
#include <netdb.h>
#include <unistd.h>
+#include <fcntl.h>
#include <cstring>
@@ -35,26 +36,36 @@ namespace
{
/**
* Get last socket error message.
+ * @param error Error code.
* @return Last socket error message string.
*/
- std::string GetLastSocketErrorMessage()
+ std::string GetSocketErrorMessage(int error)
{
- int lastError = errno;
std::stringstream res;
- res << "error_code=" << lastError;
+ res << "error_code=" << error;
- if (lastError == 0)
+ if (error == 0)
return res.str();
char buffer[1024] = "";
- strerror_r(lastError, buffer, sizeof(buffer));
-
- res << ", msg=" << buffer;
+ if (!strerror_r(error, buffer, sizeof(buffer)))
+ res << ", msg=" << buffer;
return res.str();
}
+
+ /**
+ * Get last socket error message.
+ * @return Last socket error message string.
+ */
+ std::string GetLastSocketErrorMessage()
+ {
+ int lastError = errno;
+
+ return GetSocketErrorMessage(lastError);
+ }
}
namespace ignite
@@ -64,7 +75,9 @@ namespace ignite
namespace tcp
{
- SocketClient::SocketClient() : socketHandle(SOCKET_ERROR)
+ SocketClient::SocketClient() :
+ socketHandle(SOCKET_ERROR),
+ blocking(true)
{
// No-op.
}
@@ -129,11 +142,27 @@ namespace ignite
res = connect(socketHandle, it->ai_addr, static_cast<int>(it->ai_addrlen));
if (SOCKET_ERROR == res)
{
- LOG_MSG("Connection failed: " << GetLastSocketErrorMessage());
+ int lastError = errno;
+
+ if (lastError != EWOULDBLOCK && lastError != EINPROGRESS)
+ {
+ LOG_MSG("Connection failed: " << GetSocketErrorMessage(lastError));
+
+ Close();
- Close();
+ continue;
+ }
- continue;
+ res = WaitOnSocket(CONNECT_TIMEOUT, false);
+
+ if (res < 0 || res == WaitResult::TIMEOUT)
+ {
+ LOG_MSG("Connection timeout expired: " << GetSocketErrorMessage(-res));
+
+ Close();
+
+ continue;
+ }
}
break;
}
@@ -153,13 +182,29 @@ namespace ignite
}
}
- int SocketClient::Send(const int8_t* data, size_t size)
+ int SocketClient::Send(const int8_t* data, size_t size, int32_t timeout)
{
+ if (!blocking)
+ {
+ int res = WaitOnSocket(timeout, false);
+
+ if (res < 0 || res == WaitResult::TIMEOUT)
+ return res;
+ }
+
return send(socketHandle, reinterpret_cast<const char*>(data), static_cast<int>(size), 0);
}
- int SocketClient::Receive(int8_t* buffer, size_t size)
+ int SocketClient::Receive(int8_t* buffer, size_t size, int32_t timeout)
{
+ if (!blocking)
+ {
+ int res = WaitOnSocket(timeout, true);
+
+ if (res < 0 || res == WaitResult::TIMEOUT)
+ return res;
+ }
+
return recv(socketHandle, reinterpret_cast<char*>(buffer), static_cast<int>(size), 0);
}
@@ -203,6 +248,30 @@ namespace ignite
"Can not set up TCP no-delay mode");
}
+ res = setsockopt(socketHandle, SOL_SOCKET, SO_OOBINLINE,
+ reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
+
+ if (SOCKET_ERROR == res)
+ {
+ LOG_MSG("TCP out-of-bound data inlining setup failed: " << GetLastSocketErrorMessage());
+
+ diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+ "Can not set up TCP out-of-bound data inlining");
+ }
+
+ blocking = false;
+
+ int flags;
+ if (((flags = fcntl(socketHandle, F_GETFL, 0)) < 0) ||
+ (fcntl(socketHandle, F_SETFL, flags | O_NONBLOCK) < 0))
+ {
+ blocking = true;
+ LOG_MSG("Non-blocking mode setup failed: " << GetLastSocketErrorMessage());
+
+ diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+ "Can not set up non-blocking mode. Timeouts are not available.");
+ }
+
res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE,
reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
@@ -238,6 +307,52 @@ namespace ignite
diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
"Can not set up TCP keep-alive probes period");
}
+
+ }
+
+ int SocketClient::WaitOnSocket(int32_t timeout, bool rd)
+ {
+ int ready = 0;
+ int lastError = 0;
+
+ fd_set fds;
+
+ do {
+ struct timeval tv = { 0 };
+ tv.tv_sec = timeout;
+
+ FD_ZERO(&fds);
+ FD_SET(socketHandle, &fds);
+
+ fd_set* readFds = 0;
+ fd_set* writeFds = 0;
+
+ if (rd)
+ readFds = &fds;
+ else
+ writeFds = &fds;
+
+ ready = select(static_cast<int>((socketHandle) + 1),
+ readFds, writeFds, NULL, (timeout == 0 ? NULL : &tv));
+
+ if (ready == SOCKET_ERROR)
+ lastError = errno;
+
+ } while (ready == SOCKET_ERROR && lastError == EINTR);
+
+ if (ready == SOCKET_ERROR)
+ return -lastError;
+
+ socklen_t size = sizeof(lastError);
+ int res = getsockopt(socketHandle, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&lastError), &size);
+
+ if (res != SOCKET_ERROR && lastError != 0)
+ return -lastError;
+
+ if (ready == 0)
+ return WaitResult::TIMEOUT;
+
+ return WaitResult::SUCCESS;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
index 30fb7d7..6f87b93 100644
--- a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
+++ b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
@@ -34,17 +34,17 @@
namespace
{
/**
- * Get last socket error message.
- * @return Last socket error message string.
+ * Get socket error message for the error code.
+ * @param error Error code.
+ * @return Socket error message string.
*/
- std::string GetLastSocketErrorMessage()
+ std::string GetSocketErrorMessage(HRESULT error)
{
- HRESULT lastError = WSAGetLastError();
std::stringstream res;
- res << "error_code=" << lastError;
+ res << "error_code=" << error;
- if (lastError == 0)
+ if (error == 0)
return res.str();
LPTSTR errorText = NULL;
@@ -58,7 +58,7 @@ namespace
| FORMAT_MESSAGE_IGNORE_INSERTS,
// unused with FORMAT_MESSAGE_FROM_SYSTEM
NULL,
- lastError,
+ error,
MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US),
// output
reinterpret_cast<LPTSTR>(&errorText),
@@ -77,6 +77,17 @@ namespace
return res.str();
}
+
+ /**
+ * Get last socket error message.
+ * @return Last socket error message string.
+ */
+ std::string GetLastSocketErrorMessage()
+ {
+ HRESULT lastError = WSAGetLastError();
+
+ return GetSocketErrorMessage(lastError);
+ }
}
namespace ignite
@@ -86,7 +97,9 @@ namespace ignite
namespace tcp
{
- SocketClient::SocketClient() : socketHandle(INVALID_SOCKET)
+ SocketClient::SocketClient() :
+ socketHandle(INVALID_SOCKET),
+ blocking(true)
{
// No-op.
}
@@ -170,11 +183,27 @@ namespace ignite
res = connect(socketHandle, it->ai_addr, static_cast<int>(it->ai_addrlen));
if (SOCKET_ERROR == res)
{
- LOG_MSG("Connection failed: " << GetLastSocketErrorMessage());
+ int lastError = WSAGetLastError();
- Close();
+ if (lastError != WSAEWOULDBLOCK)
+ {
+ LOG_MSG("Connection failed: " << GetSocketErrorMessage(lastError));
- continue;
+ Close();
+
+ continue;
+ }
+
+ res = WaitOnSocket(CONNECT_TIMEOUT, false);
+
+ if (res < 0 || res == WaitResult::TIMEOUT)
+ {
+ LOG_MSG("Connection timeout expired: " << GetSocketErrorMessage(-res));
+
+ Close();
+
+ continue;
+ }
}
break;
}
@@ -194,21 +223,39 @@ namespace ignite
}
}
- int SocketClient::Send(const int8_t* data, size_t size)
+ int SocketClient::Send(const int8_t* data, size_t size, int32_t timeout)
{
+ if (!blocking)
+ {
+ int res = WaitOnSocket(timeout, false);
+
+ if (res < 0 || res == WaitResult::TIMEOUT)
+ return res;
+ }
+
return send(socketHandle, reinterpret_cast<const char*>(data), static_cast<int>(size), 0);
}
- int SocketClient::Receive(int8_t* buffer, size_t size)
+ int SocketClient::Receive(int8_t* buffer, size_t size, int32_t timeout)
{
+ if (!blocking)
+ {
+ int res = WaitOnSocket(timeout, true);
+
+ if (res < 0 || res == WaitResult::TIMEOUT)
+ return res;
+ }
+
return recv(socketHandle, reinterpret_cast<char*>(buffer), static_cast<int>(size), 0);
}
void SocketClient::TrySetOptions(diagnostic::Diagnosable& diag)
{
BOOL trueOpt = TRUE;
+ ULONG uTrueOpt = TRUE;
int bufSizeOpt = BUFFER_SIZE;
+
int res = setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF,
reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
@@ -242,6 +289,29 @@ namespace ignite
"Can not set up TCP no-delay mode");
}
+ res = setsockopt(socketHandle, SOL_SOCKET, SO_OOBINLINE,
+ reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
+
+ if (SOCKET_ERROR == res)
+ {
+ LOG_MSG("TCP out-of-bound data inlining setup failed: " << GetLastSocketErrorMessage());
+
+ diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+ "Can not set up TCP out-of-bound data inlining");
+ }
+
+ blocking = false;
+ res = ioctlsocket(socketHandle, FIONBIO, &uTrueOpt);
+
+ if (res == SOCKET_ERROR)
+ {
+ blocking = true;
+ LOG_MSG("Non-blocking mode setup failed: " << GetLastSocketErrorMessage());
+
+ diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+ "Can not set up non-blocking mode. Timeouts are not available.");
+ }
+
res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE,
reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
@@ -318,6 +388,44 @@ namespace ignite
#endif
}
+ int SocketClient::WaitOnSocket(int32_t timeout, bool rd)
+ {
+ int ready = 0;
+ int lastError = 0;
+
+ fd_set fds;
+
+ do {
+ struct timeval tv = { 0 };
+ tv.tv_sec = timeout;
+
+ FD_ZERO(&fds);
+ FD_SET(socketHandle, &fds);
+
+ fd_set* readFds = 0;
+ fd_set* writeFds = 0;
+
+ if (rd)
+ readFds = &fds;
+ else
+ writeFds = &fds;
+
+ ready = select(static_cast<int>((socketHandle) + 1),
+ readFds, writeFds, NULL, (timeout == 0 ? NULL : &tv));
+
+ if (ready == SOCKET_ERROR)
+ lastError = WSAGetLastError();
+
+ } while (ready == SOCKET_ERROR && lastError == WSAEINTR);
+
+ if (ready == SOCKET_ERROR)
+ return -lastError;
+
+ if (ready == 0)
+ return WaitResult::TIMEOUT;
+
+ return WaitResult::SUCCESS;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/connection.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp
index b99d768..8b03876 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -16,6 +16,7 @@
*/
#include <cstring>
+#include <cstddef>
#include <sstream>
@@ -27,7 +28,6 @@
#include "ignite/odbc/connection.h"
#include "ignite/odbc/message.h"
#include "ignite/odbc/config/configuration.h"
-#include "ignite/odbc/odbc_error.h"
namespace
{
@@ -46,6 +46,7 @@ namespace ignite
Connection::Connection() :
socket(),
connected(false),
+ timeout(0),
parser(),
config(),
info(config)
@@ -194,7 +195,7 @@ namespace ignite
return SqlResult::AI_SUCCESS;
}
- void Connection::Send(const int8_t* data, size_t len)
+ bool Connection::Send(const int8_t* data, size_t len, int32_t timeout)
{
if (!connected)
throw OdbcError(SqlState::S08003_NOT_CONNECTED, "Connection is not established");
@@ -209,38 +210,45 @@ namespace ignite
memcpy(msg.GetData() + sizeof(OdbcProtocolHeader), data, len);
- size_t sent = SendAll(msg.GetData(), msg.GetSize());
+ OperationResult::T res = SendAll(msg.GetData(), msg.GetSize(), timeout);
- if (sent != len + sizeof(OdbcProtocolHeader))
+ if (res == OperationResult::TIMEOUT)
+ return false;
+
+ if (res == OperationResult::FAIL)
throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not send message due to connection failure");
LOG_MSG("message sent: (" << msg.GetSize() << " bytes)" << utility::HexDump(msg.GetData(), msg.GetSize()));
+
+ return true;
}
- size_t Connection::SendAll(const int8_t* data, size_t len)
+ Connection::OperationResult::T Connection::SendAll(const int8_t* data, size_t len, int32_t timeout)
{
int sent = 0;
while (sent != static_cast<int64_t>(len))
{
- int res = socket.Send(data + sent, len - sent);
+ int res = socket.Send(data + sent, len - sent, timeout);
LOG_MSG("Sent: " << res);
- if (res <= 0)
+ if (res < 0 || res == tcp::SocketClient::WaitResult::TIMEOUT)
{
Close();
- return sent;
+ return res < 0 ? OperationResult::FAIL : OperationResult::TIMEOUT;
}
sent += res;
}
- return sent;
+ assert(static_cast<size_t>(sent) == len);
+
+ return OperationResult::SUCCESS;
}
- void Connection::Receive(std::vector<int8_t>& msg)
+ bool Connection::Receive(std::vector<int8_t>& msg, int32_t timeout)
{
if (!connected)
throw OdbcError(SqlState::S08003_NOT_CONNECTED, "Connection is not established");
@@ -249,36 +257,40 @@ namespace ignite
OdbcProtocolHeader hdr;
- size_t received = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
+ OperationResult::T res = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr), timeout);
+
+ if (res == OperationResult::TIMEOUT)
+ return false;
- if (received != sizeof(hdr))
+ if (res == OperationResult::FAIL)
throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not receive message header");
if (hdr.len < 0)
{
Close();
- throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Protocol error: Message length is negative");
+ throw OdbcError(SqlState::SHY000_GENERAL_ERROR, "Protocol error: Message length is negative");
}
if (hdr.len == 0)
- return;
+ return false;
msg.resize(hdr.len);
- received = ReceiveAll(&msg[0], hdr.len);
+ res = ReceiveAll(&msg[0], hdr.len, timeout);
- if (received != hdr.len)
- {
- msg.resize(received);
+ if (res == OperationResult::TIMEOUT)
+ return false;
+ if (res == OperationResult::FAIL)
throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not receive message body");
- }
LOG_MSG("Message received: " << utility::HexDump(&msg[0], msg.size()));
+
+ return true;
}
- size_t Connection::ReceiveAll(void* dst, size_t len)
+ Connection::OperationResult::T Connection::ReceiveAll(void* dst, size_t len, int32_t timeout)
{
size_t remain = len;
int8_t* buffer = reinterpret_cast<int8_t*>(dst);
@@ -287,20 +299,20 @@ namespace ignite
{
size_t received = len - remain;
- int res = socket.Receive(buffer + received, remain);
+ int res = socket.Receive(buffer + received, remain, timeout);
LOG_MSG("Receive res: " << res << " remain: " << remain);
- if (res <= 0)
+ if (res < 0 || res == tcp::SocketClient::WaitResult::TIMEOUT)
{
Close();
- return received;
+ return res < 0 ? OperationResult::FAIL : OperationResult::TIMEOUT;
}
remain -= static_cast<size_t>(res);
}
- return len;
+ return OperationResult::SUCCESS;
}
const std::string& Connection::GetSchema() const
@@ -334,6 +346,14 @@ namespace ignite
IGNITE_ODBC_API_CALL(InternalTransactionRollback());
}
+ SqlResult::Type Connection::InternalTransactionRollback()
+ {
+ AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
+ "Rollback operation is not supported.");
+
+ return SqlResult::AI_ERROR;
+ }
+
void Connection::GetAttribute(int attr, void* buf, SQLINTEGER bufLen, SQLINTEGER* valueLen)
{
IGNITE_ODBC_API_CALL(InternalGetAttribute(attr, buf, bufLen, valueLen));
@@ -343,7 +363,7 @@ namespace ignite
{
if (!buf)
{
- AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Data buffer is NULL.");
+ AddStatusRecord(SqlState::SHY009_INVALID_USE_OF_NULL_POINTER, "Data buffer is null.");
return SqlResult::AI_ERROR;
}
@@ -362,6 +382,18 @@ namespace ignite
break;
}
+ case SQL_ATTR_CONNECTION_TIMEOUT:
+ {
+ SQLUINTEGER *val = reinterpret_cast<SQLUINTEGER*>(buf);
+
+ *val = static_cast<SQLUINTEGER>(timeout);
+
+ if (valueLen)
+ *valueLen = SQL_IS_INTEGER;
+
+ break;
+ }
+
default:
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
@@ -381,6 +413,13 @@ namespace ignite
SqlResult::Type Connection::InternalSetAttribute(int attr, void* value, SQLINTEGER valueLen)
{
+ if (!value)
+ {
+ AddStatusRecord(SqlState::SHY009_INVALID_USE_OF_NULL_POINTER, "Value pointer is null.");
+
+ return SqlResult::AI_ERROR;
+ }
+
switch (attr)
{
case SQL_ATTR_CONNECTION_DEAD:
@@ -390,6 +429,39 @@ namespace ignite
return SqlResult::AI_ERROR;
}
+ case SQL_ATTR_CONNECTION_TIMEOUT:
+ {
+ SQLUINTEGER uTimeout = static_cast<SQLUINTEGER>(reinterpret_cast<ptrdiff_t>(value));
+
+ if (uTimeout != 0 && connected && socket.IsBlocking())
+ {
+ timeout = 0;
+
+ AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, "Can not set timeout, because can not "
+ "enable non-blocking mode on TCP connection. Setting to 0.");
+
+ return SqlResult::AI_SUCCESS_WITH_INFO;
+ }
+
+ if (uTimeout > INT32_MAX)
+ {
+ timeout = INT32_MAX;
+
+ std::stringstream ss;
+
+ ss << "Value is too big: " << uTimeout << ", changing to " << timeout << ".";
+ std::string msg = ss.str();
+
+ AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, msg);
+
+ return SqlResult::AI_SUCCESS_WITH_INFO;
+ }
+
+ timeout = static_cast<int32_t>(uTimeout);
+
+ break;
+ }
+
default:
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
@@ -402,14 +474,6 @@ namespace ignite
return SqlResult::AI_SUCCESS;
}
- SqlResult::Type Connection::InternalTransactionRollback()
- {
- AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
- "Rollback operation is not supported.");
-
- return SqlResult::AI_ERROR;
- }
-
SqlResult::Type Connection::MakeRequestHandshake()
{
bool distributedJoins = false;
@@ -451,7 +515,16 @@ namespace ignite
try
{
- SyncMessage(req, rsp);
+ // Workaround for some Linux systems that report connection on non-blocking
+ // sockets as successful but fail to establish real connection.
+ bool sent = SyncMessage(req, rsp, tcp::SocketClient::CONNECT_TIMEOUT);
+
+ if (!sent)
+ {
+ AddStatusRecord(SqlState::S08001_CANNOT_CONNECT, "Failed to establish connection with the host.");
+
+ return SqlResult::AI_ERROR;
+ }
}
catch (const OdbcError& err)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
index 0a02310..7fa7669 100644
--- a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
+++ b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
@@ -133,6 +133,9 @@ namespace
/** SQL state HYC00 constant. */
const std::string STATE_HYC00 = "HYC00";
+ /** SQL state HYT00 constant. */
+ const std::string STATE_HYT00 = "HYT00";
+
/** SQL state HYT01 constant. */
const std::string STATE_HYT01 = "HYT01";
@@ -365,6 +368,9 @@ namespace ignite
case SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED:
return STATE_HYC00;
+ case SqlState::SHYT00_TIMEOUT_EXPIRED:
+ return STATE_HYT00;
+
case SqlState::SHYT01_CONNECTION_TIMEOUT:
return STATE_HYT01;
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/query/batch_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/batch_query.cpp b/modules/platforms/cpp/odbc/src/query/batch_query.cpp
index 07d42d4..a9db8d8 100644
--- a/modules/platforms/cpp/odbc/src/query/batch_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/batch_query.cpp
@@ -153,7 +153,17 @@ namespace ignite
try
{
- connection.SyncMessage(req, rsp);
+ // Setting connection timeout to 1 second more than query timeout itself.
+ int32_t connectionTimeout = timeout ? timeout + 1 : 0;
+
+ bool success = connection.SyncMessage(req, rsp, connectionTimeout);
+
+ if (!success)
+ {
+ diag.AddStatusRecord(SqlState::SHYT00_TIMEOUT_EXPIRED, "Query timeout expired");
+
+ return SqlResult::AI_ERROR;
+ }
}
catch (const OdbcError& err)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/query/data_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/data_query.cpp b/modules/platforms/cpp/odbc/src/query/data_query.cpp
index 0539af5..e7bf5a0 100644
--- a/modules/platforms/cpp/odbc/src/query/data_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/data_query.cpp
@@ -222,7 +222,17 @@ namespace ignite
try
{
- connection.SyncMessage(req, rsp);
+ // Setting connection timeout to 1 second more than query timeout itself.
+ int32_t connectionTimeout = timeout ? timeout + 1 : 0;
+
+ bool success = connection.SyncMessage(req, rsp, connectionTimeout);
+
+ if (!success)
+ {
+ diag.AddStatusRecord(SqlState::SHYT00_TIMEOUT_EXPIRED, "Query timeout expired");
+
+ return SqlResult::AI_ERROR;
+ }
}
catch (const OdbcError& err)
{