You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/03/01 18:04:47 UTC
[ignite] branch master updated: IGNITE-14204 Fix C++ thin
transactions
This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0675e2a IGNITE-14204 Fix C++ thin transactions
0675e2a is described below
commit 0675e2a7e800730c9c8230332b82809754ddae5a
Author: Igor Sapego <is...@apache.org>
AuthorDate: Mon Mar 1 21:03:54 2021 +0300
IGNITE-14204 Fix C++ thin transactions
This closes #8836
---
.../platforms/cpp/thin-client-test/src/tx_test.cpp | 85 ++++++++----
modules/platforms/cpp/thin-client/CMakeLists.txt | 1 +
.../cpp/thin-client/project/vs/thin-client.vcxproj | 1 +
.../project/vs/thin-client.vcxproj.filters | 24 ++++
.../src/impl/cache/cache_client_impl.cpp | 145 ++++++++-------------
.../thin-client/src/impl/cache/cache_client_impl.h | 37 +++++-
.../cpp/thin-client/src/impl/data_router.h | 10 ++
.../platforms/cpp/thin-client/src/impl/message.h | 18 +--
...{transactions_impl.cpp => transaction_impl.cpp} | 94 +++----------
.../src/impl/transactions/transaction_impl.h | 50 +++++--
.../src/impl/transactions/transactions_impl.cpp | 135 +------------------
.../src/impl/transactions/transactions_impl.h | 48 +------
12 files changed, 252 insertions(+), 396 deletions(-)
diff --git a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
index 19757a2..04422ad 100644
--- a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
@@ -39,7 +39,8 @@ class IgniteTxTestSuiteFixture
public:
IgniteTxTestSuiteFixture()
{
- serverNode = ignite_test::StartCrossPlatformServerNode("cache.xml", "ServerNode");
+ node1 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node1");
+ node2 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node2");
}
~IgniteTxTestSuiteFixture()
@@ -47,9 +48,24 @@ public:
ignite::Ignition::StopAll(false);
}
+ /**
+ * Start client.
+ */
+ static IgniteClient StartClient()
+ {
+ IgniteClientConfiguration cfg;
+
+ cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111");
+
+ return IgniteClient::Start(cfg);
+ }
+
private:
- /** Server node. */
- ignite::Ignite serverNode;
+ /** Server node #1. */
+ ignite::Ignite node1;
+
+ /** Server node #2. */
+ ignite::Ignite node2;
};
BOOST_FIXTURE_TEST_SUITE(IgniteTxTestSuite, IgniteTxTestSuiteFixture)
@@ -75,11 +91,7 @@ bool checkTxTimeoutMessage(const ignite::IgniteError& ex)
BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx)
{
- IgniteClientConfiguration cfg;
-
- cfg.SetEndPoints("127.0.0.1:11110");
-
- IgniteClient client = IgniteClient::Start(cfg);
+ IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
@@ -190,7 +202,7 @@ BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx)
tx.Rollback();
- BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::ALL), 1);
+ BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::PRIMARY), 1);
//---
@@ -241,11 +253,7 @@ BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx)
void startAnotherClientAndTx(SharedPointer<SingleLatch>& l)
{
- IgniteClientConfiguration cfg;
-
- cfg.SetEndPoints("127.0.0.1:11110");
-
- IgniteClient client = IgniteClient::Start(cfg);
+ IgniteClient client = IgniteTxTestSuiteFixture::StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
@@ -263,11 +271,7 @@ void startAnotherClientAndTx(SharedPointer<SingleLatch>& l)
BOOST_AUTO_TEST_CASE(TestTxOps)
{
- IgniteClientConfiguration cfg;
-
- cfg.SetEndPoints("127.0.0.1:11110");
-
- IgniteClient client = IgniteClient::Start(cfg);
+ IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
@@ -347,11 +351,7 @@ bool checkTxLabel1Message(const ignite::IgniteError& ex)
BOOST_AUTO_TEST_CASE(TestTxWithLabel)
{
- IgniteClientConfiguration cfg;
-
- cfg.SetEndPoints("127.0.0.1:11110");
-
- IgniteClient client = IgniteClient::Start(cfg);
+ IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
@@ -397,4 +397,41 @@ BOOST_AUTO_TEST_CASE(TestTxWithLabel)
tx.Close();
}
+BOOST_AUTO_TEST_CASE(ManyTransactions)
+{
+ IgniteClient client = StartClient();
+
+ cache::CacheClient<int, int> cache =
+ client.GetCache<int, int>("partitioned");
+
+ transactions::ClientTransactions transactions = client.ClientTransactions();
+ const int32_t key = 42;
+
+ for (int32_t val = 0; val < 100; ++val) {
+ transactions::ClientTransaction tx = transactions.TxStart();
+
+ cache.Put(key, val);
+
+ tx.Commit();
+
+ BOOST_CHECK_EQUAL(val, cache.Get(key));
+ }
+
+ const int32_t expected = -42;
+
+ cache.Put(key, expected);
+
+ BOOST_CHECK_EQUAL(expected, cache.Get(key));
+
+ for (int32_t val = 0; val < 100; ++val) {
+ transactions::ClientTransaction tx = transactions.TxStart();
+
+ cache.Put(key, val);
+
+ tx.Rollback();
+
+ BOOST_CHECK_EQUAL(expected, cache.Get(key));
+ }
+}
+
BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt
index 059b012..bc1fbeb 100644
--- a/modules/platforms/cpp/thin-client/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client/CMakeLists.txt
@@ -34,6 +34,7 @@ set(SOURCES src/impl/data_channel.cpp
src/impl/message.cpp
src/impl/cache/cache_client_proxy.cpp
src/impl/cache/cache_client_impl.cpp
+ src/impl/transactions/transaction_impl.cpp
src/impl/transactions/transactions_impl.cpp
src/impl/transactions/transactions_proxy.cpp
src/ignite_client.cpp
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
index 4d60182..d5cdc6f 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
@@ -169,6 +169,7 @@
<ClCompile Include="..\..\src\impl\protocol_version.cpp" />
<ClCompile Include="..\..\src\impl\remote_type_updater.cpp" />
<ClCompile Include="..\..\src\impl\utility.cpp" />
+ <ClCompile Include="..\..\src\impl\transactions\transaction_impl.cpp" />
<ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp" />
<ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp" />
</ItemGroup>
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
index 263325c..c9a092a 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
@@ -67,6 +67,15 @@
<ClCompile Include="..\..\src\impl\affinity\affinity_manager.cpp">
<Filter>Code\impl\affinity</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\impl\transactions\transaction_impl.cpp">
+ <Filter>Code\impl\transactions</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp">
+ <Filter>Code\impl\transactions</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp">
+ <Filter>Code\impl\transactions</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\ignite\thin\ignite_client.h">
@@ -174,5 +183,20 @@
<ClInclude Include="..\..\src\impl\affinity\partition_awareness_group.h">
<Filter>Code\impl\affinity</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\transactions\transactions.h">
+ <Filter>Code\transactions</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\transactions\transaction.h">
+ <Filter>Code\transactions</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\transactions\transaction_consts.h">
+ <Filter>Code\transactions</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\transactions\transactions_proxy.h">
+ <Filter>Code\transactions</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\src\impl\transactions\transactions_impl.h">
+ <Filter>Code\impl\transactions</Filter>
+ </ClInclude>
</ItemGroup>
</Project>
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
index 51b0c28..d66b9e6 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
@@ -101,71 +101,79 @@ namespace ignite
return channel;
}
- template<typename ReqT>
- void CacheClientImpl::checkTransactional(ReqT& req)
+ template<typename ReqT, typename RspT>
+ void CacheClientImpl::TransactionalSyncCacheKeyMessage(const WritableKey &key, ReqT &req,
+ RspT &rsp)
+ {
+ if (!TryProcessTransactional(req, rsp))
+ SyncCacheKeyMessage(key, req, rsp);
+ }
+
+ template<typename ReqT, typename RspT>
+ void CacheClientImpl::TransactionalSyncMessage(ReqT &req, RspT &rsp)
{
- SP_TransactionImpl activeTx = tx.Get()->GetCurrent();
+ if (!TryProcessTransactional(req, rsp))
+ SyncMessage(req, rsp);
+ }
- bool isUnderTx = activeTx.IsValid();
+ template<typename ReqT, typename RspT>
+ bool CacheClientImpl::TryProcessTransactional(ReqT& req, RspT& rsp)
+ {
+ TransactionImpl* activeTx = tx.Get()->GetCurrent().Get();
- int32_t txId = isUnderTx ? activeTx.Get()->TxId() : 0;
+ if (!activeTx)
+ return false;
- req.activeTx(isUnderTx, txId);
+ req.activeTx(true, activeTx->TxId());
+
+ SP_DataChannel channel = activeTx->GetChannel();
+
+ channel.Get()->SyncMessage(req, rsp, router.Get()->GetIoTimeout());
+
+ if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str());
+
+ return true;
}
void CacheClientImpl::Put(const WritableKey& key, const Writable& value)
{
Cache2ValueRequest<RequestType::CACHE_PUT> req(id, binary, key, value);
-
- checkTransactional(req);
-
Response rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::Get(const WritableKey& key, Readable& value)
{
CacheValueRequest<RequestType::CACHE_GET> req(id, binary, key);
-
- checkTransactional(req);
-
CacheValueResponse rsp(value);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::PutAll(const Writable & pairs)
{
CacheValueRequest<RequestType::CACHE_PUT_ALL> req(id, binary, pairs);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::GetAll(const Writable& keys, Readable& pairs)
{
CacheValueRequest<RequestType::CACHE_GET_ALL> req(id, binary, keys);
-
- checkTransactional(req);
-
CacheValueResponse rsp(pairs);
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
bool CacheClientImpl::Replace(const WritableKey& key, const Writable& value)
{
Cache2ValueRequest<RequestType::CACHE_REPLACE> req(id, binary, key, value);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -173,12 +181,9 @@ namespace ignite
bool CacheClientImpl::ContainsKey(const WritableKey& key)
{
CacheValueRequest<RequestType::CACHE_CONTAINS_KEY> req(id, binary, key);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -186,12 +191,9 @@ namespace ignite
bool CacheClientImpl::ContainsKeys(const Writable& keys)
{
CacheValueRequest<RequestType::CACHE_CONTAINS_KEYS> req(id, binary, keys);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
return rsp.GetValue();
}
@@ -199,12 +201,9 @@ namespace ignite
int64_t CacheClientImpl::GetSize(int32_t peekModes)
{
CacheGetSizeRequest req(id, binary, peekModes);
-
- checkTransactional(req);
-
Int64Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
return rsp.GetValue();
}
@@ -212,12 +211,9 @@ namespace ignite
bool CacheClientImpl::Remove(const WritableKey& key)
{
CacheValueRequest<RequestType::CACHE_REMOVE_KEY> req(id, binary, key);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -225,12 +221,9 @@ namespace ignite
bool CacheClientImpl::Remove(const WritableKey& key, const Writable& val)
{
Cache2ValueRequest<RequestType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -238,78 +231,57 @@ namespace ignite
void CacheClientImpl::RemoveAll(const Writable& keys)
{
CacheValueRequest<RequestType::CACHE_REMOVE_KEYS> req(id, binary, keys);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::RemoveAll()
{
CacheRequest<RequestType::CACHE_REMOVE_ALL> req(id, binary);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::Clear(const WritableKey& key)
{
CacheValueRequest<RequestType::CACHE_CLEAR_KEY> req(id, binary, key);
-
- checkTransactional(req);
-
Response rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::Clear()
{
CacheRequest<RequestType::CACHE_CLEAR> req(id, binary);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::ClearAll(const Writable& keys)
{
CacheValueRequest<RequestType::CACHE_CLEAR_KEYS> req(id, binary, keys);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::LocalPeek(const WritableKey& key, Readable& value)
{
CacheValueRequest<RequestType::CACHE_LOCAL_PEEK> req(id, binary, key);
-
- checkTransactional(req);
-
CacheValueResponse rsp(value);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
bool CacheClientImpl::Replace(const WritableKey& key, const Writable& oldVal, const Writable& newVal)
{
Cache3ValueRequest<RequestType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -317,45 +289,33 @@ namespace ignite
void CacheClientImpl::GetAndPut(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT> req(id, binary, key, valIn);
-
- checkTransactional(req);
-
CacheValueResponse rsp(valOut);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::GetAndRemove(const WritableKey& key, Readable& valOut)
{
CacheValueRequest<RequestType::CACHE_GET_AND_REMOVE> req(id, binary, key);
-
- checkTransactional(req);
-
CacheValueResponse rsp(valOut);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::GetAndReplace(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
Cache2ValueRequest<RequestType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn);
-
- checkTransactional(req);
-
CacheValueResponse rsp(valOut);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
bool CacheClientImpl::PutIfAbsent(const WritableKey& key, const Writable& val)
{
Cache2ValueRequest<RequestType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -363,12 +323,9 @@ namespace ignite
void CacheClientImpl::GetAndPutIfAbsent(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn);
-
- checkTransactional(req);
-
CacheValueResponse rsp(valOut);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
query::SP_QueryFieldsCursorImpl CacheClientImpl::Query(
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
index d74ad29..4efefd7 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
@@ -310,9 +310,6 @@ namespace ignite
template<typename ReqT, typename RspT>
void SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp);
- template<typename ReqT>
- void checkTransactional(ReqT& req);
-
/**
* Synchronously send message and receive response.
*
@@ -324,6 +321,40 @@ namespace ignite
template<typename ReqT, typename RspT>
SP_DataChannel SyncMessage(const ReqT& req, RspT& rsp);
+ /**
+ * Synchronously send request message and receive response taking in account that it can be
+ * transactional.
+ *
+ * @param key Key.
+ * @param req Request message.
+ * @param rsp Response message.
+ * @throw IgniteError on error.
+ */
+ template<typename ReqT, typename RspT>
+ void TransactionalSyncCacheKeyMessage(const WritableKey& key, ReqT& req, RspT& rsp);
+
+ /**
+ * Synchronously send message and receive response taking in account that it can be transactional.
+ *
+ * @param req Request message.
+ * @param rsp Response message.
+ * @return Channel that was used for request.
+ * @throw IgniteError on error.
+ */
+ template<typename ReqT, typename RspT>
+ void TransactionalSyncMessage(ReqT& req, RspT& rsp);
+
+ /***
+ * Check whether request is transactional and process it if it is.
+ * @tparam ReqT Request type.
+ * @tparam RspT Response type.
+ * @param req Request.
+ * @param rsp Response.
+ * @return @c true if processed and false otherwise.
+ */
+ template<typename ReqT, typename RspT>
+ bool TryProcessTransactional(ReqT& req, RspT& rsp);
+
/** Data router. */
SP_DataRouter router;
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h
index baa74ac..701f710 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h
@@ -208,6 +208,16 @@ namespace ignite
*/
affinity::SP_AffinityAssignment GetAffinityAssignment(int32_t cacheId) const;
+ /**
+ * Get IO timeout.
+ *
+ * @return IO timeout.
+ */
+ int32_t GetIoTimeout()
+ {
+ return ioTimeout;
+ }
+
private:
IGNITE_NO_COPY_ASSIGNMENT(DataRouter);
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h
index 80e5437..a7220d8 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.h
+++ b/modules/platforms/cpp/thin-client/src/impl/message.h
@@ -363,7 +363,8 @@ namespace ignite
CacheRequest(int32_t cacheId, bool binary) :
cacheId(cacheId),
binary(binary),
- actTx(false)
+ actTx(false),
+ txId(0)
{
// No-op.
}
@@ -536,15 +537,6 @@ namespace ignite
}
/**
- * Sets transaction active flag and appropriate txId.
- * @param active Transaction activity flag.
- * @param id Transaction id.
- */
- void activeTx(bool active, int32_t id) {
- CacheRequest<OpCode>::activeTx(active, id);
- }
-
- /**
* Write request using provided writer.
* @param writer Writer.
* @param ver Version.
@@ -691,11 +683,11 @@ namespace ignite
* Constructor.
*
* @param id Transaction id.
- * @param comm Need to commit flag.
+ * @param commit Need to commit flag.
*/
- TxEndRequest(int32_t id, bool comm) :
+ TxEndRequest(int32_t id, bool commit) :
txId(id),
- commited(comm)
+ commited(commit)
{
// No-op.
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp
similarity index 64%
copy from modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
copy to modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp
index d785932..6fd9115 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp
@@ -16,8 +16,8 @@
*/
#include "impl/message.h"
-#include "impl/transactions/transactions_impl.h"
#include "impl/transactions/transaction_impl.h"
+#include "impl/transactions/transactions_impl.h"
#include "impl/response_status.h"
using namespace ignite::common::concurrent;
@@ -32,35 +32,18 @@ namespace ignite
{
namespace transactions
{
- TransactionsImpl::TransactionsImpl(const SP_DataRouter& router) :
- router(router)
- {
- // No-op.
- }
-
template<typename ReqT, typename RspT>
- void TransactionsImpl::SendTxMessage(const ReqT& req, RspT& rsp)
+ void TransactionImpl::SendTxMessage(const ReqT& req, RspT& rsp)
{
- router.Get()->SyncMessage(req, rsp);
+ channel.Get()->SyncMessage(req, rsp, static_cast<int32_t>(timeout / 1000) + ioTimeout);
if (rsp.GetStatus() != ResponseStatus::SUCCESS)
throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
}
- SharedPointer<TransactionImpl> TransactionsImpl::TxStart(
- TransactionConcurrency::Type concurrency,
- TransactionIsolation::Type isolation,
- int64_t timeout,
- int32_t txSize,
- SharedPointer<common::FixedSizeArray<char> > label)
- {
- SP_TransactionImpl tx = TransactionImpl::Create(*this, concurrency, isolation, timeout, txSize, label);
-
- return tx;
- }
-
SP_TransactionImpl TransactionImpl::Create(
TransactionsImpl& txs,
+ SP_DataRouter& router,
TransactionConcurrency::Type concurrency,
TransactionIsolation::Type isolation,
int64_t timeout,
@@ -72,19 +55,21 @@ namespace ignite
TransactionImpl* ptr = tx.Get();
if (ptr && !ptr->IsClosed())
- {
throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED);
- }
TxStartRequest req(concurrency, isolation, timeout, label);
Int32Response rsp;
- txs.SendTxMessage(req, rsp);
+ SP_DataChannel channel = router.Get()->SyncMessage(req, rsp);
+
+ if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
int32_t curTxId = rsp.GetValue();
- tx = SP_TransactionImpl(new TransactionImpl(txs, curTxId, concurrency, isolation, timeout, txSize));
+ tx = SP_TransactionImpl(new TransactionImpl(txs, channel, curTxId, concurrency,
+ isolation, timeout, router.Get()->GetIoTimeout(), txSize));
txs.SetCurrent(tx);
@@ -96,74 +81,29 @@ namespace ignite
return closed;
}
- SP_TransactionImpl TransactionsImpl::GetCurrent()
- {
- SP_TransactionImpl tx = threadTx.Get();
-
- TransactionImpl* ptr = tx.Get();
-
- if (ptr && ptr->IsClosed())
- {
- threadTx.Remove();
-
- tx = SP_TransactionImpl();
- }
-
- return tx;
- }
-
- void TransactionsImpl::SetCurrent(const SP_TransactionImpl& impl)
- {
- threadTx.Set(impl);
- }
-
- void TransactionsImpl::ResetCurrent()
+ void TransactionImpl::Commit()
{
- threadTx.Remove();
- }
+ ThreadCheck();
- int32_t TransactionsImpl::TxCommit(int32_t txId)
- {
TxEndRequest req(txId, true);
Response rsp;
SendTxMessage(req, rsp);
- return rsp.GetStatus();
+ ThreadEnd();
}
- int32_t TransactionsImpl::TxRollback(int32_t txId)
+ void TransactionImpl::Rollback()
{
+ ThreadCheck();
+
TxEndRequest req(txId, false);
Response rsp;
SendTxMessage(req, rsp);
- return rsp.GetStatus();
- }
-
- int32_t TransactionsImpl::TxClose(int32_t txId)
- {
- return TxRollback(txId);
- }
-
- void TransactionImpl::Commit()
- {
- ThreadCheck();
-
- txs.TxCommit(txId);
-
- ThreadEnd();
- }
-
- void TransactionImpl::Rollback()
- {
- ThreadCheck();
-
- txs.TxRollback(txId);
-
ThreadEnd();
}
@@ -176,7 +116,7 @@ namespace ignite
return;
}
- txs.TxClose(txId);
+ Rollback();
ThreadEnd();
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
index b6f9aa1..e395b3b 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
@@ -18,10 +18,10 @@
#ifndef _IGNITE_IMPL_THIN_TRANSACTION_IMPL
#define _IGNITE_IMPL_THIN_TRANSACTION_IMPL
-#include "impl/data_router.h"
#include <ignite/common/fixed_size_array.h>
-#include "ignite/thin/transactions/transaction_consts.h"
-#include "impl/transactions/transactions_impl.h"
+#include <ignite/thin/transactions/transaction_consts.h>
+
+#include "impl/data_router.h"
namespace ignite
{
@@ -51,24 +51,30 @@ namespace ignite
* Constructor.
*
* @param txImpl Transactions implementation.
- * @param txid Transaction Id.
+ * @param channel Channel linked to transaction.
+ * @param txId Transaction Id.
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
* @param timeout Transaction timeout.
+ * @param ioTimeout IO timeout for channel.
* @param size Number of entries participating in transaction (may be approximate).
*/
TransactionImpl(
TransactionsImpl& txImpl,
- int32_t txid,
+ SP_DataChannel channel,
+ int32_t txId,
ignite::thin::transactions::TransactionConcurrency::Type concurrency,
ignite::thin::transactions::TransactionIsolation::Type isolation,
int64_t timeout,
+ int32_t ioTimeout,
int32_t size) :
+ channel(channel),
txs(txImpl),
- txId(txid),
+ txId(txId),
concurrency(concurrency),
isolation(isolation),
timeout(timeout),
+ ioTimeout(ioTimeout),
txSize(size),
closed(false)
{
@@ -123,6 +129,7 @@ namespace ignite
* Starts transaction.
*
* @param txs Transactions implementation.
+ * @param router Router to use to start transaction.
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
* @param timeout Transaction timeout.
@@ -131,19 +138,43 @@ namespace ignite
*/
static SP_TransactionImpl Create(
TransactionsImpl& txs,
+ SP_DataRouter& router,
ignite::thin::transactions::TransactionConcurrency::Type concurrency,
ignite::thin::transactions::TransactionIsolation::Type isolation,
int64_t timeout,
int32_t txSize,
ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label);
- protected:
+
+ /**
+ * Get channel for the transaction.
+ *
+ * @return Channel.
+ */
+ SP_DataChannel GetChannel()
+ {
+ return channel;
+ }
+
+ private:
/** Checks current thread state. */
void ThreadCheck();
/** Completes tc and clear state from storage. */
void ThreadEnd();
- private:
+ /**
+ * Synchronously send message and receive response.
+ *
+ * @param req Request message.
+ * @param rsp Response message.
+ * @throw IgniteError on error.
+ */
+ template<typename ReqT, typename RspT>
+ void SendTxMessage(const ReqT& req, RspT& rsp);
+
+ /** Data channel to use. */
+ SP_DataChannel channel;
+
/** Transactions implementation. */
TransactionsImpl& txs;
@@ -159,6 +190,9 @@ namespace ignite
/** Timeout in milliseconds. */
int64_t timeout;
+ /** Channel io timeout. */
+ int32_t ioTimeout;
+
/** Transaction size. */
int32_t txSize;
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
index d785932..6227add 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
@@ -15,10 +15,7 @@
* limitations under the License.
*/
-#include "impl/message.h"
#include "impl/transactions/transactions_impl.h"
-#include "impl/transactions/transaction_impl.h"
-#include "impl/response_status.h"
using namespace ignite::common::concurrent;
using namespace ignite::impl::thin;
@@ -38,15 +35,6 @@ namespace ignite
// No-op.
}
- template<typename ReqT, typename RspT>
- void TransactionsImpl::SendTxMessage(const ReqT& req, RspT& rsp)
- {
- router.Get()->SyncMessage(req, rsp);
-
- if (rsp.GetStatus() != ResponseStatus::SUCCESS)
- throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
- }
-
SharedPointer<TransactionImpl> TransactionsImpl::TxStart(
TransactionConcurrency::Type concurrency,
TransactionIsolation::Type isolation,
@@ -54,48 +42,11 @@ namespace ignite
int32_t txSize,
SharedPointer<common::FixedSizeArray<char> > label)
{
- SP_TransactionImpl tx = TransactionImpl::Create(*this, concurrency, isolation, timeout, txSize, label);
+ SP_TransactionImpl tx = TransactionImpl::Create(*this, router, concurrency, isolation, timeout, txSize, label);
return tx;
}
- SP_TransactionImpl TransactionImpl::Create(
- TransactionsImpl& txs,
- TransactionConcurrency::Type concurrency,
- TransactionIsolation::Type isolation,
- int64_t timeout,
- int32_t txSize,
- SharedPointer<common::FixedSizeArray<char> > label)
- {
- SP_TransactionImpl tx = txs.GetCurrent();
-
- TransactionImpl* ptr = tx.Get();
-
- if (ptr && !ptr->IsClosed())
- {
- throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED);
- }
-
- TxStartRequest req(concurrency, isolation, timeout, label);
-
- Int32Response rsp;
-
- txs.SendTxMessage(req, rsp);
-
- int32_t curTxId = rsp.GetValue();
-
- tx = SP_TransactionImpl(new TransactionImpl(txs, curTxId, concurrency, isolation, timeout, txSize));
-
- txs.SetCurrent(tx);
-
- return tx;
- }
-
- bool TransactionImpl::IsClosed() const
- {
- return closed;
- }
-
SP_TransactionImpl TransactionsImpl::GetCurrent()
{
SP_TransactionImpl tx = threadTx.Get();
@@ -121,90 +72,6 @@ namespace ignite
{
threadTx.Remove();
}
-
- int32_t TransactionsImpl::TxCommit(int32_t txId)
- {
- TxEndRequest req(txId, true);
-
- Response rsp;
-
- SendTxMessage(req, rsp);
-
- return rsp.GetStatus();
- }
-
- int32_t TransactionsImpl::TxRollback(int32_t txId)
- {
- TxEndRequest req(txId, false);
-
- Response rsp;
-
- SendTxMessage(req, rsp);
-
- return rsp.GetStatus();
- }
-
- int32_t TransactionsImpl::TxClose(int32_t txId)
- {
- return TxRollback(txId);
- }
-
- void TransactionImpl::Commit()
- {
- ThreadCheck();
-
- txs.TxCommit(txId);
-
- ThreadEnd();
- }
-
- void TransactionImpl::Rollback()
- {
- ThreadCheck();
-
- txs.TxRollback(txId);
-
- ThreadEnd();
- }
-
- void TransactionImpl::Close()
- {
- ThreadCheck();
-
- if (IsClosed())
- {
- return;
- }
-
- txs.TxClose(txId);
-
- ThreadEnd();
- }
-
- void TransactionImpl::SetClosed()
- {
- closed = true;
- }
-
- void TransactionImpl::ThreadEnd()
- {
- this->SetClosed();
-
- txs.ResetCurrent();
- }
-
- void TransactionImpl::ThreadCheck()
- {
- SP_TransactionImpl tx = txs.GetCurrent();
-
- TransactionImpl* ptr = tx.Get();
-
- if (!ptr)
- throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_CLOSED);
-
- if (ptr->TxId() != this->TxId())
- throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_DIFFERENT_THREAD);
- }
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
index 278545d..a0879fb 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
@@ -18,9 +18,10 @@
#ifndef _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL
#define _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL
-#include "impl/data_router.h"
#include <ignite/common/fixed_size_array.h>
-#include "ignite/thin/transactions/transaction_consts.h"
+#include <ignite/thin/transactions/transaction_consts.h>
+
+#include "impl/data_router.h"
#include "impl/transactions/transaction_impl.h"
namespace ignite
@@ -31,11 +32,6 @@ namespace ignite
{
namespace transactions
{
- class TransactionsImpl;
-
- typedef ignite::common::concurrent::SharedPointer<TransactionImpl> SP_TransactionImpl;
- typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl;
-
/**
* Thin client transaction.
*/
@@ -72,32 +68,6 @@ namespace ignite
ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label);
/**
- * Commit Transaction.
- *
- * @param id Transaction ID.
- * @return Resulting state.
- */
- int32_t TxCommit(int32_t id);
-
- /**
- * Rollback Transaction.
- *
- * @param id Transaction ID.
- * @return Resulting state.
- */
- int32_t TxRollback(int32_t id);
-
-
- /**
- * Close the transaction.
- *
- * This method should only be used on the valid instance.
- *
- * @param id Transaction ID.
- */
- int32_t TxClose(int32_t id);
-
- /**
* Get active transaction for the current thread.
*
* @return Active transaction implementation for current thread
@@ -118,16 +88,6 @@ namespace ignite
*/
void ResetCurrent();
- /**
- * Synchronously send message and receive response.
- *
- * @param req Request message.
- * @param rsp Response message.
- * @throw IgniteError on error.
- */
- template<typename ReqT, typename RspT>
- void SendTxMessage(const ReqT& req, RspT& rsp);
-
private:
/** Data router. */
SP_DataRouter router;
@@ -137,6 +97,8 @@ namespace ignite
IGNITE_NO_COPY_ASSIGNMENT(TransactionsImpl);
};
+
+ typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl;
}
}
}