You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/03/01 18:04:47 UTC

[ignite] branch master updated: IGNITE-14204 Fix C++ thin transactions

This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0675e2a  IGNITE-14204 Fix C++ thin transactions
0675e2a is described below

commit 0675e2a7e800730c9c8230332b82809754ddae5a
Author: Igor Sapego <is...@apache.org>
AuthorDate: Mon Mar 1 21:03:54 2021 +0300

    IGNITE-14204 Fix C++ thin transactions
    
    This closes #8836
---
 .../platforms/cpp/thin-client-test/src/tx_test.cpp |  85 ++++++++----
 modules/platforms/cpp/thin-client/CMakeLists.txt   |   1 +
 .../cpp/thin-client/project/vs/thin-client.vcxproj |   1 +
 .../project/vs/thin-client.vcxproj.filters         |  24 ++++
 .../src/impl/cache/cache_client_impl.cpp           | 145 ++++++++-------------
 .../thin-client/src/impl/cache/cache_client_impl.h |  37 +++++-
 .../cpp/thin-client/src/impl/data_router.h         |  10 ++
 .../platforms/cpp/thin-client/src/impl/message.h   |  18 +--
 ...{transactions_impl.cpp => transaction_impl.cpp} |  94 +++----------
 .../src/impl/transactions/transaction_impl.h       |  50 +++++--
 .../src/impl/transactions/transactions_impl.cpp    | 135 +------------------
 .../src/impl/transactions/transactions_impl.h      |  48 +------
 12 files changed, 252 insertions(+), 396 deletions(-)

diff --git a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
index 19757a2..04422ad 100644
--- a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
@@ -39,7 +39,8 @@ class IgniteTxTestSuiteFixture
 public:
     IgniteTxTestSuiteFixture()
     {
-        serverNode = ignite_test::StartCrossPlatformServerNode("cache.xml", "ServerNode");
+        node1 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node1");
+        node2 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node2");
     }
 
     ~IgniteTxTestSuiteFixture()
@@ -47,9 +48,24 @@ public:
         ignite::Ignition::StopAll(false);
     }
 
+    /**
+     * Start client.
+     */
+    static IgniteClient StartClient()
+    {
+        IgniteClientConfiguration cfg;
+
+        cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111");
+
+        return IgniteClient::Start(cfg);
+    }
+
 private:
-    /** Server node. */
-    ignite::Ignite serverNode;
+    /** Server node #1. */
+    ignite::Ignite node1;
+
+    /** Server node #2. */
+    ignite::Ignite node2;
 };
 
 BOOST_FIXTURE_TEST_SUITE(IgniteTxTestSuite, IgniteTxTestSuiteFixture)
@@ -75,11 +91,7 @@ bool checkTxTimeoutMessage(const ignite::IgniteError& ex)
 
 BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx)
 {
-    IgniteClientConfiguration cfg;
-
-    cfg.SetEndPoints("127.0.0.1:11110");
-
-    IgniteClient client = IgniteClient::Start(cfg);
+    IgniteClient client = StartClient();
 
     cache::CacheClient<int, int> cache =
         client.GetCache<int, int>("partitioned");
@@ -190,7 +202,7 @@ BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx)
 
     tx.Rollback();
 
-    BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::ALL), 1);
+    BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::PRIMARY), 1);
 
     //---
 
