You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2022/01/14 10:12:37 UTC

[ignite] branch master updated: IGNITE-15766 C++ thin Continuous Queries (#9732)

This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new fe23f1b  IGNITE-15766 C++ thin Continuous Queries (#9732)
fe23f1b is described below

commit fe23f1b0ca08a6741253f2d6ac10dfd488befb7b
Author: Igor Sapego <is...@apache.org>
AuthorDate: Fri Jan 14 12:48:46 2022 +0300

    IGNITE-15766 C++ thin Continuous Queries (#9732)
---
 modules/platforms/cpp/common/CMakeLists.txt        |   1 +
 .../cpp/common/include/ignite/common/concurrent.h  |   2 +-
 .../cpp/common/include/ignite/common/thread_pool.h | 214 +++++++++
 .../os/linux/include/ignite/common/concurrent_os.h |   9 +
 .../common/os/linux/src/common/concurrent_os.cpp   |   9 +
 .../os/win/include/ignite/common/concurrent_os.h   |   9 +
 .../cpp/common/os/win/src/common/concurrent_os.cpp |  11 +-
 .../cpp/common/src/common/thread_pool.cpp          | 216 +++++++++
 .../cache/query/continuous/continuous_query_impl.h |   6 +-
 .../platforms/cpp/thin-client-test/CMakeLists.txt  |   1 +
 .../config/cache-query-continuous-32.xml           |  48 ++
 .../config/cache-query-continuous-default.xml      |  98 ++++
 .../config/cache-query-continuous.xml              |  30 ++
 .../thin-client-test/src/continuous_query_test.cpp | 528 +++++++++++++++++++++
 modules/platforms/cpp/thin-client/CMakeLists.txt   |   1 +
 .../ignite/impl/thin/cache/cache_client_proxy.h    |  11 +
 .../continuous/continuous_query_client_holder.h    | 182 +++++++
 .../thin/cache/query/query_fields_cursor_impl.h    | 147 ------
 .../impl/thin/cache/query/query_fields_row_impl.h  | 197 --------
 .../include/ignite/thin/cache/cache_client.h       |  21 +-
 .../include/ignite/thin/cache/cache_entry.h        | 168 +++++++
 .../ignite/thin/cache/event/cache_entry_event.h    | 209 ++++++++
 .../thin/cache/event/cache_entry_event_listener.h  |  82 ++++
 .../query/continuous/continuous_query_client.h     | 232 +++++++++
 .../query/continuous/continuous_query_handle.h     |  78 +++
 .../src/impl/cache/cache_client_impl.cpp           |  66 ++-
 .../thin-client/src/impl/cache/cache_client_impl.h |  12 +
 .../src/impl/cache/cache_client_proxy.cpp          |  15 +-
 .../continuous/continuous_query_handle_impl.h      |  90 ++++
 .../continuous_query_notification_handler.cpp      |  63 +++
 .../continuous_query_notification_handler.h        |  96 ++++
 .../thin-client/src/impl/channel_state_handler.h   |   8 +
 .../src/impl/compute/compute_client_impl.cpp       |  11 +-
 .../cpp/thin-client/src/impl/data_channel.cpp      |  78 ++-
 .../cpp/thin-client/src/impl/data_channel.h        |  41 +-
 .../cpp/thin-client/src/impl/data_router.cpp       |  55 ++-
 .../cpp/thin-client/src/impl/data_router.h         |  29 ++
 .../thin-client/src/impl/ignite_client_impl.cpp    |   6 +-
 .../platforms/cpp/thin-client/src/impl/message.cpp |  87 ++--
 .../platforms/cpp/thin-client/src/impl/message.h   | 356 +++++++++++---
 .../thin-client/src/impl/notification_handler.h    | 190 +++++++-
 41 files changed, 3136 insertions(+), 577 deletions(-)

diff --git a/modules/platforms/cpp/common/CMakeLists.txt b/modules/platforms/cpp/common/CMakeLists.txt
index 5469dd6..6e8cad6 100644
--- a/modules/platforms/cpp/common/CMakeLists.txt
+++ b/modules/platforms/cpp/common/CMakeLists.txt
@@ -25,6 +25,7 @@ set(SOURCES src/common/big_integer.cpp
         src/common/bits.cpp
         src/common/concurrent.cpp
         src/common/decimal.cpp
+        src/common/thread_pool.cpp
         src/common/utils.cpp
         src/date.cpp
         src/ignite_error.cpp
diff --git a/modules/platforms/cpp/common/include/ignite/common/concurrent.h b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
index 290efd7..54bf583 100644
--- a/modules/platforms/cpp/common/include/ignite/common/concurrent.h
+++ b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
@@ -293,7 +293,7 @@ namespace ignite
                  *
                  * @return Raw pointer.
                  */
-                const T* Get() const
+                T* Get() const
                 {
                     return ptr;
                 }
diff --git a/modules/platforms/cpp/common/include/ignite/common/thread_pool.h b/modules/platforms/cpp/common/include/ignite/common/thread_pool.h
new file mode 100644
index 0000000..ef05213
--- /dev/null
+++ b/modules/platforms/cpp/common/include/ignite/common/thread_pool.h
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::common::ThreadPool class.
+ */
+
+#ifndef _IGNITE_COMMON_THREAD_POOL
+#define _IGNITE_COMMON_THREAD_POOL
+
+#include <deque>
+#include <vector>
+
+#include <ignite/ignite_error.h>
+
+#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
+
+namespace ignite
+{
+    namespace common
+    {
+        /**
+         * Thread pool task. Interface of a unit of work that is executed by ThreadPool worker threads.
+         */
+        class ThreadPoolTask
+        {
+        public:
+            /**
+             * Destructor. Virtual, as tasks are handled through pointers to this base class.
+             */
+            virtual ~ThreadPoolTask()
+            {
+                // No-op.
+            }
+
+            /**
+             * Execute task. Errors thrown from this method are caught by the worker and passed to OnError().
+             */
+            virtual void Execute() = 0;
+
+            /**
+             * Called on task failure or pool stop. Should not throw: an error escaping it terminates the process.
+             *
+             * @param err Error describing the failure.
+             */
+             virtual void OnError(const IgniteError& err) = 0;
+        };
+
+        /** Shared pointer to thread pool task. */
+        typedef concurrent::SharedPointer< ThreadPoolTask > SP_ThreadPoolTask;
+
+        /**
+         * Thread Pool. Runs dispatched tasks on a fixed set of worker threads that share a single task queue.
+         */
+        class ThreadPool
+        {
+        public:
+            /**
+             * Constructor. Creates worker threads without starting them, see Start().
+             *
+             * @param threadsNum Number of threads. If set to 0 current number of processors is used (2 if unknown).
+             */
+            IGNITE_IMPORT_EXPORT explicit ThreadPool(uint32_t threadsNum);
+
+            /**
+             * Destructor. Stops the pool, see Stop().
+             */
+            IGNITE_IMPORT_EXPORT virtual ~ThreadPool();
+
+            /**
+             * Start threads in pool. Has no effect if the pool was already started.
+             */
+            IGNITE_IMPORT_EXPORT void Start();
+
+            /**
+             * Stop threads in pool. Pending tasks get OnError() called, then worker threads are joined.
+             *
+             * @warning Once stopped it can not be restarted.
+             */
+            IGNITE_IMPORT_EXPORT void Stop();
+
+            /**
+             * Dispatch task. Null task is ignored. If the pool is stopped, OnError() of the task is called instead.
+             *
+             * @param task Task to execute.
+             */
+            IGNITE_IMPORT_EXPORT void Dispatch(const SP_ThreadPoolTask& task);
+
+        private:
+            IGNITE_NO_COPY_ASSIGNMENT(ThreadPool);
+
+            /**
+             * Task queue. Blocking first-in-first-out queue of tasks shared by worker threads.
+             */
+            class TaskQueue
+            {
+            public:
+                /**
+                 * Constructor. Creates an empty queue in blocking mode.
+                 */
+                TaskQueue();
+
+                /**
+                 * Destructor.
+                 */
+                ~TaskQueue();
+
+                /**
+                 * Push a new task to the queue. If the queue is unblocked, task is rejected through its OnError().
+                 *
+                 * @param task Task. Null tasks are ignored.
+                 */
+                void Push(const SP_ThreadPoolTask& task);
+
+                /**
+                 * Pop a task from the queue. Blocks while the queue is empty and is not unblocked.
+                 *
+                 * @return New task or null when unblocked.
+                 */
+                SP_ThreadPoolTask Pop();
+
+                /**
+                 * Unblock queue. Fails queued tasks via OnError(). Unblocked queue will not block or return new tasks.
+                 */
+                void Unblock();
+
+            private:
+                /** If true queue will not block. */
+                volatile bool unblocked;
+
+                /** Tasks queue. */
+                std::deque< SP_ThreadPoolTask > tasks;
+
+                /** Critical section. Guards tasks and unblocked flag. */
+                concurrent::CriticalSection mutex;
+
+                /** Condition variable. Notified when a task is pushed and when the queue is unblocked. */
+                concurrent::ConditionVariable waitPoint;
+            };
+
+            /**
+             * Worker thread. Executes tasks taken from the queue until the queue is unblocked.
+             */
+            class WorkerThread : public concurrent::Thread
+            {
+            public:
+                /**
+                 * Constructor.
+                 *
+                 * @param taskQueue Task queue. Stored by reference.
+                 */
+                explicit WorkerThread(TaskQueue& taskQueue);
+
+                /**
+                 * Destructor.
+                 */
+                ~WorkerThread();
+
+            private:
+                /**
+                 * Run thread. Pops and executes tasks, reporting thrown errors to OnError() of the task.
+                 */
+                virtual void Run();
+
+                /** Task queue. Held by reference, not owned by the worker. */
+                TaskQueue& taskQueue;
+            };
+
+            /**
+             * Pass an error to OnError() of the task. If OnError() throws, the process is terminated.
+             *
+             * @param task Task.
+             * @param err Error.
+             */
+            static void HandleTaskError(ThreadPoolTask &task, const IgniteError &err);
+
+            /** Shared pointer to thread pool worker thread. */
+            typedef concurrent::SharedPointer< WorkerThread > SP_WorkerThread;
+
+            /** Started flag. Set by Start(). */
+            bool started;
+
+            /** Stopped flag. Set by Stop() called on a started pool. */
+            bool stopped;
+
+            /** Task queue. */
+            TaskQueue queue;
+
+            /** Worker Threads. */
+            std::vector<SP_WorkerThread> threads;
+
+            /** Critical section. Guards started and stopped flags. */
+            concurrent::CriticalSection mutex;
+        };
+    }
+}
+
+#endif //_IGNITE_COMMON_THREAD_POOL
diff --git a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
index 43ffb02..c8f7293 100644
--- a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
+++ b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
@@ -724,6 +724,8 @@ namespace ignite
                 virtual void Join();
 
             private:
+                IGNITE_NO_COPY_ASSIGNMENT(Thread);
+
                 /**
                  * Routine.
                  * @param lpParam Param.
@@ -734,6 +736,13 @@ namespace ignite
                 /** Thread handle. */
                 pthread_t thread;
             };
+
+            /**
+             * Get number of logical processors in system.
+             *
+             * @return Number of logical processors.
+             */
+            IGNITE_IMPORT_EXPORT uint32_t GetNumberOfProcessors();
         }
     }
 }
diff --git a/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
index ff36c55..0370b57 100644
--- a/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
+++ b/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+#include <sys/sysinfo.h>
+
 #include "ignite/common/concurrent_os.h"
 
 namespace ignite
@@ -236,6 +238,13 @@ namespace ignite
             {
                 pthread_join(thread, 0);
             }
+
+            uint32_t GetNumberOfProcessors()
+            {
+                int res = get_nprocs(); // Declared in <sys/sysinfo.h>.
+
+                return static_cast<uint32_t>(res < 0 ? 0 : res); // Negative result is reported as 0.
+            }
         }
     }
 }
diff --git a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
index 4c0e4a2..25fa5c2 100644
--- a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
+++ b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
@@ -633,6 +633,8 @@ namespace ignite
                 virtual void Join();
 
             private:
+                IGNITE_NO_COPY_ASSIGNMENT(Thread);
+
                 /**
                  * Routine.
                  * @param lpParam Param.
@@ -643,6 +645,13 @@ namespace ignite
                 /** Thread handle. */
                 HANDLE handle;
             };
+
+            /**
+             * Get number of logical processors in system.
+             *
+             * @return Number of logical processors.
+             */
+            IGNITE_IMPORT_EXPORT uint32_t GetNumberOfProcessors();
         }
     }
 }
diff --git a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
index 8d2d29b..02b6834 100644
--- a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
+++ b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
@@ -216,7 +216,8 @@ namespace ignite
 
             Thread::~Thread()
             {
-                CloseHandle(handle);
+                if (handle)
+                    CloseHandle(handle);
             }
 
             DWORD Thread::ThreadRoutine(LPVOID lpParam)
@@ -239,6 +240,14 @@ namespace ignite
             {
                 WaitForSingleObject(handle, INFINITE);
             }
+
+            uint32_t GetNumberOfProcessors()
+            {
+                SYSTEM_INFO info;
+                GetSystemInfo(&info);
+                // dwNumberOfProcessors is an unsigned DWORD and can not be negative, so no clamping is needed.
+                return static_cast<uint32_t>(info.dwNumberOfProcessors);
+            }
         }
     }
 }