@@ -241,11 +253,7 @@ BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx)
 
 void startAnotherClientAndTx(SharedPointer<SingleLatch>& l)
 {
-    IgniteClientConfiguration cfg;
-
-    cfg.SetEndPoints("127.0.0.1:11110");
-
-    IgniteClient client = IgniteClient::Start(cfg);
+    IgniteClient client = IgniteTxTestSuiteFixture::StartClient();
 
     cache::CacheClient<int, int> cache =
         client.GetCache<int, int>("partitioned");
@@ -263,11 +271,7 @@ void startAnotherClientAndTx(SharedPointer<SingleLatch>& l)
 
 BOOST_AUTO_TEST_CASE(TestTxOps)
 {
-    IgniteClientConfiguration cfg;
-
-    cfg.SetEndPoints("127.0.0.1:11110");
-
-    IgniteClient client = IgniteClient::Start(cfg);
+    IgniteClient client = StartClient();
 
     cache::CacheClient<int, int> cache =
         client.GetCache<int, int>("partitioned");
@@ -347,11 +351,7 @@ bool checkTxLabel1Message(const ignite::IgniteError& ex)
 
 BOOST_AUTO_TEST_CASE(TestTxWithLabel)
 {
-    IgniteClientConfiguration cfg;
-
-    cfg.SetEndPoints("127.0.0.1:11110");
-
-    IgniteClient client = IgniteClient::Start(cfg);
+    IgniteClient client = StartClient();
 
     cache::CacheClient<int, int> cache =
         client.GetCache<int, int>("partitioned");
@@ -397,4 +397,41 @@ BOOST_AUTO_TEST_CASE(TestTxWithLabel)
     tx.Close();
 }
 
+BOOST_AUTO_TEST_CASE(ManyTransactions)
+{
+    IgniteClient client = StartClient();
+
+    cache::CacheClient<int, int> cache =
+            client.GetCache<int, int>("partitioned");
+
+    transactions::ClientTransactions transactions = client.ClientTransactions();
+    const int32_t key = 42;
+
+    for (int32_t val = 0; val < 100; ++val) {
+        transactions::ClientTransaction tx = transactions.TxStart();
+
+        cache.Put(key, val);
+
+        tx.Commit();
+
+        BOOST_CHECK_EQUAL(val, cache.Get(key));
+    }
+
+    const int32_t expected = -42;
+
+    cache.Put(key, expected);
+
+    BOOST_CHECK_EQUAL(expected, cache.Get(key));
+
+    for (int32_t val = 0; val < 100; ++val) {
+        transactions::ClientTransaction tx = transactions.TxStart();
+
+        cache.Put(key, val);
+
+        tx.Rollback();
+
+        BOOST_CHECK_EQUAL(expected, cache.Get(key));
+    }
+}
+
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt
index 059b012..bc1fbeb 100644
--- a/modules/platforms/cpp/thin-client/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client/CMakeLists.txt
@@ -34,6 +34,7 @@ set(SOURCES src/impl/data_channel.cpp
         src/impl/message.cpp
         src/impl/cache/cache_client_proxy.cpp
         src/impl/cache/cache_client_impl.cpp
+        src/impl/transactions/transaction_impl.cpp
         src/impl/transactions/transactions_impl.cpp
         src/impl/transactions/transactions_proxy.cpp
         src/ignite_client.cpp
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
index 4d60182..d5cdc6f 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
@@ -169,6 +169,7 @@
     <ClCompile Include="..\..\src\impl\protocol_version.cpp" />
     <ClCompile Include="..\..\src\impl\remote_type_updater.cpp" />
     <ClCompile Include="..\..\src\impl\utility.cpp" />
+    <ClCompile Include="..\..\src\impl\transactions\transaction_impl.cpp" />
     <ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp" />
     <ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp" />
   </ItemGroup>
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
index 263325c..c9a092a 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
@@ -67,6 +67,15 @@
     <ClCompile Include="..\..\src\impl\affinity\affinity_manager.cpp">
       <Filter>Code\impl\affinity</Filter>
     </ClCompile>
+	<ClCompile Include="..\..\src\impl\transactions\transaction_impl.cpp">
+      <Filter>Code\impl\transactions</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp">
+      <Filter>Code\impl\transactions</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp">
+      <Filter>Code\impl\transactions</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\ignite\thin\ignite_client.h">
@@ -174,5 +183,20 @@
     <ClInclude Include="..\..\src\impl\affinity\partition_awareness_group.h">
       <Filter>Code\impl\affinity</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\include\ignite\thin\transactions\transactions.h">
+      <Filter>Code\transactions</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\thin\transactions\transaction.h">
+      <Filter>Code\transactions</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\thin\transactions\transaction_consts.h">
+      <Filter>Code\transactions</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\thin\transactions\transactions_proxy.h">
+      <Filter>Code\transactions</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\src\impl\transactions\transactions_impl.h">
+      <Filter>Code\impl\transactions</Filter>
+    </ClInclude>
   </ItemGroup>
 </Project>
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
index 51b0c28..d66b9e6 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
@@ -101,71 +101,79 @@ namespace ignite
                     return channel;
                 }
 
-                template<typename ReqT>
-                void CacheClientImpl::checkTransactional(ReqT& req)
+                template<typename ReqT, typename RspT>
+                void CacheClientImpl::TransactionalSyncCacheKeyMessage(const WritableKey &key, ReqT &req,
+                    RspT &rsp)
+                {
+                    if (!TryProcessTransactional(req, rsp))
+                        SyncCacheKeyMessage(key, req, rsp);
+                }
+
+                template<typename ReqT, typename RspT>
+                void CacheClientImpl::TransactionalSyncMessage(ReqT &req, RspT &rsp)
                 {
-                    SP_TransactionImpl activeTx = tx.Get()->GetCurrent();
+                    if (!TryProcessTransactional(req, rsp))
+                        SyncMessage(req, rsp);
+                }
 
-                    bool isUnderTx = activeTx.IsValid();
+                template<typename ReqT, typename RspT>
+                bool CacheClientImpl::TryProcessTransactional(ReqT& req, RspT& rsp)
+                {
+                    TransactionImpl* activeTx = tx.Get()->GetCurrent().Get();
 
-                    int32_t txId = isUnderTx ? activeTx.Get()->TxId() : 0;
+                    if (!activeTx)
+                        return false;
 
-                    req.activeTx(isUnderTx, txId);
+                    req.activeTx(true, activeTx->TxId());
+
+                    SP_DataChannel channel = activeTx->GetChannel();
+
+                    channel.Get()->SyncMessage(req, rsp, router.Get()->GetIoTimeout());
+
+                    if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+                        throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str());
+
+                    return true;
                 }
 
                 void CacheClientImpl::Put(const WritableKey& key, const Writable& value)
                 {
                     Cache2ValueRequest<RequestType::CACHE_PUT> req(id, binary, key, value);
-
-                    checkTransactional(req);
-
                     Response rsp;
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
                 }
 
                 void CacheClientImpl::Get(const WritableKey& key, Readable& value)
                 {
                     CacheValueRequest<RequestType::CACHE_GET> req(id, binary, key);
-
-                    checkTransactional(req);
-
                     CacheValueResponse rsp(value);
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
                 }
 
                 void CacheClientImpl::PutAll(const Writable & pairs)
                 {
                     CacheValueRequest<RequestType::CACHE_PUT_ALL> req(id, binary, pairs);
-
-                    checkTransactional(req);
-
                     Response rsp;
 
-                    SyncMessage(req, rsp);
+                    TransactionalSyncMessage(req, rsp);
                 }
 
                 void CacheClientImpl::GetAll(const Writable& keys, Readable& pairs)
                 {
                     CacheValueRequest<RequestType::CACHE_GET_ALL> req(id, binary, keys);
-
-                    checkTransactional(req);
-
                     CacheValueResponse rsp(pairs);
 
-                    SyncMessage(req, rsp);
+                    TransactionalSyncMessage(req, rsp);
                 }
 
                 bool CacheClientImpl::Replace(const WritableKey& key, const Writable& value)
                 {
                     Cache2ValueRequest<RequestType::CACHE_REPLACE> req(id, binary, key, value);
-
-                    checkTransactional(req);
-
                     BoolResponse rsp;
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
 
                     return rsp.GetValue();
                 }
@@ -173,12 +181,9 @@ namespace ignite
                 bool CacheClientImpl::ContainsKey(const WritableKey& key)
                 {
                     CacheValueRequest<RequestType::CACHE_CONTAINS_KEY> req(id, binary, key);
-
-                    checkTransactional(req);
-
                     BoolResponse rsp;
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
 
                     return rsp.GetValue();
                 }
@@ -186,12 +191,9 @@ namespace ignite
                 bool CacheClientImpl::ContainsKeys(const Writable& keys)
                 {
                     CacheValueRequest<RequestType::CACHE_CONTAINS_KEYS> req(id, binary, keys);
-
-                    checkTransactional(req);
-
                     BoolResponse rsp;
 
-                    SyncMessage(req, rsp);
+                    TransactionalSyncMessage(req, rsp);
 
                     return rsp.GetValue();
                 }
@@ -199,12 +201,9 @@ namespace ignite
                 int64_t CacheClientImpl::GetSize(int32_t peekModes)
                 {
                     CacheGetSizeRequest req(id, binary, peekModes);
-
-                    checkTransactional(req);
-
                     Int64Response rsp;
 
-                    SyncMessage(req, rsp);
+                    TransactionalSyncMessage(req, rsp);
 
                     return rsp.GetValue();
                 }
@@ -212,12 +211,9 @@ namespace ignite
                 bool CacheClientImpl::Remove(const WritableKey& key)
                 {
                     CacheValueRequest<RequestType::CACHE_REMOVE_KEY> req(id, binary, key);
-
-                    checkTransactional(req);
-
                     BoolResponse rsp;
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
 
                     return rsp.GetValue();
                 }
@@ -225,12 +221,9 @@ namespace ignite
                 bool CacheClientImpl::Remove(const WritableKey& key, const Writable& val)
                 {
                     Cache2ValueRequest<RequestType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val);
-
-                    checkTransactional(req);
-
                     BoolResponse rsp;
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
 
                     return rsp.GetValue();
                 }
@@ -238,78 +231,57 @@ namespace ignite
                 void CacheClientImpl::RemoveAll(const Writable& keys)
                 {
                     CacheValueRequest<RequestType::CACHE_REMOVE_KEYS> req(id, binary, keys);
-
-                    checkTransactional(req);
-
                     Response rsp;
 
-                    SyncMessage(req, rsp);
+                    TransactionalSyncMessage(req, rsp);
                 }
 
                 void CacheClientImpl::RemoveAll()
                 {
                     CacheRequest<RequestType::CACHE_REMOVE_ALL> req(id, binary);
-
-                    checkTransactional(req);
-
                     Response rsp;
 
-                    SyncMessage(req, rsp);
+                    TransactionalSyncMessage(req, rsp);
                 }
 
                 void CacheClientImpl::Clear(const WritableKey& key)
                 {
                     CacheValueRequest<RequestType::CACHE_CLEAR_KEY> req(id, binary, key);
-
-                    checkTransactional(req);
-
                     Response rsp;
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
                 }
 
                 void CacheClientImpl::Clear()
                 {
                     CacheRequest<RequestType::CACHE_CLEAR> req(id, binary);
-
-                    checkTransactional(req);
-
                     Response rsp;
 
-                    SyncMessage(req, rsp);
+                    TransactionalSyncMessage(req, rsp);
                 }
 
                 void CacheClientImpl::ClearAll(const Writable& keys)
                 {
                     CacheValueRequest<RequestType::CACHE_CLEAR_KEYS> req(id, binary, keys);
-
-                    checkTransactional(req);
-
                     Response rsp;
 
-                    SyncMessage(req, rsp);
+                    TransactionalSyncMessage(req, rsp);
                 }
 
                 void CacheClientImpl::LocalPeek(const WritableKey& key, Readable& value)
                 {
                     CacheValueRequest<RequestType::CACHE_LOCAL_PEEK> req(id, binary, key);
-
-                    checkTransactional(req);
-
                     CacheValueResponse rsp(value);
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
                 }
 
                 bool CacheClientImpl::Replace(const WritableKey& key, const Writable& oldVal, const Writable& newVal)
                 {
                     Cache3ValueRequest<RequestType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal);