diff --git a/modules/platforms/cpp/common/src/common/thread_pool.cpp b/modules/platforms/cpp/common/src/common/thread_pool.cpp
new file mode 100644
index 0000000..456c7eb
--- /dev/null
+++ b/modules/platforms/cpp/common/src/common/thread_pool.cpp
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <exception>
+#include <iostream>
+
+#include <ignite/common/thread_pool.h>
+
+namespace ignite
+{
+    namespace common
+    {
+        ThreadPool::ThreadPool(uint32_t threadsNum):
+            started(false),
+            stopped(false),
+            threads()
+        {
+            uint32_t threadToStart = threadsNum != 0 ? threadsNum : concurrent::GetNumberOfProcessors();
+            if (!threadToStart) // Processor count was reported as 0: fall back to two workers.
+                threadToStart = 2;
+
+            threads.reserve(threadToStart);
+            for (uint32_t i = 0; i < threadToStart; ++i) // Workers are only created here, Start() runs them.
+            {
+                SP_WorkerThread thread(new WorkerThread(queue)); // Every worker shares the same queue.
+                threads.push_back(thread);
+            }
+        }
+
+        ThreadPool::~ThreadPool()
+        {
+            Stop(); // Has no effect if the pool was never started or is already stopped.
+        }
+
+        void ThreadPool::Start()
+        {
+            concurrent::CsLockGuard guard(mutex);
+
+            if (started) // Repeated calls are no-ops.
+                return;
+
+            started = true;
+
+            for (std::vector<SP_WorkerThread>::iterator it = threads.begin(); it != threads.end(); ++it)
+                it->Get()->Start(); // Launch every worker created by the constructor.
+        }
+
+        void ThreadPool::Stop()
+        {
+            concurrent::CsLockGuard guard(mutex);
+
+            if (stopped || !started) // Nothing to stop: already stopped or never started.
+                return; // NOTE(review): tasks queued on a never-started pool are not failed via OnError() here.
+
+            stopped = true;
+
+            queue.Unblock(); // Fails pending tasks and wakes up workers waiting in Pop().
+
+            for (std::vector<SP_WorkerThread>::iterator it = threads.begin(); it != threads.end(); ++it)
+                it->Get()->Join(); // Wait for every worker thread to exit.
+        }
+
+        void ThreadPool::Dispatch(const SP_ThreadPoolTask& task)
+        {
+            queue.Push(task); // Null task is ignored; unblocked queue rejects the task via its OnError().
+        }
+
+        ThreadPool::TaskQueue::TaskQueue() :
+            unblocked(false)
+        {
+            // No-op. Queue starts empty and in blocking mode.
+        }
+
+        ThreadPool::TaskQueue::~TaskQueue()
+        {
+            // No-op. Tasks still in the queue are not notified here.
+        }
+
+        void ThreadPool::TaskQueue::Push(const SP_ThreadPoolTask &task)
+        {
+            if (!task.IsValid()) // Null tasks are silently ignored.
+                return;
+
+            concurrent::CsLockGuard guard(mutex);
+
+            if (unblocked) // Queue is stopped: reject the task through its OnError() callback.
+            {
+                IgniteError err(IgniteError::IGNITE_ERR_GENERIC, "Execution thread pool is stopped");
+                HandleTaskError(*task.Get(), err); // Called while the queue mutex is held.
+
+                return;
+            }
+
+            tasks.push_back(task); // New tasks go to the back, Pop() takes from the front.
+
+            waitPoint.NotifyOne(); // Signal that a task is available.
+        }
+
+        SP_ThreadPoolTask ThreadPool::TaskQueue::Pop()
+        {
+            concurrent::CsLockGuard guard(mutex);
+            if (unblocked) // Queue was unblocked before the call: return null task without waiting.
+                return SP_ThreadPoolTask();
+
+            while (tasks.empty()) // Re-checked after every wake-up, so a wake-up without a task waits again.
+            {
+                waitPoint.Wait(mutex); // Presumably releases the mutex while waiting - confirm in concurrent_os.h.
+
+                if (unblocked) // Woken up by Unblock(): return null task.
+                    return SP_ThreadPoolTask();
+            }
+
+            SP_ThreadPoolTask res = tasks.front(); // Oldest task first.
+            tasks.pop_front();
+            return res;
+        }
+
+        void ThreadPool::TaskQueue::Unblock()
+        {
+            concurrent::CsLockGuard guard(mutex);
+            unblocked = true; // From now on Push() rejects tasks and Pop() returns null.
+
+            IgniteError err(IgniteError::IGNITE_ERR_GENERIC, "Execution thread pool is stopped");
+            for (std::deque< SP_ThreadPoolTask >::iterator it = tasks.begin(); it != tasks.end(); ++it)
+                HandleTaskError(*it->Get(), err); // Notify every task that did not get to run.
+
+            tasks.clear();
+
+            waitPoint.NotifyAll(); // Wake up every worker waiting in Pop().
+        }
+
+        ThreadPool::WorkerThread::WorkerThread(TaskQueue& taskQueue) :
+            taskQueue(taskQueue)
+        {
+            // No-op. Only the reference to the queue is stored.
+        }
+
+        ThreadPool::WorkerThread::~WorkerThread()
+        {
+            // No-op. The queue is held by reference and is not released here.
+        }
+
+        void ThreadPool::WorkerThread::Run()
+        {
+            while (true)
+            {
+                SP_ThreadPoolTask task = taskQueue.Pop(); // Blocks until a task arrives or the queue is unblocked.
+
+                // Queue is unblocked and workers should stop.
+                if (!task.IsValid())
+                    break;
+
+                ThreadPoolTask &task0 = *task.Get();
+
+                try
+                {
+                    task0.Execute();
+                }
+                catch (const IgniteError& err) // Ignite errors are passed to the task as is.
+                {
+                    HandleTaskError(task0, err);
+                }
+                catch (const std::exception& err) // Standard exceptions are wrapped, keeping the message.
+                {
+                    IgniteError err0(IgniteError::IGNITE_ERR_STD, err.what());
+                    HandleTaskError(task0, err0);
+                }
+                catch (...) // Anything else is reported as an unknown error.
+                {
+                    IgniteError err(IgniteError::IGNITE_ERR_UNKNOWN, "Unknown error occurred when executing task");
+                    HandleTaskError(task0, err);
+                }
+            }
+        }
+
+        void ThreadPool::HandleTaskError(ThreadPoolTask &task, const IgniteError &err)
+        {
+            try
+            {
+                // A throwing error handler can not be recovered from: it is reported to stderr and the process ends.
+                task.OnError(err);
+                return;
+            }
+            catch (const IgniteError& err0)
+            {
+                std::cerr << "Exception is thrown during handling of exception: "
+                          << err0.what() << ". Aborting execution" << std::endl;
+            }
+            catch (const std::exception& err0)
+            {
+                std::cerr << "Exception is thrown during handling of exception: "
+                          << err0.what() << ". Aborting execution" << std::endl;
+            }
+            catch (...)
+            {
+                std::cerr << "Unknown exception is thrown during handling of exception. Aborting execution" << std::endl;
+            }
+
+            std::terminate(); // Reached only when OnError() itself has thrown.
+        }
+    }
+}
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
index d2bf241..ce1a46e 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
@@ -158,7 +158,7 @@ namespace ignite
                          * means that time check is disabled and entries will be
                          * sent only when buffer is full.
                          *