-
-                    checkTransactional(req);
-
                     BoolResponse rsp;
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
 
                     return rsp.GetValue();
                 }
@@ -317,45 +289,33 @@ namespace ignite
                 void CacheClientImpl::GetAndPut(const WritableKey& key, const Writable& valIn, Readable& valOut)
                 {
                     Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT> req(id, binary, key, valIn);
-
-                    checkTransactional(req);
-
                     CacheValueResponse rsp(valOut);
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
                 }
 
                 void CacheClientImpl::GetAndRemove(const WritableKey& key, Readable& valOut)
                 {
                     CacheValueRequest<RequestType::CACHE_GET_AND_REMOVE> req(id, binary, key);
-
-                    checkTransactional(req);
-
                     CacheValueResponse rsp(valOut);
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
                 }
 
                 void CacheClientImpl::GetAndReplace(const WritableKey& key, const Writable& valIn, Readable& valOut)
                 {
                     Cache2ValueRequest<RequestType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn);
-
-                    checkTransactional(req);
-
                     CacheValueResponse rsp(valOut);
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
                 }
 
                 bool CacheClientImpl::PutIfAbsent(const WritableKey& key, const Writable& val)
                 {
                     Cache2ValueRequest<RequestType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val);
-
-                    checkTransactional(req);
-
                     BoolResponse rsp;
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
 
                     return rsp.GetValue();
                 }
@@ -363,12 +323,9 @@ namespace ignite
                 void CacheClientImpl::GetAndPutIfAbsent(const WritableKey& key, const Writable& valIn, Readable& valOut)
                 {
                     Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn);
-
-                    checkTransactional(req);
-
                     CacheValueResponse rsp(valOut);
 
-                    SyncCacheKeyMessage(key, req, rsp);
+                    TransactionalSyncCacheKeyMessage(key, req, rsp);
                 }
 
                 query::SP_QueryFieldsCursorImpl CacheClientImpl::Query(
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
index d74ad29..4efefd7 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
@@ -310,9 +310,6 @@ namespace ignite
                     template<typename ReqT, typename RspT>
                     void SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp);
 
-                    template<typename ReqT>
-                    void checkTransactional(ReqT& req);
-
                     /**
                      * Synchronously send message and receive response.
                      *
@@ -324,6 +321,40 @@ namespace ignite
                     template<typename ReqT, typename RspT>
                     SP_DataChannel SyncMessage(const ReqT& req, RspT& rsp);
 
+                    /**
+                     * Synchronously send request message and receive response taking into account that it can be
+                     * transactional.
+                     *
+                     * @param key Key.
+                     * @param req Request message.
+                     * @param rsp Response message.
+                     * @throw IgniteError on error.
+                     */
+                    template<typename ReqT, typename RspT>
+                    void TransactionalSyncCacheKeyMessage(const WritableKey& key, ReqT& req, RspT& rsp);
+
+                    /**
+                     * Synchronously send message and receive response taking into account that it can be transactional.
+                     *
+                     * @param req Request message.
+                     * @param rsp Response message.
+                     * @note Returns nothing; an active transaction's request is sent over that transaction's channel.
+                     * @throw IgniteError on error.
+                     */
+                    template<typename ReqT, typename RspT>
+                    void TransactionalSyncMessage(ReqT& req, RspT& rsp);
+
+                    /**
+                     * Check whether request is transactional and process it if it is.
+                     * @tparam ReqT Request type.
+                     * @tparam RspT Response type.
+                     * @param req Request.
+                     * @param rsp Response.
+                     * @return @c true if processed and false otherwise.
+                     */
+                    template<typename ReqT, typename RspT>
+                    bool TryProcessTransactional(ReqT& req, RspT& rsp);
+
                     /** Data router. */
                     SP_DataRouter router;
 
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h
index baa74ac..701f710 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h
@@ -208,6 +208,16 @@ namespace ignite
                  */
                 affinity::SP_AffinityAssignment GetAffinityAssignment(int32_t cacheId) const;
 
+                /**
+                 * Get IO timeout.
+                 *
+                 * @return IO timeout.
+                 */
+                int32_t GetIoTimeout()
+                {
+                    return ioTimeout;
+                }
+
             private:
                 IGNITE_NO_COPY_ASSIGNMENT(DataRouter);
 
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h
index 80e5437..a7220d8 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.h
+++ b/modules/platforms/cpp/thin-client/src/impl/message.h
@@ -363,7 +363,8 @@ namespace ignite
                 CacheRequest(int32_t cacheId, bool binary) :
                     cacheId(cacheId),
                     binary(binary),
-                    actTx(false)
+                    actTx(false),
+                    txId(0)
                 {
                     // No-op.
                 }
@@ -536,15 +537,6 @@ namespace ignite
                 }
 
                 /**
-                 * Sets transaction active flag and appropriate txId.
-                 * @param active Transaction activity flag.
-                 * @param id Transaction id.
-                 */
-                void activeTx(bool active, int32_t id) {
-                    CacheRequest<OpCode>::activeTx(active, id);
-                }
-
-                /**
                  * Write request using provided writer.
                  * @param writer Writer.
                  * @param ver Version.
@@ -691,11 +683,11 @@ namespace ignite
                  * Constructor.
                  *
                  * @param id Transaction id.
-                 * @param comm Need to commit flag.
+                 * @param commit Need to commit flag.
                  */
-                TxEndRequest(int32_t id, bool comm) :
+                TxEndRequest(int32_t id, bool commit) :
                     txId(id),
-                    commited(comm)
+                    commited(commit)
                 {
                     // No-op.
                 }
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp
similarity index 64%
copy from modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
copy to modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp
index d785932..6fd9115 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp
@@ -16,8 +16,8 @@
  */
 
 #include "impl/message.h"
-#include "impl/transactions/transactions_impl.h"
 #include "impl/transactions/transaction_impl.h"
+#include "impl/transactions/transactions_impl.h"
 #include "impl/response_status.h"
 
 using namespace ignite::common::concurrent;