-                         * @param val Time interval in miliseconds.
+                         * @param val Time interval in milliseconds.
                          */
                         void SetTimeInterval(int64_t val)
                         {
@@ -223,7 +223,7 @@ namespace ignite
                         int32_t bufferSize;
 
                         /**
-                         * Time interval in miliseconds. When a cache update
+                         * Time interval in milliseconds. When a cache update
                          * happens, entry is first put into a buffer. Entries from
                          * buffer will be sent to the master node only if the buffer
                          * is full (its size can be changed via SetBufferSize) or
@@ -374,7 +374,7 @@ namespace ignite
                          * Constructor.
                          */
                         template<typename F>
-                        RemoteFilterHolder(const Reference<F>& filter):
+                        explicit RemoteFilterHolder(const Reference<F>& filter):
                             ContinuousQueryImplBase(false, new event::CacheEntryEventFilterHolder<F>(filter))
                         {
                             // No-op.
diff --git a/modules/platforms/cpp/thin-client-test/CMakeLists.txt b/modules/platforms/cpp/thin-client-test/CMakeLists.txt
index 824edd0..17858fe 100644
--- a/modules/platforms/cpp/thin-client-test/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client-test/CMakeLists.txt
@@ -32,6 +32,7 @@ set(SOURCES src/teamcity/teamcity_boost.cpp
         src/teamcity/teamcity_messages.cpp
         src/cache_client_test.cpp
         src/compute_client_test.cpp
+        src/continuous_query_test.cpp
         src/test_utils.cpp
         src/ignite_client_test.cpp
         src/interop_test.cpp
diff --git a/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-32.xml b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-32.xml
new file mode 100644
index 0000000..1a3629e
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-32.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <import resource="cache-query-continuous-default.xml"/>
+
+    <bean parent="grid.cfg">
+        <property name="memoryConfiguration">
+            <bean class="org.apache.ignite.configuration.MemoryConfiguration">
+                <property name="systemCacheInitialSize" value="#{10 * 1024 * 1024}"/>
+                <property name="systemCacheMaxSize" value="#{40 * 1024 * 1024}"/>
+                <property name="defaultMemoryPolicyName" value="dfltPlc"/>
+
+                <property name="memoryPolicies">
+                    <list>
+                        <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration">
+                            <property name="name" value="dfltPlc"/>
+                            <property name="maxSize" value="#{100 * 1024 * 1024}"/>
+                            <property name="initialSize" value="#{10 * 1024 * 1024}"/>
+                        </bean>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-default.xml b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-default.xml
new file mode 100644
index 0000000..08e9e5f
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-default.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean abstract="true" id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="localHost" value="127.0.0.1"/>
+        <property name="connectorConfiguration"><null/></property>
+
+        <property name="clientConnectorConfiguration">
+            <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+                <property name="host" value="127.0.0.1"/>
+                <property name="port" value="11110"/>
+                <property name="portRange" value="10"/>
+            </bean>
+        </property>
+
+        <property name="cacheConfiguration">
+            <list>
+                <bean parent="cache-template">
+                    <property name="name" value="transactional_no_backup"/>
+                </bean>
+
+                <bean parent="cache-template">
+                    <property name="name" value="with_expiry"/>
+                    <property name="expiryPolicyFactory">
+                        <bean class="javax.cache.expiry.CreatedExpiryPolicy" factory-method="factoryOf">
+                            <constructor-arg>
+                                <bean class="javax.cache.expiry.Duration">
+                                    <constructor-arg value="MILLISECONDS"/>
+                                    <constructor-arg value="500"/>
+                                </bean>
+                            </constructor-arg>
+                        </bean>
+                    </property>
+                </bean>
+            </list>
+        </property>
+
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <!-- In distributed environment, replace with actual host IP address. -->
+                                <value>127.0.0.1:47500</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+                <property name="socketTimeout" value="300" />
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="cache-template" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration">
+        <property name="cacheMode" value="PARTITIONED"/>
+        <property name="atomicityMode" value="TRANSACTIONAL"/>
+        <property name="writeSynchronizationMode" value="FULL_SYNC"/>
+        <property name="backups" value="0"/>
+        <property name="queryEntities">
+            <list>
+                <bean class="org.apache.ignite.cache.QueryEntity">
+                    <property name="keyType" value="java.lang.Integer"/>
+                    <property name="valueType" value="TestEntry"/>
+
+                    <property name="fields">
+                        <map>
+                            <entry key="value" value="java.lang.Integer"/>
+                        </map>
+                    </property>
+                </bean>
+            </list>
+        </property>
+    </bean>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/config/cache-query-continuous.xml b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous.xml
new file mode 100644
index 0000000..1c4e275
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <import resource="cache-query-continuous-default.xml"/>
+
+    <bean parent="grid.cfg"/>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/src/continuous_query_test.cpp b/modules/platforms/cpp/thin-client-test/src/continuous_query_test.cpp
new file mode 100644
index 0000000..80c0912
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/src/continuous_query_test.cpp
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <deque>
+
+#include <boost/test/unit_test.hpp>
+#include <boost/optional.hpp>
+
+#include <ignite/common/concurrent.h>
+
+#include <ignite/ignition.h>
+#include <ignite/thin/ignite_client.h>
+
+#include <test_utils.h>
+
+using namespace ignite;
+using namespace ignite::thin;
+using namespace ignite::thin::cache;
+using namespace ignite::thin::cache::event;
+using namespace ignite::thin::cache::query;
+using namespace ignite::thin::cache::query::continuous;
+
+using namespace boost::unit_test;
+
+
+/**
+ * Very simple concurrent queue implementation.
+ *
+ * Every access to the underlying deque is done under the critical section;
+ * the condition variable is used to wake up a consumer blocked in Pull().
+ */
+template<typename T>
+class ConcurrentQueue
+{
+public:
+    /**
+     * Constructor.
+     */
+    ConcurrentQueue()
+    {
+        // No-op.
+    }
+
+    /**
+     * Push next element to queue.
+     *
+     * @param val Value to push.
+     */
+    void Push(const T& val)
+    {
+        common::concurrent::CsLockGuard guard(mutex);
+
+        queue.push_back(val);
+
+        cv.NotifyOne();
+    }
+
+    /**
+     * Pull element from the queue with the specified timeout.
+     *
+     * The wait is repeated while the queue is empty, so a wake-up that is not
+     * accompanied by data (e.g. a spurious one) never results in reading from
+     * an empty queue. Every such wake-up restarts the wait with the full
+     * timeout, so the total waiting time may exceed @c timeout in that case.
+     *
+     * @param val Value is placed there on success.
+     * @param timeout Timeout in ms.
+     * @return True on success and false on timeout.
+     */
+    bool Pull(T& val, int32_t timeout)
+    {
+        common::concurrent::CsLockGuard guard(mutex);
+
+        // Condition variables may wake up without a matching notification,
+        // so the predicate is re-checked after every wait.
+        while (queue.empty())
+        {
+            if (!cv.WaitFor(mutex, timeout))
+                break;
+        }
+
+        // The queue state, not the wait result, decides whether there is something to return.
+        if (queue.empty())
+            return false;
+
+        val = queue.front();
+
+        queue.pop_front();
+
+        return true;
+    }
+
+private:
+    /** Mutex. */
+    common::concurrent::CriticalSection mutex;
+
+    /** Condition variable. */
+    common::concurrent::ConditionVariable cv;
+
+    /** Queue. */
+    std::deque<T> queue;
+};
+
+/**
+ * Test listener class. Stores events it has been notified about in concurrent
+ * queue so they can be checked later.
+ */
+template<typename K, typename V>
+class Listener : public CacheEntryEventListener<K, V>
+{
+public:
+    /** Default timeout, in milliseconds, used by the Check*() helpers. */
+    enum { DEFAULT_WAIT_TIMEOUT = 1000 };
+
+    /**
+     * Default constructor. Events are queued without any artificial delay.
+     */
+    Listener() :
+        disconnected(false),
+        handlingDelay(0)
+    {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param handlingDelay Handling delay in milliseconds. Applied before every
+     *     single event is put to the queue.
+     */
+    explicit Listener(int32_t handlingDelay) :
+        disconnected(false),
+        handlingDelay(handlingDelay)
+    {
+        // No-op.
+    }
+
+    /**
+     * Event callback.
+     *
+     * @param evts Events.
+     * @param num Events number.
+     */
+    virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num)
+    {
+        for (uint32_t i = 0; i < num; ++i)
+        {
+            // Emulates slow event processing when a delay was requested.
+            if (handlingDelay)
+                boost::this_thread::sleep_for(boost::chrono::milliseconds(handlingDelay));
+
+            eventQueue.Push(evts[i]);
+        }
+    }
+
+    /**
+     * Disconnected callback.
+     *
+     * Called if channel was disconnected. This also means that continuous query was closed and no more
+     * events will be provided for this listener.
+     */
+    virtual void OnDisconnected()
+    {
+        common::concurrent::CsLockGuard guard(disconnectedMutex);
+
+        disconnected = true;
+        disconnectedCv.NotifyAll();
+    }
+
+    /**
+     * Check that next received event contains specific values.
+     *
+     * Values are compared through their @c value member, so @c V is expected to provide one.
+     *
+     * @param key Key.
+     * @param oldVal Old value.
+     * @param val Current value.
+     * @param eType Event type.
+     */
+    void CheckNextEvent(const K& key, boost::optional<V> oldVal, boost::optional<V> val, CacheEntryEventType::Type eType)
+    {
+        CacheEntryEvent<K, V> event;
+        bool success = eventQueue.Pull(event, DEFAULT_WAIT_TIMEOUT);
+
+        // Nothing to inspect if no event has arrived in time.
+        BOOST_REQUIRE(success);
+
+        BOOST_CHECK_EQUAL(event.GetKey(), key);
+        BOOST_CHECK_EQUAL(event.HasOldValue(), oldVal.is_initialized());
+        BOOST_CHECK_EQUAL(event.HasValue(), val.is_initialized());
+        BOOST_CHECK_EQUAL(event.GetEventType(), eType);
+
+        if (oldVal && event.HasOldValue())
+            BOOST_CHECK_EQUAL(event.GetOldValue().value, oldVal->value);
+
+        if (val && event.HasValue())
+            BOOST_CHECK_EQUAL(event.GetValue().value, val->value);
+    }
+
+    /**
+     * Check that there is no next event in specified period of time.
+     *
+     * @param millis Time span in milliseconds.
+     */
+    void CheckNoEvent(int32_t millis = DEFAULT_WAIT_TIMEOUT)
+    {
+        CacheEntryEvent<K, V> event;
+        bool success = eventQueue.Pull(event, millis);
+
+        BOOST_CHECK(!success);
+    }
+
+    /**
+     * Check that next event is the cache entry expiry event.
+     *
+     * @param key Key.
+     */
+    void CheckExpired(const K& key)
+    {
+        CacheEntryEvent<K, V> event;
+        bool success = eventQueue.Pull(event, DEFAULT_WAIT_TIMEOUT);
+
+        // Abort the test case on timeout: the checks below would otherwise
+        // inspect a default-constructed event. Same policy as in CheckNextEvent().
+        BOOST_REQUIRE(success);
+
+        BOOST_CHECK_EQUAL(event.GetKey(), key);
+        BOOST_CHECK_EQUAL(event.GetEventType(), CacheEntryEventType::EXPIRED);
+    }
+
+    /**
+     * Make sure that channel is disconnected within specified time.
+     *
+     * A wake-up that happens while the flag is still not set restarts the wait
+     * with the full timeout, so the total waiting time may exceed @c millis in that case.
+     *
+     * @param millis Time span in milliseconds.
+     */
+    void CheckDisconnected(int32_t millis = DEFAULT_WAIT_TIMEOUT)
+    {
+        common::concurrent::CsLockGuard guard(disconnectedMutex);
+
+        // Condition variables may wake up without a matching notification,
+        // so the flag is re-checked after every wait.
+        while (!disconnected)
+        {
+            if (!disconnectedCv.WaitFor(disconnectedMutex, millis))
+                break;
+        }
+
+        BOOST_CHECK(disconnected);
+    }
+
+private:
+    /** Disconnected Mutex. */
+    common::concurrent::CriticalSection disconnectedMutex;
+
+    /** Disconnected Condition variable. */
+    common::concurrent::ConditionVariable disconnectedCv;
+
+    /** Disconnected flag. Guarded by disconnectedMutex. */
+    bool disconnected;
+
+    /** Handling delay. */
+    int32_t handlingDelay;
+
+    /** Events queue. */
+    ConcurrentQueue< CacheEntryEvent<K, V> > eventQueue;
+};
+
+/**
+ * Test entry. Simple value type holding a single 32-bit integer.
+ */
+struct TestEntry
+{
+    /**
+     * Default constructor. Initializes the value with zero.
+     */
+    TestEntry() : value(0)
+    {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param val Value to store.
+     */
+    explicit TestEntry(int32_t val) : value(val)
+    {
+        // No-op.
+    }
+
+    /** Value. */
+    int32_t value;
+};
+
+namespace ignite
+{
+    namespace binary
+    {
+        /**
+         * Binary type specialization for TestEntry.
+         *
+         * Only the type name and the read/write routines are defined here;
+         * everything else is inherited from BinaryTypeDefaultAll.
+         */
+        template<>
+        struct BinaryType<TestEntry> : BinaryTypeDefaultAll<TestEntry>
+        {
+            /**
+             * Get binary type name.
+             *
+             * @param dst Output string, set to "TestEntry".
+             */
+            static void GetTypeName(std::string& dst)
+            {
+                dst = "TestEntry";
+            }
+
+            /**
+             * Write object as a single int32 field named "value".
+             *
+             * @param writer Writer to use.
+             * @param obj Object to write.
+             */
+            static void Write(BinaryWriter& writer, const TestEntry& obj)
+            {
+                writer.WriteInt32("value", obj.value);
+            }
+
+            /**
+             * Read object from the int32 field named "value".
+             *
+             * @param reader Reader to use.
+             * @param dst Object to fill.
+             */
+            static void Read(BinaryReader& reader, TestEntry& dst)
+            {
+                dst.value = reader.ReadInt32("value");
+            }
+        };
+    }
+}
+
+/**
+ * Test setup fixture.
+ *
+ * Owns the server node, the thin client and the cache client used by the test cases.
+ */
+class ContinuousQueryTestSuiteFixture
+{
+public:
+    /**
+     * Constructor.
+     *
+     * Starts server node "node-01" using "cache-query-continuous.xml", then starts
+     * a thin client and obtains the default test cache through it.
+     */
+    ContinuousQueryTestSuiteFixture() :
+        node(ignite_test::StartCrossPlatformServerNode("cache-query-continuous.xml", "node-01")),
+        client(),
+        cache()
+    {
+        client = StartClient();
+        cache = GetDefaultCache(client);
+    }
+
+    /**
+     * Destructor.
+     *
+     * Stops all nodes and resets the stored node handle.
+     */
+    ~ContinuousQueryTestSuiteFixture()
+    {
+        Ignition::StopAll(false);
+
+        node = Ignite();
+    }
+
+    /**
+     * Start new client.
+     *
+     * @return Client connected to the "127.0.0.1:11110" end point.
+     */
+    static IgniteClient StartClient()
+    {
+        IgniteClientConfiguration cfg;
+        cfg.SetEndPoints("127.0.0.1:11110");
+
+        return IgniteClient::Start(cfg);
+    }
+
+    /**
+     * Get test cache using client.
+     *
+     * @param client Client to use.
+     * @return Cache client for the "transactional_no_backup" cache.
+     */
+    static CacheClient<int32_t, TestEntry> GetDefaultCache(IgniteClient& client)
+    {
+        return client.GetOrCreateCache<int32_t, TestEntry>("transactional_no_backup");
+    }
+
+    /**
+     * Get cache with configured expiry policy using client.
+     *
+     * @param client Client to use.
+     * @return Cache client for the "with_expiry" cache.
+     */
+    static CacheClient<int32_t, TestEntry> GetExpiryCache(IgniteClient& client)
+    {
+        return client.GetOrCreateCache<int32_t, TestEntry>("with_expiry");
+    }
+
+protected:
+    /** Node. */
+    Ignite node;
+
+    /** Client. */
+    IgniteClient client;
+
+    /** Cache client. */
+    CacheClient<int32_t, TestEntry> cache;
+};
+
+/**
+ * Perform a fixed sequence of cache modifications and check that the listener
+ * receives the matching event after each of them: CREATED and UPDATED for key 1,
+ * CREATED for key 2 and finally REMOVED for key 1.
+ *
+ * @param cache Cache to modify.
+ * @param listener Listener which is expected to receive the events.
+ */
+void CheckEvents(CacheClient<int32_t, TestEntry>& cache, Listener<int32_t, TestEntry>& listener)
+{
+    cache.Put(1, TestEntry(10));
+    listener.CheckNextEvent(1, boost::none, TestEntry(10), CacheEntryEventType::CREATED);
+
+    cache.Put(1, TestEntry(20));
+    listener.CheckNextEvent(1, TestEntry(10), TestEntry(20), CacheEntryEventType::UPDATED);
+
+    cache.Put(2, TestEntry(20));
+    listener.CheckNextEvent(2, boost::none, TestEntry(20), CacheEntryEventType::CREATED);
+
+    // The REMOVED event is expected to carry both an old and a current value, each equal to 20.
+    cache.Remove(1);
+    listener.CheckNextEvent(1, TestEntry(20), TestEntry(20), CacheEntryEventType::REMOVED);
+}
+
+BOOST_FIXTURE_TEST_SUITE(ContinuousQueryTestSuite, ContinuousQueryTestSuiteFixture)
+
+// Starts a continuous query with default settings and checks that the
+// listener is notified about the modifications done by CheckEvents().
+BOOST_AUTO_TEST_CASE(TestBasic)
+{
+    Listener<int32_t, TestEntry> listener;
+
+    ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+
+    ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+    CheckEvents(cache, listener);
+}
+
+// Checks that events are still delivered after the ContinuousQueryClient
+// object used to start the query went out of scope, while the handle
+// returned by QueryContinuous() is still alive.
+BOOST_AUTO_TEST_CASE(TestExpiredQuery)
+{
+    Listener<int32_t, TestEntry> listener;
+    ContinuousQueryHandleClient handle;
+
+    {
+        // Query scope.
+        ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+
+        handle = cache.QueryContinuous(qry);
+    }
+
+    // Query is destroyed here.
+
+    CheckEvents(cache, listener);
+}
+
+// Checks that an EXPIRED event is delivered when the query is configured
+// with SetIncludeExpired(true) and the entry is put to the "with_expiry" cache.
+// NOTE(review): the timings below presumably rely on the expiry policy of the
+// "with_expiry" cache, which is configured outside of this file - confirm.
+BOOST_AUTO_TEST_CASE(TestExpiredEventsReceived)
+{
+    cache = GetExpiryCache(client);
+
+    Listener<int32_t, TestEntry> listener;
+
+    ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+    qry.SetIncludeExpired(true);
+
+    ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+    cache.Put(1, TestEntry(10));
+    listener.CheckNextEvent(1, boost::none, TestEntry(10), CacheEntryEventType::CREATED);
+    // No event is expected during the first 100 ms after the entry is created...
+    listener.CheckNoEvent(100);
+    // ...and the expiry event has to arrive within the default timeout after that.
+    listener.CheckExpired(1);
+}
+
+// Checks that no event follows the CREATED one within the default timeout
+// when the query is configured with SetIncludeExpired(false) and the entry
+// is put to the "with_expiry" cache.
+BOOST_AUTO_TEST_CASE(TestExpiredEventsNotReceived)
+{
+    cache = GetExpiryCache(client);
+
+    Listener<int32_t, TestEntry> listener;
+
+    ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+    qry.SetIncludeExpired(false);
+
+    ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+    cache.Put(1, TestEntry(10));
+    listener.CheckNextEvent(1, boost::none, TestEntry(10), CacheEntryEventType::CREATED);
+    listener.CheckNoEvent();
+}
+
+// Checks the default buffer size, that a new value is returned by the getter
+// once set, that starting the query does not change it, and that the query
+// still delivers events with the non-default buffer size.
+BOOST_AUTO_TEST_CASE(TestGetSetBufferSize)
+{
+    typedef ContinuousQueryClient<int32_t, TestEntry> QueryType;
+    Listener<int32_t, TestEntry> listener;
+
+    ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+
+    BOOST_CHECK_EQUAL(qry.GetBufferSize(), static_cast<int32_t>(QueryType::DEFAULT_BUFFER_SIZE));
+
+    qry.SetBufferSize(2 * QueryType::DEFAULT_BUFFER_SIZE);
+
+    BOOST_CHECK_EQUAL(qry.GetBufferSize(), 2 * QueryType::DEFAULT_BUFFER_SIZE);
+
+    ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+    BOOST_CHECK_EQUAL(qry.GetBufferSize(), 2 * QueryType::DEFAULT_BUFFER_SIZE);
+
+    CheckEvents(cache, listener);
+}
+
+// Checks the default time interval, that a new value is returned by the
+// getter once set, that starting the query does not change it, and that the
+// query still delivers events with a buffer size of 10 and a time interval of 500.
+BOOST_AUTO_TEST_CASE(TestGetSetTimeInterval)
+{
+    typedef ContinuousQueryClient<int32_t, TestEntry> QueryType;
+    Listener<int32_t, TestEntry> listener;
+
+    ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+
+    qry.SetBufferSize(10);
+
+    BOOST_CHECK_EQUAL(qry.GetTimeInterval(), static_cast<int>(QueryType::DEFAULT_TIME_INTERVAL));
+
+    qry.SetTimeInterval(500);
+
+    BOOST_CHECK_EQUAL(qry.GetTimeInterval(), 500);
+
+    ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+    BOOST_CHECK_EQUAL(qry.GetTimeInterval(), 500);
+
+    CheckEvents(cache, listener);
+}
+
+// Checks that the default time interval and buffer size constants of
+// ContinuousQueryClient are equal to the ones of ContinuousQueryImpl.
+BOOST_AUTO_TEST_CASE(TestPublicPrivateConstantsConsistence)
+{
+    typedef ContinuousQueryClient<int32_t, TestEntry> QueryType;
+    typedef impl::cache::query::continuous::ContinuousQueryImpl<int, TestEntry> QueryImplType;
+    
+    BOOST_CHECK_EQUAL(static_cast<int>(QueryImplType::DEFAULT_TIME_INTERVAL),
+        static_cast<int>(QueryType::DEFAULT_TIME_INTERVAL));
+
+    BOOST_CHECK_EQUAL(static_cast<int>(QueryImplType::DEFAULT_BUFFER_SIZE),
+        static_cast<int>(QueryType::DEFAULT_BUFFER_SIZE));
+}
+
+// Checks that the listener gets the disconnect notification within the
+// default timeout when the server node is stopped while the listener is
+// slow: it sleeps for 200 ms on every event and 20 entries are put.
+BOOST_AUTO_TEST_CASE(TestLongEventsProcessingDisconnect)
+{
+    boost::shared_ptr< Listener<int32_t, TestEntry> > listener(new Listener<int32_t, TestEntry>(200));
+
+    ContinuousQueryClient<int32_t, TestEntry> qry(MakeReferenceFromSmartPointer(listener));
+
+    ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+    for (int32_t i = 0; i < 20; ++i)
+        cache.Put(i, TestEntry(i * 10));
+
+    Ignition::Stop(node.GetName(), true);
+
+    listener->CheckDisconnected();
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt
index 29dadca..20d52ec 100644
--- a/modules/platforms/cpp/thin-client/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client/CMakeLists.txt
@@ -30,6 +30,7 @@ set(SOURCES src/impl/data_channel.cpp
         src/impl/affinity/affinity_topology_version.cpp
         src/impl/affinity/affinity_assignment.cpp
         src/impl/affinity/affinity_manager.cpp
+        src/impl/cache/query/continuous/continuous_query_notification_handler.cpp
         src/impl/remote_type_updater.cpp
         src/impl/message.cpp
         src/impl/cache/cache_client_proxy.cpp
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/cache_client_proxy.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/cache_client_proxy.h
index da5f42b..01ee395 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/cache_client_proxy.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/cache_client_proxy.h
@@ -23,6 +23,8 @@
 #include <ignite/thin/cache/query/query_fields_cursor.h>
 #include <ignite/thin/cache/query/query_sql_fields.h>
 
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
+
 namespace ignite
 {
     namespace impl
@@ -298,6 +300,15 @@ namespace ignite
                             const ignite::thin::cache::query::SqlFieldsQuery& qry);
 
                     /**
+                     * Start the continuous query execution.
+                     *
+                     * @param continuousQuery Continuous query.
+                     * @return Query handle. Query execution is stopped once all instances of the handle are destroyed.
+                     */
+                    ignite::thin::cache::query::continuous::ContinuousQueryHandleClient QueryContinuous(
+                            const query::continuous::SP_ContinuousQueryClientHolderBase& continuousQuery);
+
+                    /**
                      * Get from CacheClient.
                      * Use for testing purposes only.
                      */
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/continuous/continuous_query_client_holder.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/continuous/continuous_query_client_holder.h
new file mode 100644
index 0000000..13dee90
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/continuous/continuous_query_client_holder.h
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
+#define _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
+
+#include <vector>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/ignite_error.h>
+
+#include <ignite/thin/cache/query/continuous/continuous_query_client.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace thin
+        {
+            namespace cache
+            {
+                namespace query
+                {
+                    namespace continuous
+                    {
+                        /**
+                         * Base interface of the continuous query client holder.
+                         *
+                         * Does not depend on the key and value types of the query, so holders of
+                         * queries of different types can be used through the same pointer type.
+                         */
+                        class ContinuousQueryClientHolderBase
+                        {
+                        public:
+                            /**
+                             * Destructor.
+                             */
+                            virtual ~ContinuousQueryClientHolderBase()
+                            {
+                                // No-op.
+                            }
+
+                            /**
+                             * Read and process cache events.
+                             *
+                             * @param reader Reader to use.
+                             */
+                            virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader) = 0;
+
+                            /**
+                             * Get buffer size.
+                             *
+                             * @return Buffer size.
+                             */
+                            virtual int32_t GetBufferSize() const = 0;
+
+                            /**
+                             * Get time interval.
+                             *
+                             * @return Time interval.
+                             */
+                            virtual int64_t GetTimeInterval() const = 0;
+
+                            /**
+                             * Gets a value indicating whether to notify about Expired events.
+                             *
+                             * @return Flag value.
+                             */
+                            virtual bool GetIncludeExpired() const = 0;
+
+                            /**
+                             * Disconnected callback.
+                             *
+                             * Called if channel was disconnected.
+                             */
+                            virtual void OnDisconnected() = 0;
+                        };
+
+                        /** Shared pointer to ContinuousQueryClientHolderBase. */
+                        typedef common::concurrent::SharedPointer<ContinuousQueryClientHolderBase> SP_ContinuousQueryClientHolderBase;
+
+                        /**
+                         * Continuous query client holder.
+                         *
+                         * Keeps a copy of the continuous query and forwards the events read from
+                         * a notification, as well as the disconnect notification, to the query listener.
+                         */
+                        template<typename K, typename V>
+                        class ContinuousQueryClientHolder : public ContinuousQueryClientHolderBase
+                        {
+                        public:
+                            /** Wrapped continuous query type. */
+                            typedef ignite::thin::cache::query::continuous::ContinuousQueryClient<K, V> ContinuousQuery;
+
+                            /**
+                             * Constructor.
+                             *
+                             * @param continuousQuery Continuous query to wrap. A copy of it is stored in the holder.
+                             */
+                            ContinuousQueryClientHolder(const ContinuousQuery& continuousQuery) :
+                                continuousQuery(continuousQuery)
+                            {
+                                // No-op.
+                            }
+
+                            /**
+                             * Read and process cache events.
+                             *
+                             * Reads the number of events followed by the events themselves and passes
+                             * all of them to the listener in a single OnEvent() call.
+                             *
+                             * @param reader Reader to use.
+                             * @throw IgniteError if the number of events read is negative.
+                             */
+                            virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader)
+                            {
+                                // Number of events.
+                                int32_t cnt = reader.ReadInt32();
+
+                                // The counter is read from the incoming message. A negative value would
+                                // be turned into a huge unsigned size by the conversions below.
+                                if (cnt < 0)
+                                {
+                                    throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                                        "Negative number of events in continuous query notification");
+                                }
+
+                                // Storing events here.
+                                std::vector< ignite::thin::cache::CacheEntryEvent<K, V> > events;
+                                events.resize(cnt);
+
+                                for (int32_t i = 0; i < cnt; ++i)
+                                    events[i].Read(reader);
+
+                                continuousQuery.GetListener().OnEvent(events.data(), static_cast<uint32_t>(cnt));
+                            }
+
+                            /**
+                             * Get buffer size.
+                             *
+                             * @return Buffer size.
+                             */
+                            virtual int32_t GetBufferSize() const
+                            {
+                                return continuousQuery.GetBufferSize();
+                            }
+
+                            /**
+                             * Get time interval.
+                             *
+                             * @return Time interval.
+                             */
+                            virtual int64_t GetTimeInterval() const
+                            {
+                                return continuousQuery.GetTimeInterval();
+                            }
+
+                            /**
+                             * Gets a value indicating whether to notify about Expired events.
+                             *
+                             * @return Flag value.
+                             */
+                            virtual bool GetIncludeExpired() const
+                            {
+                                return continuousQuery.GetIncludeExpired();
+                            }
+
+                            /**
+                             * Disconnected callback.
+                             *
+                             * Called if channel was disconnected. Forwards the notification to the query listener.
+                             */
+                            virtual void OnDisconnected()
+                            {
+                                continuousQuery.GetListener().OnDisconnected();
+                            }
+
+                        private:
+                            /** Continuous query. */
+                            ContinuousQuery continuousQuery;
+                        };
+                    }
+                }
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_cursor_impl.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_cursor_impl.h
deleted file mode 100644
index 13af4cb..0000000
--- a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_cursor_impl.h
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _IGNITE_IMPL_THIN_CACHE_QUERY_QUERY_FIELDS_CURSOR_IMPL
-#define _IGNITE_IMPL_THIN_CACHE_QUERY_QUERY_FIELDS_CURSOR_IMPL
-
-#include <ignite/ignite_error.h>
-
-#include "ignite/impl/ignite_environment.h"
-#include "ignite/impl/operations.h"
-#include "ignite/impl/cache/query/query_batch.h"
-
-namespace ignite
-{
-    namespace impl
-    {
-        namespace thin
-        {
-            namespace cache
-            {
-                namespace query
-                {
-                    class QueryFieldsRowImpl;
-
-                    /**
-                     * Query cursor implementation.
-                     */
-                    class IGNITE_IMPORT_EXPORT QueryCursorImpl
-                    {
-                    public:
-                        /**
-                         * Constructor.
-                         *
-                         * @param env Environment.
-                         * @param javaRef Java reference.
-                         */
-                        QueryCursorImpl(ignite::common::concurrent::SharedPointer<IgniteEnvironment> env, jobject javaRef);
-
-                        /**
-                         * Destructor.
-                         */
-                        ~QueryCursorImpl();
-
-                        /**
-                         * Check whether next result exists.
-                         *
-                         * @param err Error.
-                         * @return True if exists.
-                         */
-                        bool HasNext(IgniteError& err);
-
-                        /**
-                         * Get next object.
-                         *
-                         * @param op Operation.
-                         * @param err Error.
-                         */
-                        void GetNext(OutputOperation& op, IgniteError& err);
-
-                        /**
-                         * Get next row.
-                         *
-                         * @param err Error.
-                         * @return Output row.
-                         */
-                        QueryFieldsRowImpl* GetNextRow(IgniteError& err);
-
-                        /**
-                         * Get all cursor entries.
-                         *
-                         * @param op Operation.
-                         * @param err Error.
-                         */
-                        void GetAll(OutputOperation& op, IgniteError& err);
-
-                        /**
-                         * Get all cursor entries.
-                         *
-                         * @param op Operation.
-                         */
-                        void GetAll(OutputOperation& op);
-
-                    private:
-                        /** Environment. */
-                        ignite::common::concurrent::SharedPointer<impl::IgniteEnvironment> env;
-
-                        /** Handle to Java object. */
-                        jobject javaRef;
-
-                        /** Current result batch. */
-                        QueryBatch* batch;
-
-                        /** Whether cursor has no more elements available. */
-                        bool endReached;
-
-                        /** Whether iteration methods were called. */
-                        bool iterCalled;
-
-                        /** Whether GetAll() method was called. */
-                        bool getAllCalled;
-
-                        IGNITE_NO_COPY_ASSIGNMENT(QueryCursorImpl);
-
-                        /**
-                         * Create Java-side iterator if needed.
-                         *
-                         * @param err Error.
-                         * @return True in case of success, false if an error is thrown.
-                         */
-                        bool CreateIteratorIfNeeded(IgniteError& err);
-
-                       /**
-                         * Get next result batch if update is needed.
-                         *
-                         * @param err Error.
-                         * @return True if operation has been successful.
-                         */
-                        bool GetNextBatchIfNeeded(IgniteError& err);
-
-                        /**
-                         * Check whether Java-side iterator has next element.
-                         *
-                         * @param err Error.
-                         * @return True if the next element is available.
-                         */
-                        bool IteratorHasNext(IgniteError& err);
-                    };
-                }
-            }
-        }
-    }
-}
-
-#endif //_IGNITE_IMPL_THIN_CACHE_QUERY_QUERY_FIELDS_CURSOR_IMPL
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_row_impl.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_row_impl.h
deleted file mode 100644
index 3c08901..0000000
--- a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_row_impl.h
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef _IGNITE_IMPL_CACHE_QUERY_CACHE_QUERY_FIELDS_ROW_IMPL
-#define _IGNITE_IMPL_CACHE_QUERY_CACHE_QUERY_FIELDS_ROW_IMPL
-
-#include <ignite/common/concurrent.h>
-#include <ignite/ignite_error.h>
-
-namespace ignite
-{
-    namespace impl
-    {
-        namespace cache
-        {
-            namespace query
-            {
-                /**
-                 * Query fields cursor implementation.
-                 */
-                class IGNITE_IMPORT_EXPORT QueryFieldsRowImpl
-                {
-                public:
-                    typedef common::concurrent::SharedPointer<interop::InteropMemory> SP_InteropMemory;
-
-                    /**
-                     * Constructor.
-                     *
-                     * @param mem Memory containig row data.
-                     */
-                    QueryFieldsRowImpl(SP_InteropMemory mem, int32_t rowBegin, int32_t columnNum) :
-                        mem(mem),
-                        stream(mem.Get()),
-                        reader(&stream),
-                        columnNum(columnNum),
-                        processed(0)
-                    {
-                        stream.Position(rowBegin);
-                    }
-
-                    /**
-                     * Check whether next entry exists.
-                     *
-                     * @return True if next entry exists.
-                     */
-                    bool HasNext()
-                    {
-                        IgniteError err;
-
-                        bool res = HasNext(err);
-
-                        IgniteError::ThrowIfNeeded(err);
-
-                        return res;
-                    }
-
-                    /**
-                     * Check whether next entry exists.
-                     *
-                     * @param err Error.
-                     * @return True if next entry exists.
-                     */
-                    bool HasNext(IgniteError& err)
-                    {
-                        if (IsValid())
-                            return processed < columnNum;
-                        else
-                        {
-                            err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
-                                "Instance is not usable (did you check for error?).");
-
-                            return false;
-                        }
-                    }
-
-                    /**
-                     * Get next entry.
-                     *
-                     * @return Next entry.
-                     */
-                    template<typename T>
-                    T GetNext()
-                    {
-                        IgniteError err;
-
-                        QueryFieldsRowImpl res = GetNext<T>(err);
-
-                        IgniteError::ThrowIfNeeded(err);
-
-                        return res;
-                    }
-
-                    /**
-                     * Get next entry.
-                     *
-                     * @param err Error.
-                     * @return Next entry.
-                     */
-                    template<typename T>
-                    T GetNext(IgniteError& err)
-                    {
-                        if (IsValid()) {
-                            ++processed;
-                            return reader.ReadTopObject<T>();
-                        }
-                        else
-                        {
-                            err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
-                                "Instance is not usable (did you check for error?).");
-
-                            return T();
-                        }
-                    }
-
-                    /**
-                     * Get next entry assuming it's an array of 8-byte signed
-                     * integers. Maps to "byte[]" type in Java.
-                     *
-                     * @param dst Array to store data to.
-                     * @param len Expected length of array.
-                     * @return Actual amount of elements read. If "len" argument is less than actual
-                     *     array size or resulting array is set to null, nothing will be written
-                     *     to resulting array and returned value will contain required array length.
-                     *     -1 will be returned in case array in stream was null.
-                     */
-                    int32_t GetNextInt8Array(int8_t* dst, int32_t len)
-                    {
-                        if (IsValid()) {
-
-                            int32_t actualLen = reader.ReadInt8Array(dst, len);
-
-                            if (actualLen == 0 || (dst && len >= actualLen))
-                                ++processed;
-
-                            return actualLen;
-                        }
-                        else
-                        {
-                            throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
-                                "Instance is not usable (did you check for error?).");
-                        }
-                    }
-
-                    /**
-                     * Check if the instance is valid.
-                     *
-                     * Invalid instance can be returned if some of the previous
-                     * operations have resulted in a failure. For example invalid
-                     * instance can be returned by not-throwing version of method
-                     * in case of error. Invalid instances also often can be
-                     * created using default constructor.
-                     *
-                     * @return True if the instance is valid and can be used.
-                     */
-                    bool IsValid()
-                    {
-                        return mem.Get() != 0;
-                    }
-
-                private:
-                    /** Row memory. */
-                    SP_InteropMemory mem;
-
-                    /** Row data stream. */
-                    interop::InteropInputStream stream;
-
-                    /** Row data reader. */
-                    binary::BinaryReaderImpl reader;
-
-                    /** Number of elements in a row. */
-                    int32_t columnNum;
-
-                    /** Number of elements that have been read by now. */
-                    int32_t processed;
-
-                    IGNITE_NO_COPY_ASSIGNMENT(QueryFieldsRowImpl);
-                };
-            }
-        }
-    }
-}
-
-#endif //_IGNITE_IMPL_CACHE_QUERY_CACHE_QUERY_FIELDS_ROW_IMPL
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_client.h
index e53ad98..544010e 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_client.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_client.h
@@ -27,12 +27,15 @@
 
 #include <ignite/thin/cache/query/query_fields_cursor.h>
 #include <ignite/thin/cache/query/query_sql_fields.h>
+#include <ignite/thin/cache/query/continuous/continuous_query_client.h>
+#include <ignite/thin/cache/query/continuous/continuous_query_handle.h>
 
 #include <ignite/impl/thin/writable.h>
 #include <ignite/impl/thin/writable_key.h>
 
 #include <ignite/impl/thin/readable.h>
 #include <ignite/impl/thin/cache/cache_client_proxy.h>
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
 
 namespace ignite
 {
@@ -72,7 +75,7 @@ namespace ignite
                  *
                  * @param impl Implementation.
                  */
-                CacheClient(common::concurrent::SharedPointer<void> impl) :
+                explicit CacheClient(const common::concurrent::SharedPointer<void>& impl) :
                     proxy(impl)
                 {
                     // No-op.
@@ -597,6 +600,22 @@ namespace ignite
                 }
 
                 /**
+                 * Starts the continuous query execution.
+                 *
+                 * @param continuousQuery Continuous query.
+                 * @return Query handle. Once all instances are destroyed, the query execution is stopped.
+                 */
+                query::continuous::ContinuousQueryHandleClient QueryContinuous(
+                    query::continuous::ContinuousQueryClient<K, V> continuousQuery)
+                {
+                    using namespace impl::thin::cache::query::continuous;
+
+                    SP_ContinuousQueryClientHolderBase holder(new ContinuousQueryClientHolder<K, V>(continuousQuery));
+
+                    return proxy.QueryContinuous(holder);
+                }
+
+                /**
                  * Refresh affinity mapping.
                  *
                  * @deprecated Does nothing since Apache Ignite 2.8. Affinity mapping is refreshed automatically now.
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_entry.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_entry.h
new file mode 100644
index 0000000..ef52814
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_entry.h
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::CacheEntry class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_CACHE_ENTRY
+#define _IGNITE_THIN_CACHE_CACHE_ENTRY
+
+#include <utility>
+#include <ignite/common/common.h>
+
+namespace ignite
+{
+    namespace thin
+    {
+        namespace cache
+        {
+            /**
+             * %Cache entry class template.
+             *
+             * Both key and value types should be default-constructible,
+             * copy-constructible and assignable.
+             */
+            template<typename K, typename V>
+            class CacheEntry
+            {
+            public:
+                /**
+                 * Default constructor.
+                 *
+                 * Creates instance with both key and value default-constructed.
+                 */
+                CacheEntry() :
+                    key(),
+                    val(),
+                    hasValue(false)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Constructor.
+                 *
+                 * @param key Key.
+                 * @param val Value.
+                 */
+                CacheEntry(const K& key, const V& val) :
+                    key(key),
+                    val(val),
+                    hasValue(true)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Copy constructor.
+                 *
+                 * @param other Other instance.
+                 */
+                CacheEntry(const CacheEntry& other) :
+                    key(other.key),
+                    val(other.val),
+                    hasValue(other.hasValue)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Constructor.
+                 *
+                 * @param p Pair.
+                 */
+                CacheEntry(const std::pair<K, V>& p) :
+                    key(p.first),
+                    val(p.second),
+                    hasValue(true)
+                {
+                    // No-op.
+                }
+
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~CacheEntry()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Assignment operator.
+                 *
+                 * @param other Other instance.
+                 */
+                CacheEntry& operator=(const CacheEntry& other)
+                {
+                    if (this != &other)
+                    {
+                        key = other.key;
+                        val = other.val;
+                        hasValue = other.hasValue;
+                    }
+
+                    return *this;
+                }
+
+                /**
+                 * Get key.
+                 *
+                 * @return Key.
+                 */
+                const K& GetKey() const
+                {
+                    return key;
+                }
+
+                /**
+                 * Get value.
+                 *
+                 * @return Value.
+                 */
+                const V& GetValue() const
+                {
+                    return val;
+                }
+
+                /**
+                 * Check if the value exists.
+                 *
+                 * @return True, if the value exists.
+                 */
+                bool HasValue() const
+                {
+                    return hasValue;
+                }
+
+            protected:
+                /** Key. */
+                K key;
+
+                /** Value. */
+                V val;
+
+                /** Indicates whether value exists */
+                bool hasValue;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_THIN_CACHE_CACHE_ENTRY
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h
new file mode 100644
index 0000000..d906b4b
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::CacheEntryEvent class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT
+#define _IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT
+
+#include <ignite/binary/binary_raw_reader.h>
+#include <ignite/thin/cache/cache_entry.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace thin
+        {
+            namespace cache
+            {
+                namespace query
+                {
+                    namespace continuous
+                    {
+                        template<typename K, typename V>
+                        class ContinuousQueryClientHolder;
+                    }
+                }
+            }
+        }
+    }
+    namespace thin
+    {
+        namespace cache
+        {
+            /**
+             * Cache event type.
+             */
+            struct CacheEntryEventType
+            {
+                enum Type
+                {
+                    /** An event type indicating that the cache entry was created. */
+                    CREATED = 0,
+
+                    /** An event type indicating that the cache entry was updated, i.e. a previous mapping existed. */
+                    UPDATED = 1,
+
+                    /** An event type indicating that the cache entry was removed. */
+                    REMOVED = 2,
+
+                    /** An event type indicating that the cache entry was removed by expiration policy. */
+                    EXPIRED = 3
+                };
+            };
+
+            /**
+             * Cache entry event class template.
+             *
+             * Both key and value types should be default-constructible,
+             * copy-constructible and assignable.
+             */
+            template<typename K, typename V>
+            class CacheEntryEvent : public CacheEntry<K, V>
+            {
+                friend class ignite::impl::thin::cache::query::continuous::ContinuousQueryClientHolder<K, V>;
+
+            public:
+                /**
+                 * Default constructor.
+                 *
+                 * Creates instance with all fields default-constructed.
+                 */
+                CacheEntryEvent() :
+                    CacheEntry<K, V>(),
+                    oldVal(),
+                    hasOldValue(false),
+                    eventType(CacheEntryEventType::CREATED)
+                {
+                    // No-op.
+                }
+    
+                /**
+                 * Copy constructor.
+                 *
+                 * @param other Other instance.
+                 */
+                CacheEntryEvent(const CacheEntryEvent<K, V>& other) :
+                    CacheEntry<K, V>(other),
+                    oldVal(other.oldVal),
+                    hasOldValue(other.hasOldValue),
+                    eventType(other.eventType)
+                {
+                    // No-op.
+                }
+    
+                /**
+                 * Destructor.
+                 */
+                virtual ~CacheEntryEvent()
+                {
+                    // No-op.
+                }
+    
+                /**
+                 * Assignment operator.
+                 *
+                 * @param other Other instance.
+                 * @return *this.
+                 */
+                CacheEntryEvent& operator=(const CacheEntryEvent<K, V>& other)
+                {
+                    if (this != &other)
+                    {
+                        CacheEntry<K, V>::operator=(other);
+    
+                        oldVal = other.oldVal;
+                        hasOldValue = other.hasOldValue;
+                        eventType = other.eventType;
+                    }
+    
+                    return *this;
+                }
+
+                /**
+                 * Get event type.
+                 *
+                 * @see CacheEntryEventType for details on events you can receive.
+                 *
+                 * @return Event type.
+                 */
+                CacheEntryEventType::Type GetEventType() const
+                {
+                    return eventType;
+                };
+    
+                /**
+                 * Get old value.
+                 *
+                 * @return Old value.
+                 */
+                const V& GetOldValue() const
+                {
+                    return oldVal;
+                }
+    
+                /**
+                 * Check if the old value exists.
+                 *
+                 * @return True, if the old value exists.
+                 */
+                bool HasOldValue() const
+                {
+                    return hasOldValue;
+                }
+    
+            private:
+                /**
+                 * Reads key, old value, value and event type using provided raw reader.
+                 *
+                 * @param reader Reader to use. IgniteError is thrown if it yields an unsupported event type.
+                 */
+                void Read(binary::BinaryRawReader& reader)
+                {
+                    this->key = reader.ReadObject<K>();
+                    this->hasOldValue = reader.TryReadObject(this->oldVal);
+                    this->hasValue = reader.TryReadObject(this->val);
+
+                    int8_t intType = reader.ReadInt8();
+                    if (intType < 0 || intType > 3)
+                    {
+                        // Widen first: int8_t is a character type, so a stream-based cast may emit a raw char.
+                        std::string errMsg = "Event type is not supported: " +
+                            common::LexicalCast<std::string>(static_cast<int32_t>(intType));
+                        throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, errMsg.c_str());
+                    }
+                    eventType = static_cast<CacheEntryEventType::Type>(intType);
+                }
+
+                /** Old value. */
+                V oldVal;
+    
+                /** Indicates whether old value exists */
+                bool hasOldValue;
+
+                /** Event type. */
+                CacheEntryEventType::Type eventType;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event_listener.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event_listener.h
new file mode 100644
index 0000000..e2829d9
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event_listener.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::event::CacheEntryEventListener class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
+#define _IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
+
+#include <stdint.h>
+
+#include <ignite/thin/cache/event/cache_entry_event.h>
+
+namespace ignite
+{
+    namespace thin
+    {
+        namespace cache
+        {
+            namespace event
+            {
+                /**
+                 * Cache entry event listener.
+                 */
+                template<typename K, typename V>
+                class CacheEntryEventListener
+                {
+                public:
+                    /**
+                     * Default constructor.
+                     */
+                    CacheEntryEventListener()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Destructor.
+                     */
+                    virtual ~CacheEntryEventListener()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Event callback.
+                     *
+                     * @param evts Events.
+                     * @param num Events number.
+                     */
+                    virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num) = 0;
+
+                    /**
+                     * Disconnected callback.
+                     *
+                     * Called if channel was disconnected. This also means that continuous query was closed and no more
+                     * events will be provided for this listener.
+                     */
+                    virtual void OnDisconnected() = 0;
+                };
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_client.h
new file mode 100644
index 0000000..10e66ca
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_client.h
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::query::continuous::ContinuousQueryClient class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
+#define _IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
+
+#include <ignite/reference.h>
+
+#include <ignite/thin/cache/event/cache_entry_event_listener.h>
+
+namespace ignite
+{
+    namespace thin
+    {
+        namespace cache
+        {
+            namespace query
+            {
+                namespace continuous
+                {
+                    /**
+                     * Continuous query client.
+                     *
+                     * Continuous query client allows registering a listener for cache update events. On any update to
+                     * the related cache an event is sent to the client that has executed the query and listener is
+                     * notified on that client.
+                     *
+                     * Continuous query can either be executed on the whole topology or only on local node.
+                     *
+                     * To execute the query over the cache use method
+                     * ignite::thin::cache::CacheClient::QueryContinuous().
+                     */
+                    template<typename K, typename V>
+                    class ContinuousQueryClient
+                    {
+                    public:
+                        /**
+                         * Default value for the buffer size.
+                         */
+                        enum { DEFAULT_BUFFER_SIZE = 1 };
+
+                        /**
+                         * Default value for the time interval.
+                         */
+                        enum { DEFAULT_TIME_INTERVAL = 0 };
+
+                        /**
+                         * Destructor.
+                         */
+                        ~ContinuousQueryClient()
+                        {
+                            // No-op.
+                        }
+
+                        /**
+                         * Constructor.
+                         *
+                         * @param lsnr Event listener. Invoked on the node where continuous query execution has been
+                         * started.
+                         */
+                        explicit ContinuousQueryClient(Reference<event::CacheEntryEventListener<K, V> > lsnr) :
+                            bufferSize(DEFAULT_BUFFER_SIZE),
+                            timeInterval(DEFAULT_TIME_INTERVAL),
+                            includeExpired(false),
+                            listener(lsnr)
+                        {
+                            // No-op.
+                        }
+
+                        /**
+                         * Set buffer size.
+                         *
+                         * When a cache update happens, entry is first put into a buffer. Entries from buffer will be
+                         * sent to the master node only if the buffer is full or time provided via SetTimeInterval is
+                         * exceeded.
+                         *
+                         * @param val Buffer size.
+                         */
+                        void SetBufferSize(int32_t val)
+                        {
+                            bufferSize = val;
+                        }
+
+                        /**
+                         * Get buffer size.
+                         *
+                         * When a cache update happens, entry is first put into a buffer. Entries from buffer will be
+                         * sent to the master node only if the buffer is full or time provided via SetTimeInterval is
+                         * exceeded.
+                         *
+                         * @return Buffer size.
+                         */
+                        int32_t GetBufferSize() const
+                        {
+                            return bufferSize;
+                        }
+
+                        /**
+                         * Set time interval.
+                         *
+                         * When a cache update happens, entry is first put into a buffer. Entries from buffer are sent
+                         * to the master node only if the buffer is full (its size can be changed via SetBufferSize) or
+                         * time provided via this method is exceeded.
+                         *
+                         * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which means that time check is disabled and
+                         * entries will be sent only when buffer is full.
+                         *
+                         * @param val Time interval in milliseconds.
+                         */
+                        void SetTimeInterval(int64_t val)
+                        {
+                            timeInterval = val;
+                        }
+
+                        /**
+                         * Get time interval.
+                         *
+                         * When a cache update happens, entry is first put into a buffer. Entries from buffer are sent
+                         * to the master node only if the buffer is full (its size can be changed via SetBufferSize) or
+                         * time provided via this method is exceeded.
+                         *
+                         * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which means that time check is disabled and
+                         * entries will be sent only when buffer is full.
+                         *
+                         * @return Time interval in milliseconds.
+                         */
+                        int64_t GetTimeInterval() const
+                        {
+                            return timeInterval;
+                        }
+
+                        /**
+                         * Sets a value indicating whether to notify about Expired events.
+                         *
+                         * If @c true, then the listener will get notifications about expired cache entries. Otherwise,
+                         * only Created, Updated, and Removed events will be passed to the listener.
+                         *
+                         * Defaults to @c false.
+                         *
+                         * @param val Flag value.
+                         */
+                        void SetIncludeExpired(bool val)
+                        {
+                            includeExpired = val;
+                        }
+
+                        /**
+                         * Gets a value indicating whether to notify about Expired events.
+                         *
+                         * If @c true, then the listener will get notifications about expired cache entries. Otherwise,
+                         * only Created, Updated, and Removed events will be passed to the listener.
+                         *
+                         * Defaults to @c false.
+                         *
+                         * @return Flag value.
+                         */
+                        bool GetIncludeExpired() const
+                        {
+                            return includeExpired;
+                        }
+
+                        /**
+                         * Set cache entry event listener.
+                         *
+                         * @param lsnr Cache entry event listener. Invoked on the
+                         *     node where continuous query execution has been
+                         *     started.
+                         */
+                        void SetListener(Reference<event::CacheEntryEventListener<K, V> > lsnr)
+                        {
+                            listener = lsnr;
+                        }
+
+                        /**
+                         * Get cache entry event listener.
+                         *
+                         * @return Cache entry event listener.
+                         */
+                        const event::CacheEntryEventListener<K, V>& GetListener() const
+                        {
+                            return *listener.Get();
+                        }
+
+                        /**
+                         * Get cache entry event listener.
+                         *
+                         * @return Cache entry event listener.
+                         */
+                        event::CacheEntryEventListener<K, V>& GetListener()
+                        {
+                            return *listener.Get();
+                        }
+
+                    private:
+                        /** Buffer size. */
+                        int32_t bufferSize;
+
+                        /** Time interval. */
+                        int64_t timeInterval;
+
+                        /** Include expired. */
+                        bool includeExpired;
+
+                        /** Listener. */
+                        Reference<event::CacheEntryEventListener<K, V> > listener;
+                    };
+                }
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_handle.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_handle.h
new file mode 100644
index 0000000..348132e
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_handle.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::query::continuous::ContinuousQueryHandleClient class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
+#define _IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
+
+#include <ignite/common/concurrent.h>
+
+namespace ignite
+{
+    namespace thin
+    {
+        namespace cache
+        {
+            namespace query
+            {
+                namespace continuous
+                {
+                    /**
+                     * Continuous query handle client.
+                     *
+                     * Once destructed, continuous query is stopped and event listener stops getting events.
+                     */
+                    class ContinuousQueryHandleClient
+                    {
+                    public:
+                        /**
+                         * Default constructor.
+                         */
+                        ContinuousQueryHandleClient() :
+                            impl()
+                        {
+                            // No-op.
+                        }
+
+                        /**
+                         * Constructor.
+                         *
+                         * Internal method. Should not be used by user.
+                         *
+                         * @param impl Implementation.
+                         */
+                        ContinuousQueryHandleClient(const common::concurrent::SharedPointer<void>& impl) :
+                            impl(impl)
+                        {
+                            // No-op.
+                        }
+
+                    private:
+                        /** Implementation delegate. */
+                        common::concurrent::SharedPointer<void> impl;
+                    };
+                }
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
index 74219e9..100645a 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
@@ -20,6 +20,7 @@
 #include "impl/response_status.h"
 #include "impl/message.h"
 #include "impl/cache/cache_client_impl.h"
+#include "impl/cache/query/continuous/continuous_query_notification_handler.h"
 #include "impl/transactions/transactions_impl.h"
 
 using namespace ignite::impl::thin::transactions;
@@ -51,9 +52,7 @@ namespace ignite
 
                 CacheClientImpl::~CacheClientImpl()
                 {
-                    DataRouter* router0 = router.Get();
-                    if (router0)
-                        router0->Close();
+                    // No-op.
                 }
 
                 template<typename ReqT, typename RspT>
@@ -161,7 +160,7 @@ namespace ignite
 
                 void CacheClientImpl::Put(const WritableKey& key, const Writable& value)
                 {
-                    Cache2ValueRequest<RequestType::CACHE_PUT> req(id, binary, key, value);
+                    Cache2ValueRequest<MessageType::CACHE_PUT> req(id, binary, key, value);
                     Response rsp;
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -169,7 +168,7 @@ namespace ignite
 
                 void CacheClientImpl::Get(const WritableKey& key, Readable& value)
                 {
-                    CacheValueRequest<RequestType::CACHE_GET> req(id, binary, key);
+                    CacheValueRequest<MessageType::CACHE_GET> req(id, binary, key);
                     CacheValueResponse rsp(value);
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -177,7 +176,7 @@ namespace ignite
 
                 void CacheClientImpl::PutAll(const Writable & pairs)
                 {
-                    CacheValueRequest<RequestType::CACHE_PUT_ALL> req(id, binary, pairs);
+                    CacheValueRequest<MessageType::CACHE_PUT_ALL> req(id, binary, pairs);
                     Response rsp;
 
                     TransactionalSyncMessage(req, rsp);
@@ -185,7 +184,7 @@ namespace ignite
 
                 void CacheClientImpl::GetAll(const Writable& keys, Readable& pairs)
                 {
-                    CacheValueRequest<RequestType::CACHE_GET_ALL> req(id, binary, keys);
+                    CacheValueRequest<MessageType::CACHE_GET_ALL> req(id, binary, keys);
                     CacheValueResponse rsp(pairs);
 
                     TransactionalSyncMessage(req, rsp);
@@ -193,7 +192,7 @@ namespace ignite
 
                 bool CacheClientImpl::Replace(const WritableKey& key, const Writable& value)
                 {
-                    Cache2ValueRequest<RequestType::CACHE_REPLACE> req(id, binary, key, value);
+                    Cache2ValueRequest<MessageType::CACHE_REPLACE> req(id, binary, key, value);
                     BoolResponse rsp;
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -203,7 +202,7 @@ namespace ignite
 
                 bool CacheClientImpl::ContainsKey(const WritableKey& key)
                 {
-                    CacheValueRequest<RequestType::CACHE_CONTAINS_KEY> req(id, binary, key);
+                    CacheValueRequest<MessageType::CACHE_CONTAINS_KEY> req(id, binary, key);
                     BoolResponse rsp;
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -213,7 +212,7 @@ namespace ignite
 
                 bool CacheClientImpl::ContainsKeys(const Writable& keys)
                 {
-                    CacheValueRequest<RequestType::CACHE_CONTAINS_KEYS> req(id, binary, keys);
+                    CacheValueRequest<MessageType::CACHE_CONTAINS_KEYS> req(id, binary, keys);
                     BoolResponse rsp;
 
                     TransactionalSyncMessage(req, rsp);
@@ -233,7 +232,7 @@ namespace ignite
 
                 bool CacheClientImpl::Remove(const WritableKey& key)
                 {
-                    CacheValueRequest<RequestType::CACHE_REMOVE_KEY> req(id, binary, key);
+                    CacheValueRequest<MessageType::CACHE_REMOVE_KEY> req(id, binary, key);
                     BoolResponse rsp;
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -243,7 +242,7 @@ namespace ignite
 
                 bool CacheClientImpl::Remove(const WritableKey& key, const Writable& val)
                 {
-                    Cache2ValueRequest<RequestType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val);
+                    Cache2ValueRequest<MessageType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val);
                     BoolResponse rsp;
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -253,7 +252,7 @@ namespace ignite
 
                 void CacheClientImpl::RemoveAll(const Writable& keys)
                 {
-                    CacheValueRequest<RequestType::CACHE_REMOVE_KEYS> req(id, binary, keys);
+                    CacheValueRequest<MessageType::CACHE_REMOVE_KEYS> req(id, binary, keys);
                     Response rsp;
 
                     TransactionalSyncMessage(req, rsp);
@@ -261,7 +260,7 @@ namespace ignite
 
                 void CacheClientImpl::RemoveAll()
                 {
-                    CacheRequest<RequestType::CACHE_REMOVE_ALL> req(id, binary);
+                    CacheRequest<MessageType::CACHE_REMOVE_ALL> req(id, binary);
                     Response rsp;
 
                     TransactionalSyncMessage(req, rsp);
@@ -269,7 +268,7 @@ namespace ignite
 
                 void CacheClientImpl::Clear(const WritableKey& key)
                 {
-                    CacheValueRequest<RequestType::CACHE_CLEAR_KEY> req(id, binary, key);
+                    CacheValueRequest<MessageType::CACHE_CLEAR_KEY> req(id, binary, key);
                     Response rsp;
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -277,7 +276,7 @@ namespace ignite
 
                 void CacheClientImpl::Clear()
                 {
-                    CacheRequest<RequestType::CACHE_CLEAR> req(id, binary);
+                    CacheRequest<MessageType::CACHE_CLEAR> req(id, binary);
                     Response rsp;
 
                     TransactionalSyncMessage(req, rsp);
@@ -285,7 +284,7 @@ namespace ignite
 
                 void CacheClientImpl::ClearAll(const Writable& keys)
                 {
-                    CacheValueRequest<RequestType::CACHE_CLEAR_KEYS> req(id, binary, keys);
+                    CacheValueRequest<MessageType::CACHE_CLEAR_KEYS> req(id, binary, keys);
                     Response rsp;
 
                     TransactionalSyncMessage(req, rsp);
@@ -293,7 +292,7 @@ namespace ignite
 
                 void CacheClientImpl::LocalPeek(const WritableKey& key, Readable& value)
                 {
-                    CacheValueRequest<RequestType::CACHE_LOCAL_PEEK> req(id, binary, key);
+                    CacheValueRequest<MessageType::CACHE_LOCAL_PEEK> req(id, binary, key);
                     CacheValueResponse rsp(value);
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -301,7 +300,7 @@ namespace ignite
 
                 bool CacheClientImpl::Replace(const WritableKey& key, const Writable& oldVal, const Writable& newVal)
                 {
-                    Cache3ValueRequest<RequestType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal);
+                    Cache3ValueRequest<MessageType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal);
                     BoolResponse rsp;
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -311,7 +310,7 @@ namespace ignite
 
                 void CacheClientImpl::GetAndPut(const WritableKey& key, const Writable& valIn, Readable& valOut)
                 {
-                    Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT> req(id, binary, key, valIn);
+                    Cache2ValueRequest<MessageType::CACHE_GET_AND_PUT> req(id, binary, key, valIn);
                     CacheValueResponse rsp(valOut);
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -319,7 +318,7 @@ namespace ignite
 
                 void CacheClientImpl::GetAndRemove(const WritableKey& key, Readable& valOut)
                 {
-                    CacheValueRequest<RequestType::CACHE_GET_AND_REMOVE> req(id, binary, key);
+                    CacheValueRequest<MessageType::CACHE_GET_AND_REMOVE> req(id, binary, key);
                     CacheValueResponse rsp(valOut);
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -327,7 +326,7 @@ namespace ignite
 
                 void CacheClientImpl::GetAndReplace(const WritableKey& key, const Writable& valIn, Readable& valOut)
                 {
-                    Cache2ValueRequest<RequestType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn);
+                    Cache2ValueRequest<MessageType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn);
                     CacheValueResponse rsp(valOut);
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -335,7 +334,7 @@ namespace ignite
 
                 bool CacheClientImpl::PutIfAbsent(const WritableKey& key, const Writable& val)
                 {
-                    Cache2ValueRequest<RequestType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val);
+                    Cache2ValueRequest<MessageType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val);
                     BoolResponse rsp;
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -345,7 +344,7 @@ namespace ignite
 
                 void CacheClientImpl::GetAndPutIfAbsent(const WritableKey& key, const Writable& valIn, Readable& valOut)
                 {
-                    Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn);
+                    Cache2ValueRequest<MessageType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn);
                     CacheValueResponse rsp(valOut);
 
                     TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -369,6 +368,25 @@ namespace ignite
 
                     return cursorImpl;
                 }
+
+                query::continuous::SP_ContinuousQueryHandleClientImpl CacheClientImpl::QueryContinuous(
+                        const query::continuous::SP_ContinuousQueryClientHolderBase& continuousQuery)
+                {
+                    const query::continuous::ContinuousQueryClientHolderBase& cq = *continuousQuery.Get();
+
+                    ContinuousQueryRequest req(id, cq.GetBufferSize(), cq.GetTimeInterval(), cq.GetIncludeExpired());
+                    ContinuousQueryResponse rsp;
+
+                    SP_DataChannel channel = SyncMessage(req, rsp);
+
+                    query::continuous::SP_ContinuousQueryNotificationHandler handler(
+                            new query::continuous::ContinuousQueryNotificationHandler(*channel.Get(), continuousQuery));
+
+                    channel.Get()->RegisterNotificationHandler(rsp.GetQueryId(), handler);
+
+                    return query::continuous::SP_ContinuousQueryHandleClientImpl(
+                        new query::continuous::ContinuousQueryHandleClientImpl(channel, rsp.GetQueryId()));
+                }
             }
         }
     }
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
index 0df1ae3..4c82592 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
@@ -23,9 +23,12 @@
 
 #include <ignite/thin/cache/query/query_sql_fields.h>
 
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
+
 #include "impl/data_router.h"
 #include "impl/transactions/transactions_impl.h"
 #include "impl/cache/query/query_fields_cursor_impl.h"
+#include "impl/cache/query/continuous/continuous_query_handle_impl.h"
 
 namespace ignite
 {
@@ -298,6 +301,15 @@ namespace ignite
                      */
                     query::SP_QueryFieldsCursorImpl Query(const ignite::thin::cache::query::SqlFieldsQuery &qry);
 
+                    /**
+                     * Starts the continuous query execution.
+                     *
+                     * @param continuousQuery Continuous query.
+                     * @return Query handle. Once all instances are destroyed, query execution is stopped.
+                     */
+                    query::continuous::SP_ContinuousQueryHandleClientImpl QueryContinuous(
+                            const query::continuous::SP_ContinuousQueryClientHolderBase& continuousQuery);
+
                 private:
                     /**
                      * Synchronously send request message and receive response.
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp
index a5795ee..eab6ba0 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp
@@ -15,12 +15,18 @@
  * limitations under the License.
  */
 
+#include <ignite/thin/cache/query/continuous/continuous_query_handle.h>
+
 #include <ignite/impl/thin/cache/cache_client_proxy.h>
 #include <impl/cache/cache_client_impl.h>
 
 using namespace ignite::impl::thin;
 using namespace cache;
 
+using ignite::thin::cache::query::SqlFieldsQuery;
+using ignite::thin::cache::query::QueryFieldsCursor;
+using ignite::thin::cache::query::continuous::ContinuousQueryHandleClient;
+
 namespace
 {
     using namespace ignite::common::concurrent;
@@ -150,13 +156,18 @@ namespace ignite
                     GetCacheImpl(impl).GetAndPutIfAbsent(key, valIn, valOut);
                 }
 
-                ignite::thin::cache::query::QueryFieldsCursor CacheClientProxy::Query(
-                        const ignite::thin::cache::query::SqlFieldsQuery &qry)
+                QueryFieldsCursor CacheClientProxy::Query(const ignite::thin::cache::query::SqlFieldsQuery &qry)
                 {
                     query::SP_QueryFieldsCursorImpl cursorImpl = GetCacheImpl(impl).Query(qry);
 
                     return ignite::thin::cache::query::QueryFieldsCursor(cursorImpl);
                 }
+
+                ContinuousQueryHandleClient CacheClientProxy::QueryContinuous(
+                    const query::continuous::SP_ContinuousQueryClientHolderBase &continuousQuery)
+                {
+                    return ContinuousQueryHandleClient(GetCacheImpl(impl).QueryContinuous(continuousQuery));
+                }
             }
         }
     }
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_handle_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_handle_impl.h
new file mode 100644
index 0000000..2843d30
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_handle_impl.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
+#define _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
+
+#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
+
+#include "impl/data_channel.h"
+#include "impl/message.h"
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace thin
+        {
+            namespace cache
+            {
+                namespace query
+                {
+                    namespace continuous
+                    {
+                        /**
+                         * Continuous query handle client implementation.
+                         *
+                         * Once destructed, continuous query is stopped and event listener stops getting events.
+                         */
+                        class ContinuousQueryHandleClientImpl
+                        {
+                        public:
+                            /**
+                             * Constructor.
+                             *
+                             * @param channel Channel.
+                             * @param queryId Query ID.
+                             */
+                            ContinuousQueryHandleClientImpl(SP_DataChannel channel, int64_t queryId) :
+                                channel(channel),
+                                queryId(queryId)
+                            {
+                                // No-op.
+                            }
+
+                            /**
+                             * Destructor.
+                             */
+                            virtual ~ContinuousQueryHandleClientImpl()
+                            {
+                                assert(channel.IsValid());
+
+                                DataChannel& channel0 = *channel.Get();
+
+                                channel0.DeregisterNotificationHandler(queryId);
+                                channel0.CloseResource(queryId);
+                            }
+
+                        private:
+                            /** Data channel. */
+                            SP_DataChannel channel;
+
+                            /** Query ID. */
+                            int64_t queryId;
+                        };
+
+                        /** Shared pointer to ContinuousQueryHandleClientImpl. */
+                        typedef common::concurrent::SharedPointer<ContinuousQueryHandleClientImpl> SP_ContinuousQueryHandleClientImpl;
+                    }
+                }
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.cpp
new file mode 100644
index 0000000..96449dd
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "impl/data_channel.h"
+#include "impl/message.h"
+
+#include "impl/cache/query/continuous/continuous_query_notification_handler.h"
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace thin
+        {
+            namespace cache
+            {
+                namespace query
+                {
+                    namespace continuous
+                    {
+                        ContinuousQueryNotificationHandler::ContinuousQueryNotificationHandler(DataChannel& channel,
+                            const SP_ContinuousQueryClientHolderBase& continuousQuery) :
+                            continuousQuery(continuousQuery),
+                            channel(channel)
+                        {
+                            // No-op.
+                        }
+
+                        ContinuousQueryNotificationHandler::~ContinuousQueryNotificationHandler()
+                        {
+                            // No-op.
+                        }
+
+                        void ContinuousQueryNotificationHandler::OnNotification(const network::DataBuffer& msg)
+                        {
+                            ClientCacheEntryEventNotification notification(*continuousQuery.Get());
+                            channel.DeserializeMessage(msg, notification);
+                        }
+
+                        void ContinuousQueryNotificationHandler::OnDisconnected()
+                        {
+                            continuousQuery.Get()->OnDisconnected();
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.h b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.h
new file mode 100644
index 0000000..b1bc955
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.h
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_NOTIFICATION_HANDLER
+#define _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_NOTIFICATION_HANDLER
+
+#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
+
+#include <ignite/binary/binary_raw_reader.h>
+
+#include <ignite/impl/interop/interop_input_stream.h>
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
+
+#include "impl/notification_handler.h"
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace thin
+        {
+            namespace cache
+            {
+                namespace query
+                {
+                    namespace continuous
+                    {
+                        /**
+                         * Continuous query notification handler.
+                         */
+                        class ContinuousQueryNotificationHandler : public NotificationHandler
+                        {
+                        public:
+                            /**
+                             * Constructor.
+                             *
+                             * @param channel Channel.
+                             * @param continuousQuery Continuous Query.
+                             */
+                            ContinuousQueryNotificationHandler(DataChannel& channel,
+                                const SP_ContinuousQueryClientHolderBase& continuousQuery);
+
+                            /**
+                             * Destructor.
+                             */
+                            virtual ~ContinuousQueryNotificationHandler();
+
+                            /**
+                             * Handle notification.
+                             *
+                             * @param msg Message. Deserialized via the channel into a cache entry event
+                             *     notification for the continuous query.
+                             */
+                            virtual void OnNotification(const network::DataBuffer& msg);
+
+                            /**
+                             * Disconnected callback.
+                             *
+                             * Called if channel was disconnected.
+                             */
+                            virtual void OnDisconnected();
+
+                        private:
+                            /** Query. */
+                            SP_ContinuousQueryClientHolderBase continuousQuery;
+
+                            /** Channel. */
+                            DataChannel& channel;
+                        };
+
+                        /** Shared pointer to ContinuousQueryNotificationHandler. */
+                        typedef common::concurrent::SharedPointer<ContinuousQueryNotificationHandler> SP_ContinuousQueryNotificationHandler;
+                    }
+                }
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_NOTIFICATION_HANDLER
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h b/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
index 14b4712..e8954dc 100644
--- a/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
+++ b/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
@@ -52,6 +52,14 @@ namespace ignite
                  * @param err Error.
                  */
                 virtual void OnHandshakeError(uint64_t id, const IgniteError& err) = 0;
+
+                /**
+                 * Called if notification handling failed.
+                 *
+                 * @param id Channel ID.
+                 * @param err Error.
+                 */
+                virtual void OnNotificationHandlingError(uint64_t id, const IgniteError& err) = 0;
             };
         }
     }
diff --git a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
index c7904ed..53de78f 100644
--- a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
@@ -51,7 +51,7 @@ namespace
             // No-op.
         }
 
-        virtual bool OnNotification(const network::DataBuffer& msg)
+        virtual void OnNotification(const network::DataBuffer& msg)
         {
             ComputeTaskFinishedNotification notification(res);
             channel.DeserializeMessage(msg, notification);
@@ -59,12 +59,15 @@ namespace
             if (notification.IsFailure())
             {
                 promise.SetError(IgniteError(IgniteError::IGNITE_ERR_COMPUTE_EXECUTION_REJECTED,
-                    notification.GetErrorMessage().c_str()));
+                    notification.GetError().c_str()));
             }
             else
                 promise.SetValue();
+        }
 
-            return true;
+        virtual void OnDisconnected()
+        {
+            promise.SetError(IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Connection closed"));
         }
 
         /**
@@ -111,6 +114,8 @@ namespace ignite
                     channel.Get()->RegisterNotificationHandler(rsp.GetNotificationId(), handler);
 
                     handler.Get()->GetFuture().GetValue();
+
+                    channel.Get()->DeregisterNotificationHandler(rsp.GetNotificationId());
                 }
             }
         }
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
index 2bdd6dc..9c7194a 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
@@ -57,7 +57,8 @@ namespace ignite
                 const ignite::network::SP_AsyncClientPool& asyncPool,
                 const ignite::thin::IgniteClientConfiguration& cfg,
                 binary::BinaryTypeManager& typeMgr,
-                ChannelStateHandler& stateHandler
+                ChannelStateHandler& stateHandler,
+                common::ThreadPool& userThreadPool
             ) :
                 stateHandler(stateHandler),
                 handshakePerformed(false),
@@ -68,14 +69,15 @@ namespace ignite
                 typeMgr(typeMgr),
                 currentVersion(VERSION_DEFAULT),
                 reqIdCounter(0),
-                responseMutex()
+                responseMutex(),
+                userThreadPool(userThreadPool)
             {
                 // No-op.
             }
 
             DataChannel::~DataChannel()
             {
-                Close();
+                Close(0);
             }
 
             void DataChannel::StartHandshake()
@@ -83,10 +85,9 @@ namespace ignite
                 DoHandshake(VERSION_DEFAULT);
             }
 
-            void DataChannel::Close()
+            void DataChannel::Close(const IgniteError* err)
             {
-                asyncPool.Get()->Close(id, 0);
-                handlerMap.clear();
+                asyncPool.Get()->Close(id, err);
             }
 
             void DataChannel::SyncMessage(Request &req, Response &rsp, int32_t timeout)
@@ -187,13 +188,16 @@ namespace ignite
 
                 if (flags & Flag::NOTIFICATION)
                 {
-                    common::concurrent::CsLockGuard lock(handlerMutex);
+                    common::SP_ThreadPoolTask task;
+                    {
+                        common::concurrent::CsLockGuard lock(handlerMutex);
 
-                    NotificationHandlerHolder& holder = handlerMap[rspId];
-                    holder.ProcessNotification(msg);
+                        NotificationHandlerHolder& holder = handlerMap[rspId];
+                        task = holder.ProcessNotification(msg, id, stateHandler);
+                    }
 
-                    if (holder.IsProcessingComplete())
-                        handlerMap.erase(rspId);
+                    if (task.IsValid())
+                        userThreadPool.Dispatch(task);
                 }
                 else
                 {
@@ -217,9 +221,13 @@ namespace ignite
 
                 NotificationHandlerHolder& holder = handlerMap[notId];
                 holder.SetHandler(handler);
+            }
 
-                if (holder.IsProcessingComplete())
-                    handlerMap.erase(notId);
+            void DataChannel::DeregisterNotificationHandler(int64_t notId)
+            {
+                common::concurrent::CsLockGuard lock(handlerMutex);
+
+                handlerMap.erase(notId);
             }
 
             bool DataChannel::DoHandshake(const ProtocolVersion& propVer)
@@ -241,7 +249,7 @@ namespace ignite
                 binary::BinaryWriterImpl writer(&outStream, 0);
 
                 int32_t lenPos = outStream.Reserve(4);
-                writer.WriteInt8(RequestType::HANDSHAKE);
+                writer.WriteInt8(MessageType::HANDSHAKE);
 
                 writer.WriteInt16(propVer.GetMajor());
                 writer.WriteInt16(propVer.GetMinor());
@@ -337,6 +345,18 @@ namespace ignite
                 return supportedVersions.find(ver) != supportedVersions.end();
             }
 
+            void DataChannel::DeserializeMessage(const network::DataBuffer &data, Response &msg)
+            {
+                interop::InteropInputStream inStream(data.GetInputStream());
+
+                // Skipping size (4 bytes) and reqId (8 bytes)
+                inStream.Ignore(12);
+
+                binary::BinaryReaderImpl reader(&inStream);
+
+                msg.Read(reader, currentVersion);
+            }
+
             void DataChannel::FailPendingRequests(const IgniteError* err)
             {
                 IgniteError defaultErr(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Connection was closed");
@@ -352,9 +372,39 @@ namespace ignite
                     responseMap.clear();
                 }
 
+                {
+                    common::concurrent::CsLockGuard lock(handlerMutex);
+
+                    for (NotificationHandlerMap::iterator it = handlerMap.begin(); it != handlerMap.end(); ++it)
+                    {
+                        common::SP_ThreadPoolTask task = it->second.ProcessClosed();
+
+                        if (task.IsValid())
+                            userThreadPool.Dispatch(task);
+                    }
+                }
+
                 if (!handshakePerformed)
                     stateHandler.OnHandshakeError(id, *err);
             }
+
+            void DataChannel::CloseResource(int64_t resourceId)
+            {
+                ResourceCloseRequest req(resourceId);
+                Response rsp;
+
+                try
+                {
+                    SyncMessage(req, rsp, config.GetConnectionTimeout());
+                }
+                catch (const IgniteError& err)
+                {
+                    // Network failure means connection is closed or broken, which means
+                    // that all resources were freed automatically.
+                    if (err.GetCode() != IgniteError::IGNITE_ERR_NETWORK_FAILURE)
+                        throw;
+                }
+            }
         }
     }
 }
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.h b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
index aef02d0..5e09b50 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
@@ -26,6 +26,7 @@
 #include <ignite/thin/ignite_client_configuration.h>
 
 #include <ignite/common/concurrent.h>
+#include <ignite/common/thread_pool.h>
 #include <ignite/network/socket_client.h>
 #include <ignite/network/async_client_pool.h>
 
@@ -107,13 +108,15 @@ namespace ignite
                  * @param cfg Configuration.
                  * @param typeMgr Type manager.
                  * @param stateHandler State handler.
+                 * @param userThreadPool Thread pool to use to dispatch tasks that can run user code.
                  */
                 DataChannel(uint64_t id,
                     const network::EndPoint& addr,
                     const ignite::network::SP_AsyncClientPool& asyncPool,
                     const ignite::thin::IgniteClientConfiguration& cfg,
                     binary::BinaryTypeManager& typeMgr,
-                    ChannelStateHandler& stateHandler);
+                    ChannelStateHandler& stateHandler,
+                    common::ThreadPool& userThreadPool);
 
                 /**
                  * Destructor.
@@ -129,8 +132,10 @@ namespace ignite
 
                 /**
                  * Close connection.
+                 *
+                 * @param err Error.
                  */
-                void Close();
+                void Close(const IgniteError* err);
 
                 /**
                  * Synchronously send request message and receive response. Uses provided timeout.
@@ -151,12 +156,20 @@ namespace ignite
 
                 /**
                  * Register handler for the notification.
+                 *
                  * @param notId Notification ID.
                  * @param handler Handler.
                  */
                 void RegisterNotificationHandler(int64_t notId, const SP_NotificationHandler& handler);
 
                 /**
+                 * Deregister handler for the notification.
+                 *
+                 * @param notId Notification ID.
+                 */
+                void DeregisterNotificationHandler(int64_t notId);
+
+                /**
                  * Get remote node.
                  * @return Node.
                  */
@@ -176,22 +189,10 @@ namespace ignite
 
                 /**
                  * Deserialize message received by this channel.
-                 * @tparam T Message type.
                  * @param data Data.
                  * @param msg Message.
                  */
-                template<typename T>
-                void DeserializeMessage(const network::DataBuffer& data, T& msg)
-                {
-                    interop::InteropInputStream inStream(data.GetInputStream());
-
-                    // Skipping size (4 bytes) and reqId (8 bytes)
-                    inStream.Ignore(12);
-
-                    binary::BinaryReaderImpl reader(&inStream);
-
-                    msg.Read(reader, currentVersion);
-                }
+                void DeserializeMessage(const network::DataBuffer& data, Response& msg);
 
                 /**
                  * Fail all pending requests.
@@ -200,6 +201,13 @@ namespace ignite
                  */
                 void FailPendingRequests(const IgniteError* err);
 
+                /**
+                 * Close remote resource.
+                 *
+                 * @param resourceId Resource ID.
+                 */
+                void CloseResource(int64_t resourceId);
+
             private:
                 IGNITE_NO_COPY_ASSIGNMENT(DataChannel);
 
@@ -306,6 +314,9 @@ namespace ignite
 
                 /** Notification handlers. */
                 NotificationHandlerMap handlerMap;
+
+                /** Thread pool to dispatch user code execution. */
+                common::ThreadPool& userThreadPool;
             };
 
             /** Shared pointer type. */
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
index ea1d36b..78580ca 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
@@ -41,7 +41,8 @@ namespace ignite
         namespace thin
         {
             DataRouter::DataRouter(const ignite::thin::IgniteClientConfiguration& cfg) :
-                config(cfg)
+                config(cfg),
+                userThreadPool(0)
             {
                 srand(common::GetRandSeed());
 
@@ -93,6 +94,7 @@ namespace ignite
                     asyncPool.Get()->SetHandler(this);
                 }
 
+                userThreadPool.Start();
                 asyncPool.Get()->Start(ranges, config.GetConnectionsLimit());
 
                 bool connected = EnsureConnected(config.GetConnectionTimeout());
@@ -109,6 +111,8 @@ namespace ignite
                     asyncPool.Get()->SetHandler(0);
                     asyncPool.Get()->Stop();
                 }
+
+                userThreadPool.Stop();
             }
 
             bool DataRouter::EnsureConnected(int32_t timeout)
@@ -140,7 +144,7 @@ namespace ignite
 
             void DataRouter::OnConnectionSuccess(const network::EndPoint& addr, uint64_t id)
             {
-                SP_DataChannel channel(new DataChannel(id, addr, asyncPool, config, typeMgr, *this));
+                SP_DataChannel channel(new DataChannel(id, addr, asyncPool, config, typeMgr, *this, userThreadPool));
 
                 {
                     common::concurrent::CsLockGuard lock(channelsMutex);
@@ -173,13 +177,9 @@ namespace ignite
                 {
                     common::concurrent::CsLockGuard lock(channelsMutex);
 
-                    connectedChannels.erase(id);
-
-                    ChannelsIdMap::iterator it = channels.find(id);
-                    if (it == channels.end())
-                        return;
+                    channel = FindChannelLocked(id);
 
-                    channel = it->second;
+                    connectedChannels.erase(id);
                     InvalidateChannelLocked(channel);
                 }
 
@@ -188,14 +188,7 @@ namespace ignite
 
             void DataRouter::OnMessageReceived(uint64_t id, const network::DataBuffer& msg)
             {
-                SP_DataChannel channel;
-                {
-                    common::concurrent::CsLockGuard lock(channelsMutex);
-
-                    ChannelsIdMap::iterator it = channels.find(id);
-                    if (it != channels.end())
-                        channel = it->second;
-                }
+                SP_DataChannel channel = FindChannel(id);
 
                 if (channel.IsValid())
                     channel.Get()->ProcessMessage(msg);
@@ -214,12 +207,7 @@ namespace ignite
                 connectedChannels.insert(id);
                 channelsWaitPoint.NotifyAll();
 
-                SP_DataChannel channel;
-
-                ChannelsIdMap::iterator it = channels.find(id);
-                if (it != channels.end())
-                    channel = it->second;
-
+                SP_DataChannel channel = FindChannelLocked(id);
                 if (channel.IsValid())
                 {
                     const IgniteNode& node = channel.Get()->GetNode();
@@ -238,6 +226,14 @@ namespace ignite
                 channelsWaitPoint.NotifyAll();
             }
 
+            void DataRouter::OnNotificationHandlingError(uint64_t id, const IgniteError &err)
+            {
+                SP_DataChannel channel = FindChannel(id);
+
+                if (channel.IsValid())
+                    channel.Get()->Close(&err);
+            }
+
             SP_DataChannel DataRouter::SyncMessage(Request &req, Response &rsp)
             {
                 SP_DataChannel channel = GetRandomChannel();
@@ -423,6 +419,21 @@ namespace ignite
 
                 std::random_shuffle(ranges.begin(), ranges.end());
             }
+
+            SP_DataChannel DataRouter::FindChannel(uint64_t id)
+            {
+                common::concurrent::CsLockGuard lock(channelsMutex);
+                return FindChannelLocked(id);
+            }
+
+            SP_DataChannel DataRouter::FindChannelLocked(uint64_t id)
+            {
+                ChannelsIdMap::iterator it = channels.find(id);
+                if (it != channels.end())
+                    return it->second;
+
+                return SP_DataChannel();
+            }
         }
     }
 }
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h
index d11cfc5..aa31f39 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h
@@ -29,6 +29,7 @@
 #include <ignite/thin/ignite_client_configuration.h>
 
 #include <ignite/common/concurrent.h>
+#include <ignite/common/thread_pool.h>
 #include <ignite/common/promise.h>
 #include <ignite/network/end_point.h>
 #include <ignite/network/tcp_range.h>
@@ -148,6 +149,14 @@ namespace ignite
                 virtual void OnHandshakeError(uint64_t id, const IgniteError& err);
 
                 /**
+                 * Called if notification handling failed.
+                 *
+                 * @param id Channel ID.
+                 * @param err Error.
+                 */
+                virtual void OnNotificationHandlingError(uint64_t id, const IgniteError& err);
+
+                /**
                  * Synchronously send request message and receive response.
                  *
                  * @param req Request message.
@@ -313,6 +322,23 @@ namespace ignite
                  */
                 void CheckHandshakeErrorLocked();
 
+                /**
+                 * Find channel by ID.
+                 *
+                 * @param id Channel ID
+                 * @return Channel, or null if it is not present.
+                 */
+                SP_DataChannel FindChannel(uint64_t id);
+
+                /**
+                 * Find channel by ID.
+                 * @warning May only be called when lock is held!
+                 *
+                 * @param id Channel ID
+                 * @return Channel, or null if it is not present.
+                 */
+                SP_DataChannel FindChannelLocked(uint64_t id);
+
                 /** Configuration. */
                 ignite::thin::IgniteClientConfiguration config;
 
@@ -348,6 +374,9 @@ namespace ignite
 
                 /** Cache affinity manager. */
                 affinity::AffinityManager affinityManager;
+
+                /** Thread pool to dispatch user code execution. */
+                common::ThreadPool userThreadPool;
             };
 
             /** Shared pointer type. */
diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
index 6ca9ab4..f153579 100644
--- a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
@@ -41,7 +41,9 @@ namespace ignite
 
             IgniteClientImpl::~IgniteClientImpl()
             {
-                // No-op.
+                DataRouter* router0 = router.Get();
+                if (router0)
+                    router0->Close();
             }
 
             void IgniteClientImpl::Start()
@@ -109,7 +111,7 @@ namespace ignite
 
             void IgniteClientImpl::GetCacheNames(std::vector<std::string>& cacheNames)
             {
-                RequestAdapter<RequestType::CACHE_GET_NAMES> req;
+                RequestAdapter<MessageType::CACHE_GET_NAMES> req;
                 GetCacheNamesResponse rsp(cacheNames);
 
                 router.Get()->SyncMessage(req, rsp);
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.cpp b/modules/platforms/cpp/thin-client/src/impl/message.cpp
index 0c0e6ee..8390e2d 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/message.cpp
@@ -20,6 +20,7 @@
 
 #include <ignite/impl/thin/writable.h>
 #include <ignite/impl/thin/readable.h>
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
 
 #include "impl/response_status.h"
 #include "impl/data_channel.h"
@@ -37,6 +38,11 @@ namespace ignite
                 // No-op.
             }
 
+            void ResourceCloseRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
+            {
+                writer.WriteInt64(id);
+            }
+
             void CachePartitionsRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
             {
                 writer.WriteInt32(static_cast<int32_t>(cacheIds.size()));
@@ -116,30 +122,6 @@ namespace ignite
                 return (flags & Flag::FAILURE) != 0;
             }
 
-            ClientCacheNodePartitionsResponse::ClientCacheNodePartitionsResponse(
-                std::vector<NodePartitions>& nodeParts):
-                nodeParts(nodeParts)
-            {
-                // No-op.
-            }
-
-            ClientCacheNodePartitionsResponse::~ClientCacheNodePartitionsResponse()
-            {
-                // No-op.
-            }
-
-            void ClientCacheNodePartitionsResponse::ReadOnSuccess(
-                binary::BinaryReaderImpl& reader, const ProtocolVersion&)
-            {
-                int32_t num = reader.ReadInt32();
-
-                nodeParts.clear();
-                nodeParts.resize(static_cast<size_t>(num));
-
-                for (int32_t i = 0; i < num; ++i)
-                    nodeParts[i].Read(reader);
-            }
-
             CachePartitionsResponse::CachePartitionsResponse(std::vector<PartitionAwarenessGroup>& groups) :
                 groups(groups)
             {
@@ -274,7 +256,7 @@ namespace ignite
             }
 
             CacheGetSizeRequest::CacheGetSizeRequest(int32_t cacheId, bool binary, int32_t peekModes) :
-                CacheRequest<RequestType::CACHE_GET_SIZE>(cacheId, binary),
+                CacheRequest<MessageType::CACHE_GET_SIZE>(cacheId, binary),
                 peekModes(peekModes)
             {
                 // No-op.
@@ -282,7 +264,7 @@ namespace ignite
 
             void CacheGetSizeRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const
             {
-                CacheRequest<RequestType::CACHE_GET_SIZE>::Write(writer, ver);
+                CacheRequest<MessageType::CACHE_GET_SIZE>::Write(writer, ver);
 
                 if (peekModes & ignite::thin::cache::CachePeekMode::ALL)
                 {
@@ -335,7 +317,7 @@ namespace ignite
                 int32_t cacheId,
                 const ignite::thin::cache::query::SqlFieldsQuery &qry
                 ) :
-                CacheRequest<RequestType::QUERY_SQL_FIELDS>(cacheId, false),
+                CacheRequest<MessageType::QUERY_SQL_FIELDS>(cacheId, false),
                 qry(qry)
             {
                 // No-op.
@@ -343,7 +325,7 @@ namespace ignite
 
             void SqlFieldsQueryRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const
             {
-                CacheRequest<RequestType::QUERY_SQL_FIELDS>::Write(writer, ver);
+                CacheRequest<MessageType::QUERY_SQL_FIELDS>::Write(writer, ver);
 
                 if (qry.schema.empty())
                     writer.WriteNull();
@@ -395,11 +377,28 @@ namespace ignite
                 writer.WriteInt64(cursorId);
             }
 
-            void SqlFieldsCursorGetPageResponse::ReadOnSuccess(binary::BinaryReaderImpl&reader, const ProtocolVersion&)
+            void SqlFieldsCursorGetPageResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
             {
                 cursorPage.Get()->Read(reader);
             }
 
+            void ContinuousQueryRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const
+            {
+                CacheRequest<MessageType::QUERY_CONTINUOUS>::Write(writer, ver);
+
+                writer.WriteInt32(pageSize);
+                writer.WriteInt64(timeInterval);
+                writer.WriteBool(includeExpired);
+
+                // TODO: IGNITE-16291: Implement remote filters for Continuous Queries.
+                writer.WriteNull();
+            }
+
+            void ContinuousQueryResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+            {
+                queryId = reader.ReadInt64();
+            }
+
             void ComputeTaskExecuteRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
             {
                 // To be changed when Cluster API is implemented.
@@ -417,31 +416,15 @@ namespace ignite
                 taskId = reader.ReadInt64();
             }
 
-            void ComputeTaskFinishedNotification::Read(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+            void ComputeTaskFinishedNotification::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
             {
-                int16_t flags = reader.ReadInt16();
-                if (!(flags & Flag::NOTIFICATION))
-                {
-                    IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Was expecting notification but got "
-                        "different kind of message", "flags", flags)
-                }
-
-                int16_t opCode = reader.ReadInt16();
-                if (opCode != RequestType::COMPUTE_TASK_FINISHED)
-                {
-                    IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification type",
-                        "expected", (int)RequestType::COMPUTE_TASK_FINISHED, "actual", opCode)
-                }
+                result.Read(reader);
+            }
 
-                if (flags & Flag::FAILURE)
-                {
-                    status = reader.ReadInt32();
-                    reader.ReadString(errorMessage);
-                }
-                else
-                {
-                    result.Read(reader);
-                }
+            void ClientCacheEntryEventNotification::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+            {
+                ignite::binary::BinaryRawReader reader0(&reader);
+                query.ReadAndProcessEvents(reader0);
             }
         }
     }
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h
index 4f9e920..dad00c7 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.h
+++ b/modules/platforms/cpp/thin-client/src/impl/message.h
@@ -53,6 +53,18 @@ namespace ignite
             /* Forward declaration. */
             class Writable;
 
+            namespace cache
+            {
+                namespace query
+                {
+                    namespace continuous
+                    {
+                        /* Forward declaration. */
+                        class ContinuousQueryClientHolderBase;
+                    }
+                }
+            }
+
             struct ClientType
             {
                 enum Type
@@ -61,7 +73,7 @@ namespace ignite
                 };
             };
 
-            struct RequestType
+            struct MessageType
             {
                 enum Type
                 {
@@ -158,6 +170,12 @@ namespace ignite
                     /** SQL fields query get next cursor page request. */
                     QUERY_SQL_FIELDS_CURSOR_GET_PAGE = 2005,
 
+                    /** Continuous query. */
+                    QUERY_CONTINUOUS = 2006,
+
+                    /** Continuous query notification event. */
+                    QUERY_CONTINUOUS_EVENT_NOTIFICATION = 2007,
+
                     /** Get binary type info. */
                     GET_BINARY_TYPE = 3002,
 
@@ -292,7 +310,44 @@ namespace ignite
             /**
              * Cache partitions request.
              */
-            class CachePartitionsRequest : public RequestAdapter<RequestType::CACHE_PARTITIONS>
+            class ResourceCloseRequest : public RequestAdapter<MessageType::RESOURCE_CLOSE>
+            {
+            public:
+                /**
+                 * Constructor.
+                 *
+                 * @param id Resource ID.
+                 */
+                ResourceCloseRequest(int64_t id) :
+                    id(id)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ResourceCloseRequest()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Write request using provided writer.
+                 *
+                 * @param writer Writer.
+                 */
+                virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const;
+
+            private:
+                /** Resource ID. */
+                const int64_t id;
+            };
+
+            /**
+             * Cache partitions request.
+             */
+            class CachePartitionsRequest : public RequestAdapter<MessageType::CACHE_PARTITIONS>
             {
             public:
                 /**
@@ -324,7 +379,7 @@ namespace ignite
             /**
              * Get or create cache request.
              */
-            class GetOrCreateCacheWithNameRequest : public RequestAdapter<RequestType::CACHE_GET_OR_CREATE_WITH_NAME>
+            class GetOrCreateCacheWithNameRequest : public RequestAdapter<MessageType::CACHE_GET_OR_CREATE_WITH_NAME>
             {
             public:
                 /**
@@ -357,7 +412,7 @@ namespace ignite
             /**
              * Get or create cache request.
              */
-            class CreateCacheWithNameRequest : public RequestAdapter<RequestType::CACHE_CREATE_WITH_NAME>
+            class CreateCacheWithNameRequest : public RequestAdapter<MessageType::CACHE_CREATE_WITH_NAME>
             {
             public:
                 /**
@@ -390,7 +445,7 @@ namespace ignite
             /**
              * Destroy cache request.
              */
-            class DestroyCacheRequest : public RequestAdapter<RequestType::CACHE_DESTROY>
+            class DestroyCacheRequest : public RequestAdapter<MessageType::CACHE_DESTROY>
             {
             public:
                 /**
@@ -504,7 +559,7 @@ namespace ignite
             /**
              * Cache get size request.
              */
-            class CacheGetSizeRequest : public CacheRequest<RequestType::CACHE_GET_SIZE>
+            class CacheGetSizeRequest : public CacheRequest<MessageType::CACHE_GET_SIZE>
             {
             public:
                 /**
@@ -698,7 +753,7 @@ namespace ignite
             /**
              * Tx start request.
              */
-            class TxStartRequest : public RequestAdapter<RequestType::OP_TX_START>
+            class TxStartRequest : public RequestAdapter<MessageType::OP_TX_START>
             {
             public:
                 /**
@@ -755,7 +810,7 @@ namespace ignite
             /**
              * Tx end request.
              */
-            class TxEndRequest : public RequestAdapter<RequestType::OP_TX_END>
+            class TxEndRequest : public RequestAdapter<MessageType::OP_TX_END>
             {
             public:
                 /**
@@ -800,7 +855,7 @@ namespace ignite
             /**
              * Cache get binary type request.
              */
-            class BinaryTypeGetRequest : public RequestAdapter<RequestType::GET_BINARY_TYPE>
+            class BinaryTypeGetRequest : public RequestAdapter<MessageType::GET_BINARY_TYPE>
             {
             public:
                 /**
@@ -837,7 +892,7 @@ namespace ignite
             /**
              * Cache put binary type request.
              */
-            class BinaryTypePutRequest : public RequestAdapter<RequestType::PUT_BINARY_TYPE>
+            class BinaryTypePutRequest : public RequestAdapter<MessageType::PUT_BINARY_TYPE>
             {
             public:
                 /**
@@ -874,7 +929,7 @@ namespace ignite
             /**
              * Cache SQL fields query request.
              */
-            class SqlFieldsQueryRequest : public CacheRequest<RequestType::QUERY_SQL_FIELDS>
+            class SqlFieldsQueryRequest : public CacheRequest<MessageType::QUERY_SQL_FIELDS>
             {
             public:
                 /**
@@ -908,7 +963,7 @@ namespace ignite
             /**
              * Cache SQL fields cursor get page request.
              */
-            class SqlFieldsCursorGetPageRequest : public RequestAdapter<RequestType::QUERY_SQL_FIELDS_CURSOR_GET_PAGE>
+            class SqlFieldsCursorGetPageRequest : public RequestAdapter<MessageType::QUERY_SQL_FIELDS_CURSOR_GET_PAGE>
             {
             public:
                 /**
@@ -943,9 +998,58 @@ namespace ignite
             };
 
             /**
+             * Continuous query request.
+             */
+            class ContinuousQueryRequest : public CacheRequest<MessageType::QUERY_CONTINUOUS>
+            {
+            public:
+                /**
+                 * Constructor.
+                 *
+                 * @param cacheId Cache ID.
+                 * @param pageSize Page size.
+                 * @param timeInterval Time interval.
+                 * @param includeExpired Include expired.
+                 */
+                explicit ContinuousQueryRequest(int32_t cacheId, int32_t pageSize, int64_t timeInterval, bool includeExpired) :
+                    CacheRequest(cacheId, false),
+                    pageSize(pageSize),
+                    timeInterval(timeInterval),
+                    includeExpired(includeExpired)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ContinuousQueryRequest()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Write request using provided writer.
+                 * @param writer Writer.
+                 * @param ver Version.
+                 */
+                virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const;
+
+            private:
+                /** Page size. */
+                const int32_t pageSize;
+
+                /** Time interval. */
+                const int64_t timeInterval;
+
+                /** Include expired. */
+                const bool includeExpired;
+            };
+
+            /**
              * Compute task execute request.
              */
-            class ComputeTaskExecuteRequest : public RequestAdapter<RequestType::COMPUTE_TASK_EXECUTE>
+            class ComputeTaskExecuteRequest : public RequestAdapter<MessageType::COMPUTE_TASK_EXECUTE>
             {
             public:
                 /**
@@ -1016,7 +1120,7 @@ namespace ignite
                  * @param reader Reader.
                  * @param ver Protocol version.
                  */
-                void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
+                virtual void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
 
                 /**
                  * Get request processing status.
@@ -1072,7 +1176,6 @@ namespace ignite
                     // No-op.
                 }
 
-            private:
                 /** Flags. */
                 int16_t flags;
 
@@ -1089,36 +1192,6 @@ namespace ignite
             /**
              * Cache node list request.
              */
-            class ClientCacheNodePartitionsResponse : public Response
-            {
-            public:
-                /**
-                 * Constructor.
-                 *
-                 * @param nodeParts Node partitions.
-                 */
-                ClientCacheNodePartitionsResponse(std::vector<NodePartitions>& nodeParts);
-
-                /**
-                 * Destructor.
-                 */
-                virtual ~ClientCacheNodePartitionsResponse();
-
-                /**
-                 * Read data if response status is ResponseStatus::SUCCESS.
-                 *
-                 * @param reader Reader.
-                 */
-                virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&);
-
-            private:
-                /** Node partitions. */
-                std::vector<NodePartitions>& nodeParts;
-            };
-
-            /**
-             * Cache node list request.
-             */
             class CachePartitionsResponse : public Response
             {
             public:
@@ -1524,6 +1597,50 @@ namespace ignite
             };
 
             /**
+             * Cache Continuous Query response.
+             */
+            class ContinuousQueryResponse : public Response
+            {
+            public:
+                /**
+                 * Constructor.
+                 */
+                ContinuousQueryResponse() :
+                    queryId(0)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ContinuousQueryResponse()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Get query ID.
+                 * @return Query ID.
+                 */
+                int64_t GetQueryId() const
+                {
+                    return queryId;
+                }
+
+                /**
+                 * Read data if response status is ResponseStatus::SUCCESS.
+                 *
+                 * @param reader Reader.
+                 */
+                virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&);
+
+            private:
+                /** Query ID. */
+                int64_t queryId;
+            };
+
+            /**
              * Compute task execute response.
              */
             class ComputeTaskExecuteResponse : public Response
@@ -1570,17 +1687,107 @@ namespace ignite
             /**
              * Compute task finished notification.
              */
-            class ComputeTaskFinishedNotification
+            class Notification : public Response
+            {
+            public:
+                /**
+                 * Constructor.
+                 */
+                Notification()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~Notification()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Read notification data.
+                 *
+                 * @param reader Reader.
+                 */
+                virtual void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver)
+                {
+                    flags = reader.ReadInt16();
+
+                    int16_t readOpCode = reader.ReadInt16();
+                    if (readOpCode != GetOperationCode())
+                    {
+                        IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification type",
+                             "expected", GetOperationCode(), "actual", readOpCode)
+                    }
+
+                    if (IsFailure())
+                    {
+                        status = reader.ReadInt32();
+                        reader.ReadString(error);
+
+                        return;
+                    }
+
+                    ReadOnSuccess(reader, ver);
+                }
+
+                /**
+                 * Get operation code.
+                 *
+                 * @return Operation code.
+                 */
+                virtual int16_t GetOperationCode() const = 0;
+
+            protected:
+                /**
+                 * Read data if response status is ResponseStatus::SUCCESS.
+                 *
+                 * @param reader Reader.
+                 * @param ver Version.
+                 */
+                virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver) = 0;
+            };
+
+            /**
+             * Notification adapter.
+             *
+             * @tparam OpCode Operation code.
+             */
+            template<int16_t OpCode>
+            class NotificationAdapter : public Notification
             {
             public:
-                typedef ComputeTaskExecuteResponse ResponseType;
+                /**
+                 * Destructor.
+                 */
+                virtual ~NotificationAdapter()
+                {
+                    // No-op.
+                }
 
                 /**
+                 * Get operation code.
+                 *
+                 * @return Operation code.
+                 */
+                virtual int16_t GetOperationCode() const
+                {
+                    return OpCode;
+                }
+            };
+
+            /**
+             * Compute task finished notification.
+             */
+            class ComputeTaskFinishedNotification : public NotificationAdapter<MessageType::COMPUTE_TASK_FINISHED>
+            {
+            public:
+                /**
                  * Constructor.
                  */
-                ComputeTaskFinishedNotification(Readable& result) :
-                    status(0),
-                    errorMessage(),
+                explicit ComputeTaskFinishedNotification(Readable& result) :
                     result(result)
                 {
                     // No-op.
@@ -1595,39 +1802,52 @@ namespace ignite
                 }
 
                 /**
-                 * Check if the message is failure.
-                 * @return @c true on failure.
+                 * Read data if response status is ResponseStatus::SUCCESS.
+                 *
+                 * @param reader Reader.
+                 * @param ver Version.
+                 */
+                virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
+
+            private:
+                /** Result. */
+                Readable& result;
+            };
+
+            /**
+             * Continuous query notification.
+             */
+            class ClientCacheEntryEventNotification : public NotificationAdapter<MessageType::QUERY_CONTINUOUS_EVENT_NOTIFICATION>
+            {
+            public:
+                /**
+                 * Constructor.
                  */
-                bool IsFailure() const
+                explicit ClientCacheEntryEventNotification(cache::query::continuous::ContinuousQueryClientHolderBase& query) :
+                    query(query)
                 {
-                    return !errorMessage.empty();
+                    // No-op.
                 }
 
                 /**
-                 * Get error message.
-                 * @return Error message.
+                 * Destructor.
                  */
-                const std::string& GetErrorMessage() const
+                virtual ~ClientCacheEntryEventNotification()
                 {
-                    return errorMessage;
+                    // No-op.
                 }
 
                 /**
-                 * Read response using provided reader.
+                 * Read data if response status is ResponseStatus::SUCCESS.
+                 *
                  * @param reader Reader.
-                 * @param ver Protocol version.
+                 * @param ver Version.
                  */
-                void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
+                virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
 
             private:
-                /** Status. */
-                int32_t status;
-
-                /** Error message. */
-                std::string errorMessage;
-
                 /** Result. */
-                Readable& result;
+                cache::query::continuous::ContinuousQueryClientHolderBase& query;
             };
         }
     }
diff --git a/modules/platforms/cpp/thin-client/src/impl/notification_handler.h b/modules/platforms/cpp/thin-client/src/impl/notification_handler.h
index c2acc5b..de9e88c 100644
--- a/modules/platforms/cpp/thin-client/src/impl/notification_handler.h
+++ b/modules/platforms/cpp/thin-client/src/impl/notification_handler.h
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include <ignite/ignite_error.h>
+#include <ignite/common/thread_pool.h>
 #include <ignite/network/data_buffer.h>
 
 #include <ignite/impl/interop/interop_memory.h>
@@ -51,12 +52,137 @@ namespace ignite
                  * @param msg Message.
                  * @return @c true if processing complete.
                  */
-                virtual bool OnNotification(const network::DataBuffer& msg) = 0;
+                virtual void OnNotification(const network::DataBuffer& msg) = 0;
+
+                /**
+                 * Disconnected callback.
+                 *
+                 * Called if channel was disconnected.
+                 */
+                virtual void OnDisconnected() = 0;
             };
 
             /** Shared pointer to notification handler. */
             typedef common::concurrent::SharedPointer<NotificationHandler> SP_NotificationHandler;
 
+            /**
+             * Task that handles a notification.
+             */
+            class HandleNotificationTask : public common::ThreadPoolTask
+            {
+            public:
+                /**
+                 * Constructor.
+                 *
+                 * @param msg Message.
+                 * @param handler Notification handler.
+                 * @param channelId Channel ID.
+                 * @param channelStateHandler Channel state handler.
+                 */
+                HandleNotificationTask(
+                    const network::DataBuffer& msg,
+                    const SP_NotificationHandler& handler,
+                    uint64_t channelId,
+                    ChannelStateHandler& channelStateHandler
+                ) :
+                    msg(msg),
+                    handler(handler),
+                    channelId(channelId),
+                    channelStateHandler(channelStateHandler)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~HandleNotificationTask()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Execute task.
+                 */
+                virtual void Execute()
+                {
+                    handler.Get()->OnNotification(msg);
+                }
+
+                /**
+                 * Called if error occurred during task processing.
+                 *
+                 * @param err Error.
+                 */
+                virtual void OnError(const IgniteError& err)
+                {
+                    channelStateHandler.OnNotificationHandlingError(channelId, err);
+                }
+
+            private:
+                /** Message. */
+                network::DataBuffer msg;
+
+                /** Handler. */
+                SP_NotificationHandler handler;
+
+                /** Channel ID. */
+                uint64_t channelId;
+
+                /** Channel state handler. */
+                ChannelStateHandler& channelStateHandler;
+            };
+
+            /**
+             * Task that handles connection closing.
+             */
+            class DisconnectedTask : public common::ThreadPoolTask
+            {
+            public:
+                /**
+                 * Constructor.
+                 *
+                 * @param handler Notification handler.
+                 */
+                explicit DisconnectedTask(const SP_NotificationHandler& handler) :
+                    handler(handler)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~DisconnectedTask()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Execute task.
+                 */
+                virtual void Execute()
+                {
+                    handler.Get()->OnDisconnected();
+                }
+
+                /**
+                 * Called if error occurred during task processing.
+                 *
+                 * @param err Error.
+                 */
+                virtual void OnError(const IgniteError&)
+                {
+                    // No-op. Connection already closed so there is not much we can do.
+                    // TODO: Add logging here once it's implemented.
+                }
+
+            private:
+                /** Handler. */
+                SP_NotificationHandler handler;
+            };
+
+
             /** Notification handler. */
             class NotificationHandlerHolder
             {
@@ -68,9 +194,9 @@ namespace ignite
                  * Default constructor.
                  */
                 NotificationHandlerHolder() :
+                    disconnected(false),
                     queue(),
-                    handler(),
-                    complete(false)
+                    handler()
                 {
                     // No-op.
                 }
@@ -87,16 +213,37 @@ namespace ignite
                  * Process notification.
                  *
                  * @param msg Notification message to process.
+                 * @param channelId Channel ID.
+                 * @param channelStateHandler Channel state handler.
+                 * @return Task for dispatching if handler is present and null otherwise.
                  */
-                void ProcessNotification(const network::DataBuffer& msg)
+                common::SP_ThreadPoolTask ProcessNotification(const network::DataBuffer& msg,
+                    uint64_t channelId, ChannelStateHandler& channelStateHandler)
                 {
-                    if (complete)
-                        return;
+                    network::DataBuffer notification(msg.Clone());
 
                     if (handler.IsValid())
-                        complete = handler.Get()->OnNotification(msg);
-                    else
-                        queue.push_back(msg.Clone());
+                        return common::SP_ThreadPoolTask(
+                            new HandleNotificationTask(notification, handler, channelId, channelStateHandler));
+
+                    queue.push_back(notification);
+
+                    return common::SP_ThreadPoolTask();
+                }
+
+                /**
+                 * Process disconnect.
+                 *
+                 * @return Task for dispatching if handler is present and null otherwise.
+                 */
+                common::SP_ThreadPoolTask ProcessClosed()
+                {
+                    disconnected = true;
+
+                    if (handler.IsValid())
+                        return common::SP_ThreadPoolTask(new DisconnectedTask(handler));
+
+                    return common::SP_ThreadPoolTask();
                 }
 
                 /**
@@ -104,38 +251,31 @@ namespace ignite
                  *
                  * @param handler Notification handler.
                  */
-                void SetHandler(const SP_NotificationHandler& handler)
+                void SetHandler(const SP_NotificationHandler& handler0)
                 {
-                    if (this->handler.IsValid())
+                    if (handler.IsValid())
                         throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
                             "Internal error: handler is already set for the notification");
 
-                    this->handler = handler;
+                    handler = handler0;
                     for (MessageQueue::iterator it = queue.begin(); it != queue.end(); ++it)
-                        complete = complete || this->handler.Get()->OnNotification(*it);
+                        handler.Get()->OnNotification(*it);
 
                     queue.clear();
-                }
 
-                /**
-                 * Check whether processing complete.
-                 *
-                 * @return @c true if processing complete.
-                 */
-                bool IsProcessingComplete() const
-                {
-                    return complete;
+                    if (disconnected)
+                        handler.Get()->OnDisconnected();
                 }
 
             private:
+                /** Disconnected flag. */
+                bool disconnected;
+
                 /** Notification queue. */
                 MessageQueue queue;
 
                 /** Notification handler. */
                 SP_NotificationHandler handler;
-
-                /** Processing complete. */
-                bool complete;
             };
         }
     }