@@ -32,35 +32,18 @@ namespace ignite
         {
             namespace transactions
             {
-                TransactionsImpl::TransactionsImpl(const SP_DataRouter& router) :
-                    router(router)
-                {
-                    // No-op.
-                }
-
                 template<typename ReqT, typename RspT>
-                void TransactionsImpl::SendTxMessage(const ReqT& req, RspT& rsp)
+                void TransactionImpl::SendTxMessage(const ReqT& req, RspT& rsp)
                 {
-                    router.Get()->SyncMessage(req, rsp);
+                    channel.Get()->SyncMessage(req, rsp, static_cast<int32_t>(timeout / 1000) + ioTimeout);
 
                     if (rsp.GetStatus() != ResponseStatus::SUCCESS)
                         throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
                 }
 
-                SharedPointer<TransactionImpl> TransactionsImpl::TxStart(
-                        TransactionConcurrency::Type concurrency,
-                        TransactionIsolation::Type isolation,
-                        int64_t timeout,
-                        int32_t txSize,
-                        SharedPointer<common::FixedSizeArray<char> > label)
-                {
-                    SP_TransactionImpl tx = TransactionImpl::Create(*this, concurrency, isolation, timeout, txSize, label);
-
-                    return tx;
-                }
-
                 SP_TransactionImpl TransactionImpl::Create(
                     TransactionsImpl& txs,
+                    SP_DataRouter& router,
                     TransactionConcurrency::Type concurrency,
                     TransactionIsolation::Type isolation,
                     int64_t timeout,
@@ -72,19 +55,21 @@ namespace ignite
                     TransactionImpl* ptr = tx.Get();
 
                     if (ptr && !ptr->IsClosed())
-                    {
                         throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED);
-                    }
 
                     TxStartRequest req(concurrency, isolation, timeout, label);
 
                     Int32Response rsp;
 
-                    txs.SendTxMessage(req, rsp);
+                    SP_DataChannel channel = router.Get()->SyncMessage(req, rsp);
+
+                    if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+                        throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
 
                     int32_t curTxId = rsp.GetValue();
 
-                    tx = SP_TransactionImpl(new TransactionImpl(txs, curTxId, concurrency, isolation, timeout, txSize));
+                    tx = SP_TransactionImpl(new TransactionImpl(txs, channel, curTxId, concurrency,
+                        isolation, timeout, router.Get()->GetIoTimeout(), txSize));
 
                     txs.SetCurrent(tx);
 
@@ -96,74 +81,29 @@ namespace ignite
                     return closed;
                 }
 
-                SP_TransactionImpl TransactionsImpl::GetCurrent()
-                {
-                    SP_TransactionImpl tx = threadTx.Get();
-
-                    TransactionImpl* ptr = tx.Get();
-
-                    if (ptr && ptr->IsClosed())
-                    {
-                        threadTx.Remove();
-
-                        tx = SP_TransactionImpl();
-                    }
-
-                    return tx;
-                }
-
-                void TransactionsImpl::SetCurrent(const SP_TransactionImpl& impl)
-                {
-                    threadTx.Set(impl);
-                }
-
-                void TransactionsImpl::ResetCurrent()
+                void TransactionImpl::Commit()
                 {
-                    threadTx.Remove();
-                }
+                    ThreadCheck();
 
-                int32_t TransactionsImpl::TxCommit(int32_t txId)
-                {
                     TxEndRequest req(txId, true);
 
                     Response rsp;
 
                     SendTxMessage(req, rsp);
 
-                    return rsp.GetStatus();
+                    ThreadEnd();
                 }
 
-                int32_t TransactionsImpl::TxRollback(int32_t txId)
+                void TransactionImpl::Rollback()
                 {
+                    ThreadCheck();
+
                     TxEndRequest req(txId, false);
 
                     Response rsp;
 
                     SendTxMessage(req, rsp);
 
-                    return rsp.GetStatus();
-                }
-
-                int32_t TransactionsImpl::TxClose(int32_t txId)
-                {
-                    return TxRollback(txId);
-                }
-
-                void TransactionImpl::Commit()
-                {
-                    ThreadCheck();
-
-                    txs.TxCommit(txId);
-
-                    ThreadEnd();
-                }
-
-                void TransactionImpl::Rollback()
-                {
-                    ThreadCheck();
-
-                    txs.TxRollback(txId);
-
                     ThreadEnd();
                 }
 
@@ -176,7 +116,7 @@ namespace ignite
                         return;
                     }
 
-                    txs.TxClose(txId);
+                    Rollback();
 
                     ThreadEnd();
                 }
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
index b6f9aa1..e395b3b 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
@@ -18,10 +18,10 @@
 #ifndef _IGNITE_IMPL_THIN_TRANSACTION_IMPL
 #define _IGNITE_IMPL_THIN_TRANSACTION_IMPL
 
-#include "impl/data_router.h"
 #include <ignite/common/fixed_size_array.h>
-#include "ignite/thin/transactions/transaction_consts.h"
-#include "impl/transactions/transactions_impl.h"
+#include <ignite/thin/transactions/transaction_consts.h>
+
+#include "impl/data_router.h"
 
 namespace ignite
 {
@@ -51,24 +51,30 @@ namespace ignite
                      * Constructor.
                      *
                      * @param txImpl Transactions implementation.
-                     * @param txid Transaction Id.
+                     * @param channel Channel linked to transaction.
+                     * @param txId Transaction Id.
                      * @param concurrency Transaction concurrency.
                      * @param isolation Transaction isolation.
                      * @param timeout Transaction timeout.
+                     * @param ioTimeout IO timeout for channel.
                      * @param size Number of entries participating in transaction (may be approximate).
                      */
                     TransactionImpl(
                             TransactionsImpl& txImpl,
-                            int32_t txid,
+                            SP_DataChannel channel,
+                            int32_t txId,
                             ignite::thin::transactions::TransactionConcurrency::Type concurrency,
                             ignite::thin::transactions::TransactionIsolation::Type isolation,
                             int64_t timeout,
+                            int32_t ioTimeout,
                             int32_t size) :
+                        channel(channel),
                         txs(txImpl),
-                        txId(txid),
+                        txId(txId),
                         concurrency(concurrency),
                         isolation(isolation),
                         timeout(timeout),
+                        ioTimeout(ioTimeout),
                         txSize(size),
                         closed(false)
                     {
@@ -123,6 +129,7 @@ namespace ignite
                      * Starts transaction.
                      *
                      * @param txs Transactions implementation.
+                     * @param router Router to use to start transaction.
                      * @param concurrency Transaction concurrency.
                      * @param isolation Transaction isolation.
                      * @param timeout Transaction timeout.
@@ -131,19 +138,43 @@ namespace ignite
                      */
                     static SP_TransactionImpl Create(
                             TransactionsImpl& txs,
+                            SP_DataRouter& router,
                             ignite::thin::transactions::TransactionConcurrency::Type concurrency,
                             ignite::thin::transactions::TransactionIsolation::Type isolation,
                             int64_t timeout,
                             int32_t txSize,
                             ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label);
-                protected:
+
+                    /**
+                     * Get channel for the transaction.
+                     *
+                     * @return Channel.
+                     */
+                    SP_DataChannel GetChannel()
+                    {
+                        return channel;
+                    }
+
+                private:
                     /** Checks current thread state. */
                     void ThreadCheck();
 
                     /** Completes tc and clear state from storage. */
                     void ThreadEnd();
 
-                private:
+                    /**
+                     * Synchronously send message and receive response.
+                     *
+                     * @param req Request message.
+                     * @param rsp Response message.
+                     * @throw IgniteError on error.
+                     */
+                    template<typename ReqT, typename RspT>
+                    void SendTxMessage(const ReqT& req, RspT& rsp);
+
+                    /** Data channel to use. */
+                    SP_DataChannel channel;
+
                     /** Transactions implementation. */
                     TransactionsImpl& txs;
 
@@ -159,6 +190,9 @@ namespace ignite
                     /** Timeout in milliseconds. */
                     int64_t timeout;
 
+                    /** Channel io timeout. */
+                    int32_t ioTimeout;
+
                     /** Transaction size. */
                     int32_t txSize;
 
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
index d785932..6227add 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
@@ -15,10 +15,7 @@
  * limitations under the License.
  */
 
-#include "impl/message.h"
 #include "impl/transactions/transactions_impl.h"
-#include "impl/transactions/transaction_impl.h"
-#include "impl/response_status.h"
 
 using namespace ignite::common::concurrent;
 using namespace ignite::impl::thin;
@@ -38,15 +35,6 @@ namespace ignite
                     // No-op.
                 }
 
-                template<typename ReqT, typename RspT>
-                void TransactionsImpl::SendTxMessage(const ReqT& req, RspT& rsp)
-                {
-                    router.Get()->SyncMessage(req, rsp);
-
-                    if (rsp.GetStatus() != ResponseStatus::SUCCESS)
-                        throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
-                }
-
                 SharedPointer<TransactionImpl> TransactionsImpl::TxStart(
                         TransactionConcurrency::Type concurrency,
                         TransactionIsolation::Type isolation,
@@ -54,48 +42,11 @@ namespace ignite
                         int32_t txSize,
                         SharedPointer<common::FixedSizeArray<char> > label)
                 {
-                    SP_TransactionImpl tx = TransactionImpl::Create(*this, concurrency, isolation, timeout, txSize, label);
+                    SP_TransactionImpl tx = TransactionImpl::Create(*this, router, concurrency, isolation, timeout, txSize, label);
 
                     return tx;
                 }
 
-                SP_TransactionImpl TransactionImpl::Create(
-                    TransactionsImpl& txs,
-                    TransactionConcurrency::Type concurrency,
-                    TransactionIsolation::Type isolation,
-                    int64_t timeout,
-                    int32_t txSize,
-                    SharedPointer<common::FixedSizeArray<char> > label)
-                {
-                    SP_TransactionImpl tx = txs.GetCurrent();
-
-                    TransactionImpl* ptr = tx.Get();
-
-                    if (ptr && !ptr->IsClosed())
-                    {
-                        throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED);
-                    }
-
-                    TxStartRequest req(concurrency, isolation, timeout, label);
-
-                    Int32Response rsp;
-
-                    txs.SendTxMessage(req, rsp);
-
-                    int32_t curTxId = rsp.GetValue();
-
-                    tx = SP_TransactionImpl(new TransactionImpl(txs, curTxId, concurrency, isolation, timeout, txSize));
-
-                    txs.SetCurrent(tx);
-
-                    return tx;
-                }
-
-                bool TransactionImpl::IsClosed() const
-                {
-                    return closed;
-                }
-
                 SP_TransactionImpl TransactionsImpl::GetCurrent()
                 {
                     SP_TransactionImpl tx = threadTx.Get();
@@ -121,90 +72,6 @@ namespace ignite
                 {
                     threadTx.Remove();
                 }
-
-                int32_t TransactionsImpl::TxCommit(int32_t txId)
-                {
-                    TxEndRequest req(txId, true);
-
-                    Response rsp;
-
-                    SendTxMessage(req, rsp);
-
-                    return rsp.GetStatus();
-                }
-
-                int32_t TransactionsImpl::TxRollback(int32_t txId)
-                {
-                    TxEndRequest req(txId, false);
-
-                    Response rsp;
-
-                    SendTxMessage(req, rsp);
-
-                    return rsp.GetStatus();
-                }
-
-                int32_t TransactionsImpl::TxClose(int32_t txId)
-                {
-                    return TxRollback(txId);
-                }
-
-                void TransactionImpl::Commit()
-                {
-                    ThreadCheck();
-
-                    txs.TxCommit(txId);
-
-                    ThreadEnd();
-                }
-
-                void TransactionImpl::Rollback()
-                {
-                    ThreadCheck();
-
-                    txs.TxRollback(txId);
-
-                    ThreadEnd();
-                }
-
-                void TransactionImpl::Close()
-                {
-                    ThreadCheck();
-
-                    if (IsClosed())
-                    {
-                        return;
-                    }
-
-                    txs.TxClose(txId);
-
-                    ThreadEnd();
-                }
-
-                void TransactionImpl::SetClosed()
-                {
-                    closed = true;
-                }
-
-                void TransactionImpl::ThreadEnd()
-                {
-                    this->SetClosed();
-
-                    txs.ResetCurrent();
-                }
-
-                void TransactionImpl::ThreadCheck()
-                {
-                    SP_TransactionImpl tx = txs.GetCurrent();
-
-                    TransactionImpl* ptr = tx.Get();
-
-                    if (!ptr)
-                        throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_CLOSED);
-
-                    if (ptr->TxId() != this->TxId())
-                        throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_DIFFERENT_THREAD);
-                }
             }
         }
     }
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
index 278545d..a0879fb 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
@@ -18,9 +18,10 @@
 #ifndef _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL
 #define _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL
 
-#include "impl/data_router.h"
 #include <ignite/common/fixed_size_array.h>
-#include "ignite/thin/transactions/transaction_consts.h"
+#include <ignite/thin/transactions/transaction_consts.h>
+
+#include "impl/data_router.h"
 #include "impl/transactions/transaction_impl.h"
 
 namespace ignite
@@ -31,11 +32,6 @@ namespace ignite
         {
             namespace transactions
             {
-                class TransactionsImpl;
-
-                typedef ignite::common::concurrent::SharedPointer<TransactionImpl> SP_TransactionImpl;
-                typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl;
-
                 /**
                  * Thin client transaction.
                  */
@@ -72,32 +68,6 @@ namespace ignite
                             ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label);
 
                     /**
-                     * Commit Transaction.
-                     *
-                     * @param id Transaction ID.
-                     * @return Resulting state.
-                     */
-                    int32_t TxCommit(int32_t id);
-
-                    /**
-                     * Rollback Transaction.
-                     *
-                     * @param id Transaction ID.
-                     * @return Resulting state.
-                     */
-                    int32_t TxRollback(int32_t id);
-
-
-                    /**
-                     * Close the transaction.
-                     *
-                     * This method should only be used on the valid instance.
-                     *
-                     * @param id Transaction ID.
-                     */
-                    int32_t TxClose(int32_t id);
-
-                    /**
                      * Get active transaction for the current thread.
                      *
                      * @return Active transaction implementation for current thread
@@ -118,16 +88,6 @@ namespace ignite
                      */
                     void ResetCurrent();
 
-                    /**
-                     * Synchronously send message and receive response.
-                     *
-                     * @param req Request message.
-                     * @param rsp Response message.
-                     * @throw IgniteError on error.
-                     */
-                    template<typename ReqT, typename RspT>
-                    void SendTxMessage(const ReqT& req, RspT& rsp);
-
                 private:
                     /** Data router. */
                     SP_DataRouter router;
@@ -137,6 +97,8 @@ namespace ignite
 
                     IGNITE_NO_COPY_ASSIGNMENT(TransactionsImpl);
                 };
+
+                typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl;
             }
         }
     }