You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2022/01/14 10:12:37 UTC
[ignite] branch master updated: IGNITE-15766 C++ thin Continuous Queries (#9732)
This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fe23f1b IGNITE-15766 C++ thin Continuous Queries (#9732)
fe23f1b is described below
commit fe23f1b0ca08a6741253f2d6ac10dfd488befb7b
Author: Igor Sapego <is...@apache.org>
AuthorDate: Fri Jan 14 12:48:46 2022 +0300
IGNITE-15766 C++ thin Continuous Queries (#9732)
---
modules/platforms/cpp/common/CMakeLists.txt | 1 +
.../cpp/common/include/ignite/common/concurrent.h | 2 +-
.../cpp/common/include/ignite/common/thread_pool.h | 214 +++++++++
.../os/linux/include/ignite/common/concurrent_os.h | 9 +
.../common/os/linux/src/common/concurrent_os.cpp | 9 +
.../os/win/include/ignite/common/concurrent_os.h | 9 +
.../cpp/common/os/win/src/common/concurrent_os.cpp | 11 +-
.../cpp/common/src/common/thread_pool.cpp | 216 +++++++++
.../cache/query/continuous/continuous_query_impl.h | 6 +-
.../platforms/cpp/thin-client-test/CMakeLists.txt | 1 +
.../config/cache-query-continuous-32.xml | 48 ++
.../config/cache-query-continuous-default.xml | 98 ++++
.../config/cache-query-continuous.xml | 30 ++
.../thin-client-test/src/continuous_query_test.cpp | 528 +++++++++++++++++++++
modules/platforms/cpp/thin-client/CMakeLists.txt | 1 +
.../ignite/impl/thin/cache/cache_client_proxy.h | 11 +
.../continuous/continuous_query_client_holder.h | 182 +++++++
.../thin/cache/query/query_fields_cursor_impl.h | 147 ------
.../impl/thin/cache/query/query_fields_row_impl.h | 197 --------
.../include/ignite/thin/cache/cache_client.h | 21 +-
.../include/ignite/thin/cache/cache_entry.h | 168 +++++++
.../ignite/thin/cache/event/cache_entry_event.h | 209 ++++++++
.../thin/cache/event/cache_entry_event_listener.h | 82 ++++
.../query/continuous/continuous_query_client.h | 232 +++++++++
.../query/continuous/continuous_query_handle.h | 78 +++
.../src/impl/cache/cache_client_impl.cpp | 66 ++-
.../thin-client/src/impl/cache/cache_client_impl.h | 12 +
.../src/impl/cache/cache_client_proxy.cpp | 15 +-
.../continuous/continuous_query_handle_impl.h | 90 ++++
.../continuous_query_notification_handler.cpp | 63 +++
.../continuous_query_notification_handler.h | 96 ++++
.../thin-client/src/impl/channel_state_handler.h | 8 +
.../src/impl/compute/compute_client_impl.cpp | 11 +-
.../cpp/thin-client/src/impl/data_channel.cpp | 78 ++-
.../cpp/thin-client/src/impl/data_channel.h | 41 +-
.../cpp/thin-client/src/impl/data_router.cpp | 55 ++-
.../cpp/thin-client/src/impl/data_router.h | 29 ++
.../thin-client/src/impl/ignite_client_impl.cpp | 6 +-
.../platforms/cpp/thin-client/src/impl/message.cpp | 87 ++--
.../platforms/cpp/thin-client/src/impl/message.h | 356 +++++++++++---
.../thin-client/src/impl/notification_handler.h | 190 +++++++-
41 files changed, 3136 insertions(+), 577 deletions(-)
diff --git a/modules/platforms/cpp/common/CMakeLists.txt b/modules/platforms/cpp/common/CMakeLists.txt
index 5469dd6..6e8cad6 100644
--- a/modules/platforms/cpp/common/CMakeLists.txt
+++ b/modules/platforms/cpp/common/CMakeLists.txt
@@ -25,6 +25,7 @@ set(SOURCES src/common/big_integer.cpp
src/common/bits.cpp
src/common/concurrent.cpp
src/common/decimal.cpp
+ src/common/thread_pool.cpp
src/common/utils.cpp
src/date.cpp
src/ignite_error.cpp
diff --git a/modules/platforms/cpp/common/include/ignite/common/concurrent.h b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
index 290efd7..54bf583 100644
--- a/modules/platforms/cpp/common/include/ignite/common/concurrent.h
+++ b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
@@ -293,7 +293,7 @@ namespace ignite
*
* @return Raw pointer.
*/
- const T* Get() const
+ T* Get() const
{
return ptr;
}
diff --git a/modules/platforms/cpp/common/include/ignite/common/thread_pool.h b/modules/platforms/cpp/common/include/ignite/common/thread_pool.h
new file mode 100644
index 0000000..ef05213
--- /dev/null
+++ b/modules/platforms/cpp/common/include/ignite/common/thread_pool.h
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::common::ThreadPool class.
+ */
+
+#ifndef _IGNITE_COMMON_THREAD_POOL
+#define _IGNITE_COMMON_THREAD_POOL
+
+#include <deque>
+#include <vector>
+
+#include <ignite/ignite_error.h>
+
+#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
+
+namespace ignite
+{
+ namespace common
+ {
+ /**
+ * Thread pool task.
+ */
+ class ThreadPoolTask
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ThreadPoolTask()
+ {
+ // No-op.
+ }
+
+ /**
+ * Execute task.
+ */
+ virtual void Execute() = 0;
+
+ /**
+ * Called if error occurred during task processing.
+ *
+ * @param err Error.
+ */
+ virtual void OnError(const IgniteError& err) = 0;
+ };
+
+ /** Shared pointer to thread pool task. */
+ typedef concurrent::SharedPointer< ThreadPoolTask > SP_ThreadPoolTask;
+
+ /**
+ * Thread Pool.
+ */
+ class ThreadPool
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param threadsNum Number of threads. If set to 0 current number of processors is used.
+ */
+ IGNITE_IMPORT_EXPORT explicit ThreadPool(uint32_t threadsNum);
+
+ /**
+ * Destructor.
+ */
+ IGNITE_IMPORT_EXPORT virtual ~ThreadPool();
+
+ /**
+ * Start threads in pool.
+ */
+ IGNITE_IMPORT_EXPORT void Start();
+
+ /**
+ * Stop threads in pool.
+ *
+ * @warning Once stopped it can not be restarted.
+ */
+ IGNITE_IMPORT_EXPORT void Stop();
+
+ /**
+ * Dispatch task.
+ *
+ * @param task Task.
+ */
+ IGNITE_IMPORT_EXPORT void Dispatch(const SP_ThreadPoolTask& task);
+
+ private:
+ IGNITE_NO_COPY_ASSIGNMENT(ThreadPool);
+
+ /**
+ * Task queue.
+ */
+ class TaskQueue
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ TaskQueue();
+
+ /**
+ * Destructor.
+ */
+ ~TaskQueue();
+
+ /**
+ * Push a new task to the queue.
+ *
+ * @param task Task. Should not be null.
+ */
+ void Push(const SP_ThreadPoolTask& task);
+
+ /**
+ * Pop a task from the queue.
+ *
+ * @return New task or null when unblocked.
+ */
+ SP_ThreadPoolTask Pop();
+
+ /**
+ * Unblock queue. When unblocked queue will not block or return new tasks.
+ */
+ void Unblock();
+
+ private:
+ /** If true queue will not block. */
+ volatile bool unblocked;
+
+ /** Tasks queue. */
+ std::deque< SP_ThreadPoolTask > tasks;
+
+ /** Critical section. */
+ concurrent::CriticalSection mutex;
+
+ /** Condition variable. */
+ concurrent::ConditionVariable waitPoint;
+ };
+
+ /**
+ * Worker thread.
+ */
+ class WorkerThread : public concurrent::Thread
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param taskQueue Task queue.
+ */
+ explicit WorkerThread(TaskQueue& taskQueue);
+
+ /**
+ * Destructor.
+ */
+ ~WorkerThread();
+
+ private:
+ /**
+ * Run thread.
+ */
+ virtual void Run();
+
+ /** Task queue. */
+ TaskQueue& taskQueue;
+ };
+
+ /**
+ * Handle an error that occurred during task execution.
+ *
+ * @param task Task.
+ * @param err Error.
+ */
+ static void HandleTaskError(ThreadPoolTask &task, const IgniteError &err);
+
+ /** Shared pointer to thread pool worker thread. */
+ typedef concurrent::SharedPointer< WorkerThread > SP_WorkerThread;
+
+ /** Started flag. */
+ bool started;
+
+ /** Stopped flag. */
+ bool stopped;
+
+ /** Task queue. */
+ TaskQueue queue;
+
+ /** Worker Threads. */
+ std::vector<SP_WorkerThread> threads;
+
+ /** Critical section. */
+ concurrent::CriticalSection mutex;
+ };
+ }
+}
+
+#endif //_IGNITE_COMMON_THREAD_POOL
diff --git a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
index 43ffb02..c8f7293 100644
--- a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
+++ b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h
@@ -724,6 +724,8 @@ namespace ignite
virtual void Join();
private:
+ IGNITE_NO_COPY_ASSIGNMENT(Thread);
+
/**
* Routine.
* @param lpParam Param.
@@ -734,6 +736,13 @@ namespace ignite
/** Thread handle. */
pthread_t thread;
};
+
+ /**
+ * Get number of logical processors in system.
+ *
+ * @return Number of logical processors.
+ */
+ IGNITE_IMPORT_EXPORT uint32_t GetNumberOfProcessors();
}
}
}
diff --git a/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
index ff36c55..0370b57 100644
--- a/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
+++ b/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+#include <sys/sysinfo.h>
+
#include "ignite/common/concurrent_os.h"
namespace ignite
@@ -236,6 +238,13 @@ namespace ignite
{
pthread_join(thread, 0);
}
+
+ uint32_t GetNumberOfProcessors()
+ {
+ int res = get_nprocs();
+
+ return static_cast<uint32_t>(res < 0 ? 0 : res);
+ }
}
}
}
diff --git a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
index 4c0e4a2..25fa5c2 100644
--- a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
+++ b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h
@@ -633,6 +633,8 @@ namespace ignite
virtual void Join();
private:
+ IGNITE_NO_COPY_ASSIGNMENT(Thread);
+
/**
* Routine.
* @param lpParam Param.
@@ -643,6 +645,13 @@ namespace ignite
/** Thread handle. */
HANDLE handle;
};
+
+ /**
+ * Get number of logical processors in system.
+ *
+ * @return Number of logical processors.
+ */
+ IGNITE_IMPORT_EXPORT uint32_t GetNumberOfProcessors();
}
}
}
diff --git a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
index 8d2d29b..02b6834 100644
--- a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
+++ b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
@@ -216,7 +216,8 @@ namespace ignite
Thread::~Thread()
{
- CloseHandle(handle);
+ if (handle)
+ CloseHandle(handle);
}
DWORD Thread::ThreadRoutine(LPVOID lpParam)
@@ -239,6 +240,14 @@ namespace ignite
{
WaitForSingleObject(handle, INFINITE);
}
+
+ uint32_t GetNumberOfProcessors()
+ {
+ SYSTEM_INFO info;
+ GetSystemInfo(&info);
+
+ return static_cast<uint32_t>(info.dwNumberOfProcessors); // DWORD is unsigned, so no negative clamp is needed.
+ }
}
}
}
diff --git a/modules/platforms/cpp/common/src/common/thread_pool.cpp b/modules/platforms/cpp/common/src/common/thread_pool.cpp
new file mode 100644
index 0000000..456c7eb
--- /dev/null
+++ b/modules/platforms/cpp/common/src/common/thread_pool.cpp
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <exception>
+#include <iostream>
+
+#include <ignite/common/thread_pool.h>
+
+namespace ignite
+{
+ namespace common
+ {
+ ThreadPool::ThreadPool(uint32_t threadsNum):
+ started(false),
+ stopped(false),
+ threads()
+ {
+ uint32_t threadToStart = threadsNum != 0 ? threadsNum : concurrent::GetNumberOfProcessors();
+ if (!threadToStart)
+ threadToStart = 2;
+
+ threads.reserve(threadToStart);
+ for (uint32_t i = 0; i < threadToStart; ++i)
+ {
+ SP_WorkerThread thread(new WorkerThread(queue));
+ threads.push_back(thread);
+ }
+ }
+
+ ThreadPool::~ThreadPool()
+ {
+ Stop();
+ }
+
+ void ThreadPool::Start()
+ {
+ concurrent::CsLockGuard guard(mutex);
+
+ if (started)
+ return;
+
+ started = true;
+
+ for (std::vector<SP_WorkerThread>::iterator it = threads.begin(); it != threads.end(); ++it)
+ it->Get()->Start();
+ }
+
+ void ThreadPool::Stop()
+ {
+ concurrent::CsLockGuard guard(mutex);
+
+ if (stopped || !started)
+ return;
+
+ stopped = true;
+
+ queue.Unblock();
+
+ for (std::vector<SP_WorkerThread>::iterator it = threads.begin(); it != threads.end(); ++it)
+ it->Get()->Join();
+ }
+
+ void ThreadPool::Dispatch(const SP_ThreadPoolTask& task)
+ {
+ queue.Push(task);
+ }
+
+ ThreadPool::TaskQueue::TaskQueue() :
+ unblocked(false)
+ {
+ // No-op.
+ }
+
+ ThreadPool::TaskQueue::~TaskQueue()
+ {
+ // No-op.
+ }
+
+ void ThreadPool::TaskQueue::Push(const SP_ThreadPoolTask &task)
+ {
+ if (!task.IsValid())
+ return;
+
+ concurrent::CsLockGuard guard(mutex);
+
+ if (unblocked)
+ {
+ IgniteError err(IgniteError::IGNITE_ERR_GENERIC, "Execution thread pool is stopped");
+ HandleTaskError(*task.Get(), err);
+
+ return;
+ }
+
+ tasks.push_back(task);
+
+ waitPoint.NotifyOne();
+ }
+
+ SP_ThreadPoolTask ThreadPool::TaskQueue::Pop()
+ {
+ concurrent::CsLockGuard guard(mutex);
+ if (unblocked)
+ return SP_ThreadPoolTask();
+
+ while (tasks.empty())
+ {
+ waitPoint.Wait(mutex);
+
+ if (unblocked)
+ return SP_ThreadPoolTask();
+ }
+
+ SP_ThreadPoolTask res = tasks.front();
+ tasks.pop_front();
+ return res;
+ }
+
+ void ThreadPool::TaskQueue::Unblock()
+ {
+ concurrent::CsLockGuard guard(mutex);
+ unblocked = true;
+
+ IgniteError err(IgniteError::IGNITE_ERR_GENERIC, "Execution thread pool is stopped");
+ for (std::deque< SP_ThreadPoolTask >::iterator it = tasks.begin(); it != tasks.end(); ++it)
+ HandleTaskError(*it->Get(), err);
+
+ tasks.clear();
+
+ waitPoint.NotifyAll();
+ }
+
+ ThreadPool::WorkerThread::WorkerThread(TaskQueue& taskQueue) :
+ taskQueue(taskQueue)
+ {
+ // No-op.
+ }
+
+ ThreadPool::WorkerThread::~WorkerThread()
+ {
+ // No-op.
+ }
+
+ void ThreadPool::WorkerThread::Run()
+ {
+ while (true)
+ {
+ SP_ThreadPoolTask task = taskQueue.Pop();
+
+ // Queue is unblocked and workers should stop.
+ if (!task.IsValid())
+ break;
+
+ ThreadPoolTask &task0 = *task.Get();
+
+ try
+ {
+ task0.Execute();
+ }
+ catch (const IgniteError& err)
+ {
+ HandleTaskError(task0, err);
+ }
+ catch (const std::exception& err)
+ {
+ IgniteError err0(IgniteError::IGNITE_ERR_STD, err.what());
+ HandleTaskError(task0, err0);
+ }
+ catch (...)
+ {
+ IgniteError err(IgniteError::IGNITE_ERR_UNKNOWN, "Unknown error occurred when executing task");
+ HandleTaskError(task0, err);
+ }
+ }
+ }
+
+ void ThreadPool::HandleTaskError(ThreadPoolTask &task, const IgniteError &err)
+ {
+ try
+ {
+ task.OnError(err); // Hand the failure to the task itself.
+
+ return;
+ }
+ catch (const IgniteError& err0)
+ {
+ std::cerr << "Exception is thrown during handling of exception: "
+ << err0.what() << ". Aborting execution" << std::endl;
+ }
+ catch (const std::exception& err0)
+ {
+ std::cerr << "Exception is thrown during handling of exception: "
+ << err0.what() << ". Aborting execution" << std::endl;
+ }
+ catch (...)
+ {
+ std::cerr << "Unknown exception is thrown during handling of exception. Aborting execution" << std::endl;
+ }
+
+ std::terminate(); // Reached only when OnError itself threw.
+ }
+ }
+}
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
index d2bf241..ce1a46e 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
@@ -158,7 +158,7 @@ namespace ignite
* means that time check is disabled and entries will be
* sent only when buffer is full.
*
- * @param val Time interval in miliseconds.
+ * @param val Time interval in milliseconds.
*/
void SetTimeInterval(int64_t val)
{
@@ -223,7 +223,7 @@ namespace ignite
int32_t bufferSize;
/**
- * Time interval in miliseconds. When a cache update
+ * Time interval in milliseconds. When a cache update
* happens, entry is first put into a buffer. Entries from
* buffer will be sent to the master node only if the buffer
* is full (its size can be changed via SetBufferSize) or
@@ -374,7 +374,7 @@ namespace ignite
* Constructor.
*/
template<typename F>
- RemoteFilterHolder(const Reference<F>& filter):
+ explicit RemoteFilterHolder(const Reference<F>& filter):
ContinuousQueryImplBase(false, new event::CacheEntryEventFilterHolder<F>(filter))
{
// No-op.
diff --git a/modules/platforms/cpp/thin-client-test/CMakeLists.txt b/modules/platforms/cpp/thin-client-test/CMakeLists.txt
index 824edd0..17858fe 100644
--- a/modules/platforms/cpp/thin-client-test/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client-test/CMakeLists.txt
@@ -32,6 +32,7 @@ set(SOURCES src/teamcity/teamcity_boost.cpp
src/teamcity/teamcity_messages.cpp
src/cache_client_test.cpp
src/compute_client_test.cpp
+ src/continuous_query_test.cpp
src/test_utils.cpp
src/ignite_client_test.cpp
src/interop_test.cpp
diff --git a/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-32.xml b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-32.xml
new file mode 100644
index 0000000..1a3629e
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-32.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="cache-query-continuous-default.xml"/>
+
+ <bean parent="grid.cfg">
+ <property name="memoryConfiguration">
+ <bean class="org.apache.ignite.configuration.MemoryConfiguration">
+ <property name="systemCacheInitialSize" value="#{10 * 1024 * 1024}"/>
+ <property name="systemCacheMaxSize" value="#{40 * 1024 * 1024}"/>
+ <property name="defaultMemoryPolicyName" value="dfltPlc"/>
+
+ <property name="memoryPolicies">
+ <list>
+ <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration">
+ <property name="name" value="dfltPlc"/>
+ <property name="maxSize" value="#{100 * 1024 * 1024}"/>
+ <property name="initialSize" value="#{10 * 1024 * 1024}"/>
+ </bean>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-default.xml b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-default.xml
new file mode 100644
index 0000000..08e9e5f
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous-default.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean abstract="true" id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+ <property name="connectorConfiguration"><null/></property>
+
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="11110"/>
+ <property name="portRange" value="10"/>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean parent="cache-template">
+ <property name="name" value="transactional_no_backup"/>
+ </bean>
+
+ <bean parent="cache-template">
+ <property name="name" value="with_expiry"/>
+ <property name="expiryPolicyFactory">
+ <bean class="javax.cache.expiry.CreatedExpiryPolicy" factory-method="factoryOf">
+ <constructor-arg>
+ <bean class="javax.cache.expiry.Duration">
+ <constructor-arg value="MILLISECONDS"/>
+ <constructor-arg value="500"/>
+ </bean>
+ </constructor-arg>
+ </bean>
+ </property>
+ </bean>
+ </list>
+ </property>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ <property name="socketTimeout" value="300" />
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="cache-template" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="cacheMode" value="PARTITIONED"/>
+ <property name="atomicityMode" value="TRANSACTIONAL"/>
+ <property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ <property name="backups" value="0"/>
+ <property name="queryEntities">
+ <list>
+ <bean class="org.apache.ignite.cache.QueryEntity">
+ <property name="keyType" value="java.lang.Integer"/>
+ <property name="valueType" value="TestEntry"/>
+
+ <property name="fields">
+ <map>
+ <entry key="value" value="java.lang.Integer"/>
+ </map>
+ </property>
+ </bean>
+ </list>
+ </property>
+ </bean>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/config/cache-query-continuous.xml b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous.xml
new file mode 100644
index 0000000..1c4e275
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/config/cache-query-continuous.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="cache-query-continuous-default.xml"/>
+
+ <bean parent="grid.cfg"/>
+</beans>
diff --git a/modules/platforms/cpp/thin-client-test/src/continuous_query_test.cpp b/modules/platforms/cpp/thin-client-test/src/continuous_query_test.cpp
new file mode 100644
index 0000000..80c0912
--- /dev/null
+++ b/modules/platforms/cpp/thin-client-test/src/continuous_query_test.cpp
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <deque>
+
+#include <boost/test/unit_test.hpp>
+#include <boost/optional.hpp>
+
+#include <ignite/common/concurrent.h>
+
+#include <ignite/ignition.h>
+#include <ignite/thin/ignite_client.h>
+
+#include <test_utils.h>
+
+using namespace ignite;
+using namespace ignite::thin;
+using namespace ignite::thin::cache;
+using namespace ignite::thin::cache::event;
+using namespace ignite::thin::cache::query;
+using namespace ignite::thin::cache::query::continuous;
+
+using namespace boost::unit_test;
+
+
+/**
+ * Very simple concurrent queue implementation.
+ */
+template<typename T>
+class ConcurrentQueue
+{
+public:
+ /**
+ * Constructor.
+ */
+ ConcurrentQueue()
+ {
+ // No-op.
+ }
+
+ /**
+ * Push next element to queue.
+ *
+ * @param val Value to push.
+ */
+ void Push(const T& val)
+ {
+ common::concurrent::CsLockGuard guard(mutex);
+
+ queue.push_back(val);
+
+ cv.NotifyOne();
+ }
+
+ /**
+ * Pull element from the queue with the specified timeout.
+ *
+ * @param val Value is placed there on success.
+ * @param timeout Timeout in ms.
+ * @return True on success and false on timeout.
+ */
+ bool Pull(T& val, int32_t timeout)
+ {
+ common::concurrent::CsLockGuard guard(mutex);
+
+ while (queue.empty()) // Loop, not 'if': re-check the queue after every wake-up. Each wait uses the full timeout.
+ {
+ bool notified = cv.WaitFor(mutex, timeout);
+
+ if (!notified)
+ return false; // Timed out with nothing to pull.
+ }
+
+ assert(!queue.empty());
+
+ val = queue.front();
+
+ queue.pop_front();
+
+ return true;
+ }
+
+private:
+ /** Mutex. */
+ common::concurrent::CriticalSection mutex;
+
+ /** Condition variable. */
+ common::concurrent::ConditionVariable cv;
+
+ /** Queue. */
+ std::deque<T> queue;
+};
+
+/**
+ * Test listener class. Stores events it has been notified about in concurrent
+ * queue so they can be checked later.
+ */
+template<typename K, typename V>
+class Listener : public CacheEntryEventListener<K, V>
+{
+public:
+ enum { DEFAULT_WAIT_TIMEOUT = 1000 };
+
+ /**
+ * Default constructor.
+ */
+ Listener() :
+ disconnected(false),
+ handlingDelay(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param handlingDelay Handling delay.
+ */
+ Listener(int32_t handlingDelay) :
+ disconnected(false),
+ handlingDelay(handlingDelay)
+ {
+ // No-op.
+ }
+
+ /**
+ * Event callback.
+ *
+ * @param evts Events.
+ * @param num Events number.
+ */
+ virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num)
+ {
+ for (uint32_t i = 0; i < num; ++i)
+ {
+ if (handlingDelay)
+ boost::this_thread::sleep_for(boost::chrono::milliseconds(handlingDelay));
+
+ eventQueue.Push(evts[i]);
+ }
+ }
+
+ /**
+ * Disconnected callback.
+ *
+ * Called if channel was disconnected. This also means that continuous query was closed and no more
+ * events will be provided for this listener.
+ */
+ virtual void OnDisconnected()
+ {
+ common::concurrent::CsLockGuard guard(disconnectedMutex);
+
+ disconnected = true;
+ disconnectedCv.NotifyAll();
+ }
+
+ /**
+ * Check that next received event contains specific values.
+ *
+ * @param key Key.
+ * @param oldVal Old value.
+ * @param val Current value.
+ * @param eType Event type.
+ */
+ void CheckNextEvent(const K& key, boost::optional<V> oldVal, boost::optional<V> val, CacheEntryEventType::Type eType)
+ {
+ CacheEntryEvent<K, V> event;
+ bool success = eventQueue.Pull(event, DEFAULT_WAIT_TIMEOUT);
+
+ BOOST_REQUIRE(success);
+
+ BOOST_CHECK_EQUAL(event.GetKey(), key);
+ BOOST_CHECK_EQUAL(event.HasOldValue(), oldVal.is_initialized());
+ BOOST_CHECK_EQUAL(event.HasValue(), val.is_initialized());
+ BOOST_CHECK_EQUAL(event.GetEventType(), eType);
+
+ if (oldVal && event.HasOldValue())
+ BOOST_CHECK_EQUAL(event.GetOldValue().value, oldVal->value);
+
+ if (val && event.HasValue())
+ BOOST_CHECK_EQUAL(event.GetValue().value, val->value);
+ }
+
+ /**
+ * Check that there is no next event in the specified period of time.
+ *
+ * @param millis Time span in milliseconds.
+ */
+ void CheckNoEvent(int32_t millis = DEFAULT_WAIT_TIMEOUT)
+ {
+ CacheEntryEvent<K, V> event;
+ bool success = eventQueue.Pull(event, millis);
+
+ BOOST_CHECK(!success);
+ }
+
+ /**
+ * Check that next event is the cache entry expiry event.
+ *
+ * @param key Key.
+ */
+ void CheckExpired(const K& key)
+ {
+ CacheEntryEvent<K, V> event;
+ bool success = eventQueue.Pull(event, DEFAULT_WAIT_TIMEOUT);
+
+ BOOST_REQUIRE(success); // Stop on timeout: 'event' is still default-constructed in that case.
+
+ BOOST_CHECK_EQUAL(event.GetKey(), key);
+ BOOST_CHECK_EQUAL(event.GetEventType(), CacheEntryEventType::EXPIRED);
+ }
+
+ /**
+ * Make sure that channel is disconnected within specified time.
+ *
+ * @param millis Time span in milliseconds.
+ */
+ void CheckDisconnected(int32_t millis = DEFAULT_WAIT_TIMEOUT)
+ {
+ common::concurrent::CsLockGuard guard(disconnectedMutex);
+
+ while (!disconnected) // Re-check the flag after every wake-up instead of trusting a single wait.
+ {
+ if (!disconnectedCv.WaitFor(disconnectedMutex, millis)) break; // Timed out without a notification.
+ }
+ BOOST_CHECK(disconnected);
+ }
+
+private:
+ /** Disconnected Mutex. */
+ common::concurrent::CriticalSection disconnectedMutex;
+
+ /** Disconnected Condition variable. */
+ common::concurrent::ConditionVariable disconnectedCv;
+
+ /** Disconnected flag. */
+ bool disconnected;
+
+ /** Handling delay. */
+ int32_t handlingDelay;
+
+ /** Events queue. */
+ ConcurrentQueue< CacheEntryEvent<K, V> > eventQueue;
+};
+
+/*
+ * Test entry.
+ */
+struct TestEntry
+{
+    /** Payload carried by the entry. */
+    int32_t value;
+
+    /**
+     * Default constructor. Produces an entry whose value is zero.
+     */
+    TestEntry() :
+        value(0)
+    {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param val Value to store.
+     */
+    explicit TestEntry(int32_t val) :
+        value(val)
+    {
+        // No-op.
+    }
+};
+
+namespace ignite
+{
+    namespace binary
+    {
+        /**
+         * Binary type descriptor for TestEntry: written under the type name "TestEntry"
+         * with a single int32 field named "value".
+         */
+        template<>
+        struct BinaryType<TestEntry> : BinaryTypeDefaultAll<TestEntry>
+        {
+            static void GetTypeName(std::string& dst)
+            {
+                dst.assign("TestEntry");
+            }
+
+            static void Write(BinaryWriter& writer, const TestEntry& obj)
+            {
+                const int32_t fieldValue = obj.value;
+
+                writer.WriteInt32("value", fieldValue);
+            }
+
+            static void Read(BinaryReader& reader, TestEntry& dst)
+            {
+                const int32_t fieldValue = reader.ReadInt32("value");
+
+                dst.value = fieldValue;
+            }
+        };
+    }
+}
+
+/**
+ * Test setup fixture.
+ */
+class ContinuousQueryTestSuiteFixture
+{
+public:
+    /**
+     * Constructor.
+     *
+     * Starts the server node, then connects a thin client and opens the default test cache.
+     * Members are initialized in declaration order (node, client, cache), so the client is
+     * created only after the node is up and the cache only after the client exists.
+     */
+    ContinuousQueryTestSuiteFixture() :
+        node(ignite_test::StartCrossPlatformServerNode("cache-query-continuous.xml", "node-01")),
+        client(StartClient()),
+        cache(GetDefaultCache(client))
+    {
+        // No-op.
+    }
+
+    /**
+     * Destructor.
+     *
+     * Stops every started node and drops the fixture's own node handle.
+     */
+    ~ContinuousQueryTestSuiteFixture()
+    {
+        Ignition::StopAll(false);
+
+        node = Ignite();
+    }
+
+    /**
+     * Start new client connected to the test endpoint.
+     *
+     * @return Started client.
+     */
+    static IgniteClient StartClient()
+    {
+        IgniteClientConfiguration clientCfg;
+
+        clientCfg.SetEndPoints("127.0.0.1:11110");
+
+        return IgniteClient::Start(clientCfg);
+    }
+
+    /**
+     * Get test cache using client.
+     *
+     * @param client Client to use.
+     * @return Cache client for the "transactional_no_backup" cache.
+     */
+    static CacheClient<int32_t, TestEntry> GetDefaultCache(IgniteClient& client)
+    {
+        const char* cacheName = "transactional_no_backup";
+
+        return client.GetOrCreateCache<int32_t, TestEntry>(cacheName);
+    }
+
+    /**
+     * Get cache with configured expiry policy using client.
+     *
+     * @param client Client to use.
+     * @return Cache client for the "with_expiry" cache.
+     */
+    static CacheClient<int32_t, TestEntry> GetExpiryCache(IgniteClient& client)
+    {
+        const char* cacheName = "with_expiry";
+
+        return client.GetOrCreateCache<int32_t, TestEntry>(cacheName);
+    }
+
+protected:
+    /** Server node the client talks to. */
+    Ignite node;
+
+    /** Thin client. */
+    IgniteClient client;
+
+    /** Cache client used by the test cases. */
+    CacheClient<int32_t, TestEntry> cache;
+};
+
+// Drives a create / update / create / remove sequence through the cache and verifies, after
+// each operation, that the listener received the matching event with the expected old and
+// current values.
+void CheckEvents(CacheClient<int32_t, TestEntry>& cache, Listener<int32_t, TestEntry>& listener)
+{
+ // First put of key 1: CREATED, no old value.
+ cache.Put(1, TestEntry(10));
+ listener.CheckNextEvent(1, boost::none, TestEntry(10), CacheEntryEventType::CREATED);
+
+ // Second put of key 1: UPDATED, old value is the one written above.
+ cache.Put(1, TestEntry(20));
+ listener.CheckNextEvent(1, TestEntry(10), TestEntry(20), CacheEntryEventType::UPDATED);
+
+ // A different key produces its own CREATED event.
+ cache.Put(2, TestEntry(20));
+ listener.CheckNextEvent(2, boost::none, TestEntry(20), CacheEntryEventType::CREATED);
+
+ // Removal: the event is expected to carry the removed value as both old and current value.
+ cache.Remove(1);
+ listener.CheckNextEvent(1, TestEntry(20), TestEntry(20), CacheEntryEventType::REMOVED);
+}
+
+BOOST_FIXTURE_TEST_SUITE(ContinuousQueryTestSuite, ContinuousQueryTestSuiteFixture)
+
+// Smoke test: registers a listener through a continuous query on the fixture's cache and
+// checks that the standard event sequence is delivered.
+BOOST_AUTO_TEST_CASE(TestBasic)
+{
+ Listener<int32_t, TestEntry> listener;
+
+ ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+
+ // The handle is held until the end of the test case while events are checked.
+ ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+ CheckEvents(cache, listener);
+}
+
+// Verifies that events keep arriving after the ContinuousQueryClient object used to start the
+// query has gone out of scope, as long as the handle is still alive.
+BOOST_AUTO_TEST_CASE(TestExpiredQuery)
+{
+ Listener<int32_t, TestEntry> listener;
+ ContinuousQueryHandleClient handle;
+
+ {
+ // Query scope.
+ ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+
+ handle = cache.QueryContinuous(qry);
+ }
+
+ // Query is destroyed here; only the handle and the listener remain.
+
+ CheckEvents(cache, listener);
+}
+
+// With IncludeExpired enabled, a put into the expiry cache must be followed by an EXPIRED
+// event for the same key.
+BOOST_AUTO_TEST_CASE(TestExpiredEventsReceived)
+{
+ // Switch the fixture's cache to the one obtained under the name "with_expiry".
+ cache = GetExpiryCache(client);
+
+ Listener<int32_t, TestEntry> listener;
+
+ ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+ qry.SetIncludeExpired(true);
+
+ ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+ cache.Put(1, TestEntry(10));
+ listener.CheckNextEvent(1, boost::none, TestEntry(10), CacheEntryEventType::CREATED);
+ // Nothing is expected within the first 100 ms after creation.
+ // NOTE(review): presumably the expiry configured in the XML is longer than 100 ms - confirm.
+ listener.CheckNoEvent(100);
+ listener.CheckExpired(1);
+}
+
+// With IncludeExpired disabled, no further event may follow the CREATED event on the expiry
+// cache within the default wait timeout.
+BOOST_AUTO_TEST_CASE(TestExpiredEventsNotReceived)
+{
+ // Switch the fixture's cache to the one obtained under the name "with_expiry".
+ cache = GetExpiryCache(client);
+
+ Listener<int32_t, TestEntry> listener;
+
+ ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+ qry.SetIncludeExpired(false);
+
+ ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+ cache.Put(1, TestEntry(10));
+ listener.CheckNextEvent(1, boost::none, TestEntry(10), CacheEntryEventType::CREATED);
+ // Waits for DEFAULT_WAIT_TIMEOUT (the default argument of CheckNoEvent) and expects silence.
+ listener.CheckNoEvent();
+}
+
+// Checks the default buffer size, that a custom value set through SetBufferSize() is reported
+// back both before and after the query is started, and that events are still delivered.
+BOOST_AUTO_TEST_CASE(TestGetSetBufferSize)
+{
+    typedef ContinuousQueryClient<int32_t, TestEntry> QueryType;
+
+    // Named cast instead of a C-style one; the expected values are computed once and reused.
+    const int32_t defaultSize = static_cast<int32_t>(QueryType::DEFAULT_BUFFER_SIZE);
+    const int32_t doubledSize = 2 * defaultSize;
+
+    Listener<int32_t, TestEntry> listener;
+
+    QueryType qry(MakeReference(listener));
+
+    BOOST_CHECK_EQUAL(qry.GetBufferSize(), defaultSize);
+
+    qry.SetBufferSize(doubledSize);
+
+    BOOST_CHECK_EQUAL(qry.GetBufferSize(), doubledSize);
+
+    ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+    // Starting the query must not alter the caller's query object.
+    BOOST_CHECK_EQUAL(qry.GetBufferSize(), doubledSize);
+
+    CheckEvents(cache, listener);
+}
+
+// Checks the default time interval, that a custom value set through SetTimeInterval() is
+// reported back both before and after the query is started, and that events are still delivered.
+BOOST_AUTO_TEST_CASE(TestGetSetTimeInterval)
+{
+ typedef ContinuousQueryClient<int32_t, TestEntry> QueryType;
+ Listener<int32_t, TestEntry> listener;
+
+ ContinuousQueryClient<int32_t, TestEntry> qry(MakeReference(listener));
+
+ // NOTE(review): presumably a buffer larger than the number of events produced below, so that
+ // delivery is driven by the time interval rather than by a full buffer - confirm.
+ qry.SetBufferSize(10);
+
+ BOOST_CHECK_EQUAL(qry.GetTimeInterval(), static_cast<int>(QueryType::DEFAULT_TIME_INTERVAL));
+
+ qry.SetTimeInterval(500);
+
+ BOOST_CHECK_EQUAL(qry.GetTimeInterval(), 500);
+
+ ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+ // Starting the query must not alter the caller's query object.
+ BOOST_CHECK_EQUAL(qry.GetTimeInterval(), 500);
+
+ CheckEvents(cache, listener);
+}
+
+// Guards against drift between the defaults exposed by the thin-client ContinuousQueryClient
+// and those declared by impl::cache::query::continuous::ContinuousQueryImpl.
+BOOST_AUTO_TEST_CASE(TestPublicPrivateConstantsConsistence)
+{
+ typedef ContinuousQueryClient<int32_t, TestEntry> QueryType;
+ typedef impl::cache::query::continuous::ContinuousQueryImpl<int, TestEntry> QueryImplType;
+
+ // Both sides are converted to int so the two constants can be compared directly.
+ BOOST_CHECK_EQUAL(static_cast<int>(QueryImplType::DEFAULT_TIME_INTERVAL),
+ static_cast<int>(QueryType::DEFAULT_TIME_INTERVAL));
+
+ BOOST_CHECK_EQUAL(static_cast<int>(QueryImplType::DEFAULT_BUFFER_SIZE),
+ static_cast<int>(QueryType::DEFAULT_BUFFER_SIZE));
+}
+
+// Verifies that the listener is told about a disconnect when the server node is stopped
+// after a batch of updates has been issued.
+BOOST_AUTO_TEST_CASE(TestLongEventsProcessingDisconnect)
+{
+ // NOTE(review): 200 is presumably the per-event handling delay in milliseconds (the Listener
+ // constructor is not visible here) - confirm.
+ boost::shared_ptr< Listener<int32_t, TestEntry> > listener(new Listener<int32_t, TestEntry>(200));
+
+ ContinuousQueryClient<int32_t, TestEntry> qry(MakeReferenceFromSmartPointer(listener));
+
+ ContinuousQueryHandleClient handle = cache.QueryContinuous(qry);
+
+ // Generate 20 events.
+ for (int32_t i = 0; i < 20; ++i)
+ cache.Put(i, TestEntry(i * 10));
+
+ // Stop the only server node.
+ Ignition::Stop(node.GetName(), true);
+
+ // Passes only if the listener's disconnected flag is set within the default timeout.
+ listener->CheckDisconnected();
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt
index 29dadca..20d52ec 100644
--- a/modules/platforms/cpp/thin-client/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client/CMakeLists.txt
@@ -30,6 +30,7 @@ set(SOURCES src/impl/data_channel.cpp
src/impl/affinity/affinity_topology_version.cpp
src/impl/affinity/affinity_assignment.cpp
src/impl/affinity/affinity_manager.cpp
+ src/impl/cache/query/continuous/continuous_query_notification_handler.cpp
src/impl/remote_type_updater.cpp
src/impl/message.cpp
src/impl/cache/cache_client_proxy.cpp
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/cache_client_proxy.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/cache_client_proxy.h
index da5f42b..01ee395 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/cache_client_proxy.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/cache_client_proxy.h
@@ -23,6 +23,8 @@
#include <ignite/thin/cache/query/query_fields_cursor.h>
#include <ignite/thin/cache/query/query_sql_fields.h>
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
+
namespace ignite
{
namespace impl
@@ -298,6 +300,15 @@ namespace ignite
const ignite::thin::cache::query::SqlFieldsQuery& qry);
/**
+ * Starts the continuous query execution.
+ *
+ * @param continuousQuery Holder wrapping the continuous query.
+ * @return Query handle. Once all instances of the handle are destroyed, query execution is stopped.
+ */
+ ignite::thin::cache::query::continuous::ContinuousQueryHandleClient QueryContinuous(
+ const query::continuous::SP_ContinuousQueryClientHolderBase& continuousQuery);
+
+ /**
* Get from CacheClient.
* Use for testing purposes only.
*/
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/continuous/continuous_query_client_holder.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/continuous/continuous_query_client_holder.h
new file mode 100644
index 0000000..13dee90
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/continuous/continuous_query_client_holder.h
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
+#define _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
+
+#include <stdint.h>
+
+#include <vector>
+
+#include <ignite/common/concurrent.h>
+#include <ignite/ignite_error.h>
+
+#include <ignite/thin/cache/query/continuous/continuous_query_client.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace thin
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query client holder.
+ *
+ * Interface that exposes a continuous query's settings and callbacks without its key and
+ * value types. The typed implementation is ContinuousQueryClientHolder.
+ */
+ class ContinuousQueryClientHolderBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ContinuousQueryClientHolderBase()
+ {
+ // No-op.
+ }
+
+ /**
+ * Read and process cache events.
+ *
+ * @param reader Reader to use. The events are read from it and handed to the query's listener.
+ */
+ virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader) = 0;
+
+ /**
+ * Get buffer size.
+ *
+ * @return Buffer size configured for the wrapped query.
+ */
+ virtual int32_t GetBufferSize() const = 0;
+
+ /**
+ * Get time interval.
+ *
+ * @return Time interval configured for the wrapped query.
+ */
+ virtual int64_t GetTimeInterval() const = 0;
+
+ /**
+ * Gets a value indicating whether to notify about Expired events.
+ *
+ * @return Flag value.
+ */
+ virtual bool GetIncludeExpired() const = 0;
+
+ /**
+ * Disconnected callback.
+ *
+ * Called if channel was disconnected.
+ */
+ virtual void OnDisconnected() = 0;
+ };
+
+ /** Shared pointer to ContinuousQueryClientHolderBase. */
+ typedef common::concurrent::SharedPointer<ContinuousQueryClientHolderBase> SP_ContinuousQueryClientHolderBase;
+
+ /**
+  * Continuous query client holder.
+  *
+  * Keeps its own copy of the user's continuous query and adapts it to the
+  * ContinuousQueryClientHolderBase interface.
+  */
+ template<typename K, typename V>
+ class ContinuousQueryClientHolder : public ContinuousQueryClientHolderBase
+ {
+ public:
+     /** Wrapped continuous query type. */
+     typedef ignite::thin::cache::query::continuous::ContinuousQueryClient<K, V> ContinuousQuery;
+
+     /**
+      * Constructor.
+      *
+      * @param continuousQuery Continuous query to wrap. A copy is stored in the holder.
+      */
+     ContinuousQueryClientHolder(const ContinuousQuery& continuousQuery) :
+         continuousQuery(continuousQuery)
+     {
+         // No-op.
+     }
+
+     /**
+      * Read and process cache events.
+      *
+      * Reads the number of events followed by the events themselves and passes them to the
+      * query's listener in a single call.
+      *
+      * @param reader Reader to use.
+      * @throw IgniteError If the announced number of events is negative, or if an event
+      *     cannot be read.
+      */
+     virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader)
+     {
+         // Number of events.
+         int32_t cnt = reader.ReadInt32();
+
+         // The count comes from the stream. A negative value would be converted to an enormous
+         // unsigned size when allocating the buffer below, so reject it explicitly.
+         if (cnt < 0)
+         {
+             throw ignite::IgniteError(ignite::IgniteError::IGNITE_ERR_GENERIC,
+                 "Negative number of continuous query events received");
+         }
+
+         // Storing events here. The buffer lives only for the duration of this call.
+         std::vector< ignite::thin::cache::CacheEntryEvent<K, V> > events(static_cast<size_t>(cnt));
+
+         for (size_t i = 0; i < events.size(); ++i)
+             events[i].Read(reader);
+
+         continuousQuery.GetListener().OnEvent(events.data(), static_cast<uint32_t>(cnt));
+     }
+
+     /**
+      * Get buffer size.
+      *
+      * @return Buffer size.
+      */
+     virtual int32_t GetBufferSize() const
+     {
+         return continuousQuery.GetBufferSize();
+     }
+
+     /**
+      * Get time interval.
+      *
+      * @return Time interval.
+      */
+     virtual int64_t GetTimeInterval() const
+     {
+         return continuousQuery.GetTimeInterval();
+     }
+
+     /**
+      * Gets a value indicating whether to notify about Expired events.
+      *
+      * @return Flag value.
+      */
+     virtual bool GetIncludeExpired() const
+     {
+         return continuousQuery.GetIncludeExpired();
+     }
+
+     /**
+      * Disconnected callback.
+      *
+      * Called if channel was disconnected. Forwards the notification to the query's listener.
+      */
+     virtual void OnDisconnected()
+     {
+         continuousQuery.GetListener().OnDisconnected();
+     }
+
+ private:
+     /** Continuous query. */
+     ContinuousQuery continuousQuery;
+ };
+ }
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_cursor_impl.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_cursor_impl.h
deleted file mode 100644
index 13af4cb..0000000
--- a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_cursor_impl.h
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _IGNITE_IMPL_THIN_CACHE_QUERY_QUERY_FIELDS_CURSOR_IMPL
-#define _IGNITE_IMPL_THIN_CACHE_QUERY_QUERY_FIELDS_CURSOR_IMPL
-
-#include <ignite/ignite_error.h>
-
-#include "ignite/impl/ignite_environment.h"
-#include "ignite/impl/operations.h"
-#include "ignite/impl/cache/query/query_batch.h"
-
-namespace ignite
-{
- namespace impl
- {
- namespace thin
- {
- namespace cache
- {
- namespace query
- {
- class QueryFieldsRowImpl;
-
- /**
- * Query cursor implementation.
- */
- class IGNITE_IMPORT_EXPORT QueryCursorImpl
- {
- public:
- /**
- * Constructor.
- *
- * @param env Environment.
- * @param javaRef Java reference.
- */
- QueryCursorImpl(ignite::common::concurrent::SharedPointer<IgniteEnvironment> env, jobject javaRef);
-
- /**
- * Destructor.
- */
- ~QueryCursorImpl();
-
- /**
- * Check whether next result exists.
- *
- * @param err Error.
- * @return True if exists.
- */
- bool HasNext(IgniteError& err);
-
- /**
- * Get next object.
- *
- * @param op Operation.
- * @param err Error.
- */
- void GetNext(OutputOperation& op, IgniteError& err);
-
- /**
- * Get next row.
- *
- * @param err Error.
- * @return Output row.
- */
- QueryFieldsRowImpl* GetNextRow(IgniteError& err);
-
- /**
- * Get all cursor entries.
- *
- * @param op Operation.
- * @param err Error.
- */
- void GetAll(OutputOperation& op, IgniteError& err);
-
- /**
- * Get all cursor entries.
- *
- * @param op Operation.
- */
- void GetAll(OutputOperation& op);
-
- private:
- /** Environment. */
- ignite::common::concurrent::SharedPointer<impl::IgniteEnvironment> env;
-
- /** Handle to Java object. */
- jobject javaRef;
-
- /** Current result batch. */
- QueryBatch* batch;
-
- /** Whether cursor has no more elements available. */
- bool endReached;
-
- /** Whether iteration methods were called. */
- bool iterCalled;
-
- /** Whether GetAll() method was called. */
- bool getAllCalled;
-
- IGNITE_NO_COPY_ASSIGNMENT(QueryCursorImpl);
-
- /**
- * Create Java-side iterator if needed.
- *
- * @param err Error.
- * @return True in case of success, false if an error is thrown.
- */
- bool CreateIteratorIfNeeded(IgniteError& err);
-
- /**
- * Get next result batch if update is needed.
- *
- * @param err Error.
- * @return True if operation has been successful.
- */
- bool GetNextBatchIfNeeded(IgniteError& err);
-
- /**
- * Check whether Java-side iterator has next element.
- *
- * @param err Error.
- * @return True if the next element is available.
- */
- bool IteratorHasNext(IgniteError& err);
- };
- }
- }
- }
- }
-}
-
-#endif //_IGNITE_IMPL_THIN_CACHE_QUERY_QUERY_FIELDS_CURSOR_IMPL
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_row_impl.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_row_impl.h
deleted file mode 100644
index 3c08901..0000000
--- a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/cache/query/query_fields_row_impl.h
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef _IGNITE_IMPL_CACHE_QUERY_CACHE_QUERY_FIELDS_ROW_IMPL
-#define _IGNITE_IMPL_CACHE_QUERY_CACHE_QUERY_FIELDS_ROW_IMPL
-
-#include <ignite/common/concurrent.h>
-#include <ignite/ignite_error.h>
-
-namespace ignite
-{
- namespace impl
- {
- namespace cache
- {
- namespace query
- {
- /**
- * Query fields cursor implementation.
- */
- class IGNITE_IMPORT_EXPORT QueryFieldsRowImpl
- {
- public:
- typedef common::concurrent::SharedPointer<interop::InteropMemory> SP_InteropMemory;
-
- /**
- * Constructor.
- *
- * @param mem Memory containig row data.
- */
- QueryFieldsRowImpl(SP_InteropMemory mem, int32_t rowBegin, int32_t columnNum) :
- mem(mem),
- stream(mem.Get()),
- reader(&stream),
- columnNum(columnNum),
- processed(0)
- {
- stream.Position(rowBegin);
- }
-
- /**
- * Check whether next entry exists.
- *
- * @return True if next entry exists.
- */
- bool HasNext()
- {
- IgniteError err;
-
- bool res = HasNext(err);
-
- IgniteError::ThrowIfNeeded(err);
-
- return res;
- }
-
- /**
- * Check whether next entry exists.
- *
- * @param err Error.
- * @return True if next entry exists.
- */
- bool HasNext(IgniteError& err)
- {
- if (IsValid())
- return processed < columnNum;
- else
- {
- err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
- "Instance is not usable (did you check for error?).");
-
- return false;
- }
- }
-
- /**
- * Get next entry.
- *
- * @return Next entry.
- */
- template<typename T>
- T GetNext()
- {
- IgniteError err;
-
- QueryFieldsRowImpl res = GetNext<T>(err);
-
- IgniteError::ThrowIfNeeded(err);
-
- return res;
- }
-
- /**
- * Get next entry.
- *
- * @param err Error.
- * @return Next entry.
- */
- template<typename T>
- T GetNext(IgniteError& err)
- {
- if (IsValid()) {
- ++processed;
- return reader.ReadTopObject<T>();
- }
- else
- {
- err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
- "Instance is not usable (did you check for error?).");
-
- return T();
- }
- }
-
- /**
- * Get next entry assuming it's an array of 8-byte signed
- * integers. Maps to "byte[]" type in Java.
- *
- * @param dst Array to store data to.
- * @param len Expected length of array.
- * @return Actual amount of elements read. If "len" argument is less than actual
- * array size or resulting array is set to null, nothing will be written
- * to resulting array and returned value will contain required array length.
- * -1 will be returned in case array in stream was null.
- */
- int32_t GetNextInt8Array(int8_t* dst, int32_t len)
- {
- if (IsValid()) {
-
- int32_t actualLen = reader.ReadInt8Array(dst, len);
-
- if (actualLen == 0 || (dst && len >= actualLen))
- ++processed;
-
- return actualLen;
- }
- else
- {
- throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
- "Instance is not usable (did you check for error?).");
- }
- }
-
- /**
- * Check if the instance is valid.
- *
- * Invalid instance can be returned if some of the previous
- * operations have resulted in a failure. For example invalid
- * instance can be returned by not-throwing version of method
- * in case of error. Invalid instances also often can be
- * created using default constructor.
- *
- * @return True if the instance is valid and can be used.
- */
- bool IsValid()
- {
- return mem.Get() != 0;
- }
-
- private:
- /** Row memory. */
- SP_InteropMemory mem;
-
- /** Row data stream. */
- interop::InteropInputStream stream;
-
- /** Row data reader. */
- binary::BinaryReaderImpl reader;
-
- /** Number of elements in a row. */
- int32_t columnNum;
-
- /** Number of elements that have been read by now. */
- int32_t processed;
-
- IGNITE_NO_COPY_ASSIGNMENT(QueryFieldsRowImpl);
- };
- }
- }
- }
-}
-
-#endif //_IGNITE_IMPL_CACHE_QUERY_CACHE_QUERY_FIELDS_ROW_IMPL
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_client.h
index e53ad98..544010e 100644
--- a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_client.h
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_client.h
@@ -27,12 +27,15 @@
#include <ignite/thin/cache/query/query_fields_cursor.h>
#include <ignite/thin/cache/query/query_sql_fields.h>
+#include <ignite/thin/cache/query/continuous/continuous_query_client.h>
+#include <ignite/thin/cache/query/continuous/continuous_query_handle.h>
#include <ignite/impl/thin/writable.h>
#include <ignite/impl/thin/writable_key.h>
#include <ignite/impl/thin/readable.h>
#include <ignite/impl/thin/cache/cache_client_proxy.h>
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
namespace ignite
{
@@ -72,7 +75,7 @@ namespace ignite
*
* @param impl Implementation.
*/
- CacheClient(common::concurrent::SharedPointer<void> impl) :
+ explicit CacheClient(const common::concurrent::SharedPointer<void>& impl) :
proxy(impl)
{
// No-op.
@@ -597,6 +600,22 @@ namespace ignite
}
/**
+ * Starts the continuous query execution.
+ *
+ * @param continuousQuery Continuous query.
+ * @return Query handle. Once all instances of the handle are destroyed, query execution is stopped.
+ */
+ query::continuous::ContinuousQueryHandleClient QueryContinuous(
+     query::continuous::ContinuousQueryClient<K, V> continuousQuery)
+ {
+     namespace holders = impl::thin::cache::query::continuous;
+
+     // Wrap a copy of the query into a holder that can be passed on to the proxy.
+     holders::SP_ContinuousQueryClientHolderBase wrapped(
+         new holders::ContinuousQueryClientHolder<K, V>(continuousQuery));
+
+     return proxy.QueryContinuous(wrapped);
+ }
+
+ /**
* Refresh affinity mapping.
*
* @deprecated Does nothing since Apache Ignite 2.8. Affinity mapping is refreshed automatically now.
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_entry.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_entry.h
new file mode 100644
index 0000000..ef52814
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/cache_entry.h
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::CacheEntry class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_CACHE_ENTRY
+#define _IGNITE_THIN_CACHE_CACHE_ENTRY
+
+#include <utility>
+#include <ignite/common/common.h>
+
+namespace ignite
+{
+ namespace thin
+ {
+ namespace cache
+ {
+ /**
+  * %Cache entry class template.
+  *
+  * Pairs a key with a value and a flag telling whether the value is present.
+  *
+  * Both key and value types should be default-constructible,
+  * copy-constructible and assignable.
+  */
+ template<typename K, typename V>
+ class CacheEntry
+ {
+ public:
+     /**
+      * Default constructor.
+      *
+      * Key and value are default-constructed and the value is marked as absent.
+      */
+     CacheEntry() : key(), val(), hasValue(false)
+     {
+         // No-op.
+     }
+
+     /**
+      * Constructor.
+      *
+      * The value is marked as present.
+      *
+      * @param key Key.
+      * @param val Value.
+      */
+     CacheEntry(const K& key, const V& val) : key(key), val(val), hasValue(true)
+     {
+         // No-op.
+     }
+
+     /**
+      * Copy constructor.
+      *
+      * @param other Other instance.
+      */
+     CacheEntry(const CacheEntry& other) : key(other.key), val(other.val), hasValue(other.hasValue)
+     {
+         // No-op.
+     }
+
+     /**
+      * Constructor.
+      *
+      * Takes the key from the first element of the pair and the value from the second.
+      * The value is marked as present.
+      *
+      * @param p Pair.
+      */
+     CacheEntry(const std::pair<K, V>& p) : key(p.first), val(p.second), hasValue(true)
+     {
+         // No-op.
+     }
+
+     /**
+      * Destructor.
+      */
+     virtual ~CacheEntry()
+     {
+         // No-op.
+     }
+
+     /**
+      * Assignment operator.
+      *
+      * @param other Other instance.
+      * @return *this.
+      */
+     CacheEntry& operator=(const CacheEntry& other)
+     {
+         // Self-assignment leaves the entry untouched.
+         if (this == &other)
+             return *this;
+
+         key = other.key;
+         val = other.val;
+         hasValue = other.hasValue;
+
+         return *this;
+     }
+
+     /**
+      * Get key.
+      *
+      * @return Key.
+      */
+     const K& GetKey() const
+     {
+         return key;
+     }
+
+     /**
+      * Get value.
+      *
+      * @return Stored value, returned regardless of HasValue().
+      */
+     const V& GetValue() const
+     {
+         return val;
+     }
+
+     /**
+      * Check if the value exists.
+      *
+      * @return True, if the value exists.
+      */
+     bool HasValue() const
+     {
+         return hasValue;
+     }
+
+ protected:
+     /** Key. */
+     K key;
+
+     /** Value. */
+     V val;
+
+     /** Indicates whether value exists. */
+     bool hasValue;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_THIN_CACHE_CACHE_ENTRY
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h
new file mode 100644
index 0000000..d906b4b
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::CacheEntryEvent class and ignite::thin::cache::CacheEntryEventType.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT
+#define _IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT
+
+#include <ignite/binary/binary_raw_reader.h>
+#include <ignite/thin/cache/cache_entry.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace thin
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ template<typename K, typename V>
+ class ContinuousQueryClientHolder;
+ }
+ }
+ }
+ }
+ }
+ namespace thin
+ {
+ namespace cache
+ {
+ /**
+ * Cache event type.
+ *
+ * The numeric values matter: CacheEntryEvent::Read() casts the type code read from the
+ * stream directly to this enumeration.
+ */
+ struct CacheEntryEventType
+ {
+ enum Type
+ {
+ /** An event type indicating that the cache entry was created. */
+ CREATED = 0,
+
+ /** An event type indicating that the cache entry was updated, i.e. a previous value existed for the key. */
+ UPDATED = 1,
+
+ /** An event type indicating that the cache entry was removed. */
+ REMOVED = 2,
+
+ /** An event type indicating that the cache entry was removed by expiration policy. */
+ EXPIRED = 3
+ };
+ };
+
+ /**
+  * Cache entry event class template.
+  *
+  * Extends CacheEntry with the previous value of the entry and the type of the change.
+  *
+  * Both key and value types should be default-constructible,
+  * copy-constructible and assignable.
+  */
+ template<typename K, typename V>
+ class CacheEntryEvent : public CacheEntry<K, V>
+ {
+     // The holder fills events in through the private Read().
+     friend class ignite::impl::thin::cache::query::continuous::ContinuousQueryClientHolder<K, V>;
+
+ public:
+     /**
+      * Default constructor.
+      *
+      * Creates instance with all fields default-constructed, no old value and
+      * the CREATED event type.
+      */
+     CacheEntryEvent() :
+         CacheEntry<K, V>(),
+         oldVal(),
+         hasOldValue(false),
+         eventType(CacheEntryEventType::CREATED)
+     {
+         // No-op.
+     }
+
+     /**
+      * Copy constructor.
+      *
+      * @param other Other instance.
+      */
+     CacheEntryEvent(const CacheEntryEvent<K, V>& other) :
+         CacheEntry<K, V>(other),
+         oldVal(other.oldVal),
+         hasOldValue(other.hasOldValue),
+         eventType(other.eventType)
+     {
+         // No-op.
+     }
+
+     /**
+      * Destructor.
+      */
+     virtual ~CacheEntryEvent()
+     {
+         // No-op.
+     }
+
+     /**
+      * Assignment operator.
+      *
+      * @param other Other instance.
+      * @return *this.
+      */
+     CacheEntryEvent& operator=(const CacheEntryEvent<K, V>& other)
+     {
+         if (this != &other)
+         {
+             CacheEntry<K, V>::operator=(other);
+
+             oldVal = other.oldVal;
+             hasOldValue = other.hasOldValue;
+             eventType = other.eventType;
+         }
+
+         return *this;
+     }
+
+     /**
+      * Get event type.
+      *
+      * @see CacheEntryEventType for details on events you can receive.
+      *
+      * @return Event type.
+      */
+     CacheEntryEventType::Type GetEventType() const
+     {
+         return eventType;
+     }
+
+     /**
+      * Get old value.
+      *
+      * @return Stored old value, returned regardless of HasOldValue().
+      */
+     const V& GetOldValue() const
+     {
+         return oldVal;
+     }
+
+     /**
+      * Check if the old value exists.
+      *
+      * @return True, if the old value exists.
+      */
+     bool HasOldValue() const
+     {
+         return hasOldValue;
+     }
+
+ private:
+     /**
+      * Reads cache event using provided raw reader.
+      *
+      * Reads, in order: the key, the optional old value, the optional current value and
+      * a one-byte event type code.
+      *
+      * @param reader Reader to use.
+      * @throw IgniteError If the event type code is outside of the CacheEntryEventType range.
+      */
+     void Read(binary::BinaryRawReader& reader)
+     {
+         this->key = reader.ReadObject<K>();
+
+         this->hasOldValue = reader.TryReadObject(this->oldVal);
+         this->hasValue = reader.TryReadObject(this->val);
+
+         int8_t intType = reader.ReadInt8();
+         if (intType < CacheEntryEventType::CREATED || intType > CacheEntryEventType::EXPIRED)
+         {
+             // int8_t is a character type as far as stream formatting is concerned, so widen
+             // the code first; otherwise the message would not contain the number.
+             std::string errMsg = "Event type is not supported: " +
+                 common::LexicalCast<std::string>(static_cast<int32_t>(intType));
+
+             throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, errMsg.c_str());
+         }
+
+         eventType = static_cast<CacheEntryEventType::Type>(intType);
+     }
+
+     /** Old value. */
+     V oldVal;
+
+     /** Indicates whether old value exists. */
+     bool hasOldValue;
+
+     /** Event type. */
+     CacheEntryEventType::Type eventType;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event_listener.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event_listener.h
new file mode 100644
index 0000000..e2829d9
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event_listener.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::event::CacheEntryEventListener class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
+#define _IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
+
+#include <stdint.h>
+
+#include <ignite/thin/cache/event/cache_entry_event.h>
+
+namespace ignite
+{
+ namespace thin
+ {
+ namespace cache
+ {
+ namespace event
+ {
+ /**
+ * Cache entry event listener.
+ *
+ * Abstract interface: implementations have to provide both OnEvent() and OnDisconnected().
+ *
+ * @tparam K Key type of the cache entries the events refer to.
+ * @tparam V Value type of the cache entries the events refer to.
+ */
+ template<typename K, typename V>
+ class CacheEntryEventListener
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ CacheEntryEventListener()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ *
+ * Virtual, so that implementations can be destroyed through a pointer to this interface.
+ */
+ virtual ~CacheEntryEventListener()
+ {
+ // No-op.
+ }
+
+ /**
+ * Event callback.
+ *
+ * @param evts Pointer to the events.
+ * @param num Number of events available through @c evts.
+ */
+ virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num) = 0;
+
+ /**
+ * Disconnected callback.
+ *
+ * Called if channel was disconnected. This also means that continuous query was closed and no more
+ * events will be provided for this listener.
+ */
+ virtual void OnDisconnected() = 0;
+ };
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_THIN_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_client.h
new file mode 100644
index 0000000..10e66ca
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_client.h
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::query::continuous::ContinuousQueryClient class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
+#define _IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
+
+#include <ignite/reference.h>
+
+#include <ignite/thin/cache/event/cache_entry_event_listener.h>
+
+namespace ignite
+{
+ namespace thin
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query client.
+ *
+ * A continuous query client allows registering a listener for cache update events. On any update
+ * to the related cache an event is sent to the client that has executed the query and the listener
+ * is notified on that client.
+ *
+ * Continuous query can either be executed on the whole topology or only on local node.
+ *
+ * To execute the query over the cache use method
+ * ignite::thin::cache::CacheClient::QueryContinuous().
+ *
+ * @tparam K Cache key type.
+ * @tparam V Cache value type.
+ */
+ template<typename K, typename V>
+ class ContinuousQueryClient
+ {
+ public:
+ /**
+ * Default value for the buffer size.
+ */
+ enum { DEFAULT_BUFFER_SIZE = 1 };
+
+ /**
+ * Default value for the time interval.
+ */
+ enum { DEFAULT_TIME_INTERVAL = 0 };
+
+ /**
+ * Destructor.
+ */
+ ~ContinuousQueryClient()
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * Buffer size and time interval are set to DEFAULT_BUFFER_SIZE and DEFAULT_TIME_INTERVAL,
+ * notifications about expired entries are disabled.
+ *
+ * @param lsnr Event listener. Invoked on the node where continuous query execution has been
+ * started.
+ */
+ explicit ContinuousQueryClient(Reference<event::CacheEntryEventListener<K, V> > lsnr) :
+ bufferSize(DEFAULT_BUFFER_SIZE),
+ timeInterval(DEFAULT_TIME_INTERVAL),
+ includeExpired(false),
+ listener(lsnr)
+ {
+ // No-op.
+ }
+
+ /**
+ * Set buffer size.
+ *
+ * When a cache update happens, entry is first put into a buffer. Entries from buffer will be
+ * sent to the master node only if the buffer is full or time provided via SetTimeInterval is
+ * exceeded.
+ *
+ * The value is stored as is; no validation is performed here.
+ *
+ * @param val Buffer size.
+ */
+ void SetBufferSize(int32_t val)
+ {
+ bufferSize = val;
+ }
+
+ /**
+ * Get buffer size.
+ *
+ * When a cache update happens, entry is first put into a buffer. Entries from buffer will be
+ * sent to the master node only if the buffer is full or time provided via SetTimeInterval is
+ * exceeded.
+ *
+ * @return Buffer size.
+ */
+ int32_t GetBufferSize() const
+ {
+ return bufferSize;
+ }
+
+ /**
+ * Set time interval.
+ *
+ * When a cache update happens, entry is first put into a buffer. Entries from buffer are sent
+ * to the master node only if the buffer is full (its size can be changed via SetBufferSize) or
+ * time provided via this method is exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which means that time check is disabled and
+ * entries will be sent only when buffer is full.
+ *
+ * @param val Time interval in milliseconds.
+ */
+ void SetTimeInterval(int64_t val)
+ {
+ timeInterval = val;
+ }
+
+ /**
+ * Get time interval.
+ *
+ * When a cache update happens, entry is first put into a buffer. Entries from buffer are sent
+ * to the master node only if the buffer is full (its size can be changed via SetBufferSize) or
+ * time provided via this method is exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which means that time check is disabled and
+ * entries will be sent only when buffer is full.
+ *
+ * @return Time interval in milliseconds.
+ */
+ int64_t GetTimeInterval() const
+ {
+ return timeInterval;
+ }
+
+ /**
+ * Sets a value indicating whether to notify about Expired events.
+ *
+ * If @c true, then the listener will get notifications about expired cache entries. Otherwise,
+ * only Created, Updated, and Removed events will be passed to the listener.
+ *
+ * Defaults to @c false.
+ *
+ * @param val Flag value.
+ */
+ void SetIncludeExpired(bool val)
+ {
+ includeExpired = val;
+ }
+
+ /**
+ * Gets a value indicating whether to notify about Expired events.
+ *
+ * If @c true, then the listener will get notifications about expired cache entries. Otherwise,
+ * only Created, Updated, and Removed events will be passed to the listener.
+ *
+ * Defaults to @c false.
+ *
+ * @return Flag value.
+ */
+ bool GetIncludeExpired() const
+ {
+ return includeExpired;
+ }
+
+ /**
+ * Set cache entry event listener.
+ *
+ * Replaces the listener passed to the constructor.
+ *
+ * @param lsnr Cache entry event listener. Invoked on the
+ * node where continuous query execution has been
+ * started.
+ */
+ void SetListener(Reference<event::CacheEntryEventListener<K, V> > lsnr)
+ {
+ listener = lsnr;
+ }
+
+ /**
+ * Get cache entry event listener.
+ *
+ * The stored reference is dereferenced without a null check.
+ *
+ * @return Cache entry event listener.
+ */
+ const event::CacheEntryEventListener<K, V>& GetListener() const
+ {
+ return *listener.Get();
+ }
+
+ /**
+ * Get cache entry event listener.
+ *
+ * The stored reference is dereferenced without a null check.
+ *
+ * @return Cache entry event listener.
+ */
+ event::CacheEntryEventListener<K, V>& GetListener()
+ {
+ return *listener.Get();
+ }
+
+ private:
+ /** Buffer size. */
+ int32_t bufferSize;
+
+ /** Time interval, in milliseconds. */
+ int64_t timeInterval;
+
+ /** Include expired. */
+ bool includeExpired;
+
+ /** Listener. */
+ Reference<event::CacheEntryEventListener<K, V> > listener;
+ };
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_handle.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_handle.h
new file mode 100644
index 0000000..348132e
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/query/continuous/continuous_query_handle.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::thin::cache::query::continuous::ContinuousQueryHandleClient class.
+ */
+
+#ifndef _IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
+#define _IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
+
+#include <ignite/common/concurrent.h>
+
+namespace ignite
+{
+ namespace thin
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query handle client.
+ *
+ * Once destructed, continuous query is stopped and event listener stops getting events.
+ *
+ * The implementation is held through a shared pointer, so copies of a handle share it.
+ */
+ class ContinuousQueryHandleClient
+ {
+ public:
+ /**
+ * Default constructor.
+ *
+ * Creates a handle with a default-constructed implementation pointer.
+ */
+ ContinuousQueryHandleClient() :
+ impl()
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * Internal method. Should not be used by user.
+ *
+ * @param impl Implementation.
+ */
+ ContinuousQueryHandleClient(const common::concurrent::SharedPointer<void>& impl) :
+ impl(impl)
+ {
+ // No-op.
+ }
+
+ private:
+ /** Implementation delegate. */
+ common::concurrent::SharedPointer<void> impl;
+ };
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
index 74219e9..100645a 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
@@ -20,6 +20,7 @@
#include "impl/response_status.h"
#include "impl/message.h"
#include "impl/cache/cache_client_impl.h"
+#include "impl/cache/query/continuous/continuous_query_notification_handler.h"
#include "impl/transactions/transactions_impl.h"
using namespace ignite::impl::thin::transactions;
@@ -51,9 +52,7 @@ namespace ignite
CacheClientImpl::~CacheClientImpl()
{
- DataRouter* router0 = router.Get();
- if (router0)
- router0->Close();
+ // No-op.
}
template<typename ReqT, typename RspT>
@@ -161,7 +160,7 @@ namespace ignite
void CacheClientImpl::Put(const WritableKey& key, const Writable& value)
{
- Cache2ValueRequest<RequestType::CACHE_PUT> req(id, binary, key, value);
+ Cache2ValueRequest<MessageType::CACHE_PUT> req(id, binary, key, value);
Response rsp;
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -169,7 +168,7 @@ namespace ignite
void CacheClientImpl::Get(const WritableKey& key, Readable& value)
{
- CacheValueRequest<RequestType::CACHE_GET> req(id, binary, key);
+ CacheValueRequest<MessageType::CACHE_GET> req(id, binary, key);
CacheValueResponse rsp(value);
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -177,7 +176,7 @@ namespace ignite
void CacheClientImpl::PutAll(const Writable & pairs)
{
- CacheValueRequest<RequestType::CACHE_PUT_ALL> req(id, binary, pairs);
+ CacheValueRequest<MessageType::CACHE_PUT_ALL> req(id, binary, pairs);
Response rsp;
TransactionalSyncMessage(req, rsp);
@@ -185,7 +184,7 @@ namespace ignite
void CacheClientImpl::GetAll(const Writable& keys, Readable& pairs)
{
- CacheValueRequest<RequestType::CACHE_GET_ALL> req(id, binary, keys);
+ CacheValueRequest<MessageType::CACHE_GET_ALL> req(id, binary, keys);
CacheValueResponse rsp(pairs);
TransactionalSyncMessage(req, rsp);
@@ -193,7 +192,7 @@ namespace ignite
bool CacheClientImpl::Replace(const WritableKey& key, const Writable& value)
{
- Cache2ValueRequest<RequestType::CACHE_REPLACE> req(id, binary, key, value);
+ Cache2ValueRequest<MessageType::CACHE_REPLACE> req(id, binary, key, value);
BoolResponse rsp;
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -203,7 +202,7 @@ namespace ignite
bool CacheClientImpl::ContainsKey(const WritableKey& key)
{
- CacheValueRequest<RequestType::CACHE_CONTAINS_KEY> req(id, binary, key);
+ CacheValueRequest<MessageType::CACHE_CONTAINS_KEY> req(id, binary, key);
BoolResponse rsp;
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -213,7 +212,7 @@ namespace ignite
bool CacheClientImpl::ContainsKeys(const Writable& keys)
{
- CacheValueRequest<RequestType::CACHE_CONTAINS_KEYS> req(id, binary, keys);
+ CacheValueRequest<MessageType::CACHE_CONTAINS_KEYS> req(id, binary, keys);
BoolResponse rsp;
TransactionalSyncMessage(req, rsp);
@@ -233,7 +232,7 @@ namespace ignite
bool CacheClientImpl::Remove(const WritableKey& key)
{
- CacheValueRequest<RequestType::CACHE_REMOVE_KEY> req(id, binary, key);
+ CacheValueRequest<MessageType::CACHE_REMOVE_KEY> req(id, binary, key);
BoolResponse rsp;
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -243,7 +242,7 @@ namespace ignite
bool CacheClientImpl::Remove(const WritableKey& key, const Writable& val)
{
- Cache2ValueRequest<RequestType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val);
+ Cache2ValueRequest<MessageType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val);
BoolResponse rsp;
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -253,7 +252,7 @@ namespace ignite
void CacheClientImpl::RemoveAll(const Writable& keys)
{
- CacheValueRequest<RequestType::CACHE_REMOVE_KEYS> req(id, binary, keys);
+ CacheValueRequest<MessageType::CACHE_REMOVE_KEYS> req(id, binary, keys);
Response rsp;
TransactionalSyncMessage(req, rsp);
@@ -261,7 +260,7 @@ namespace ignite
void CacheClientImpl::RemoveAll()
{
- CacheRequest<RequestType::CACHE_REMOVE_ALL> req(id, binary);
+ CacheRequest<MessageType::CACHE_REMOVE_ALL> req(id, binary);
Response rsp;
TransactionalSyncMessage(req, rsp);
@@ -269,7 +268,7 @@ namespace ignite
void CacheClientImpl::Clear(const WritableKey& key)
{
- CacheValueRequest<RequestType::CACHE_CLEAR_KEY> req(id, binary, key);
+ CacheValueRequest<MessageType::CACHE_CLEAR_KEY> req(id, binary, key);
Response rsp;
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -277,7 +276,7 @@ namespace ignite
void CacheClientImpl::Clear()
{
- CacheRequest<RequestType::CACHE_CLEAR> req(id, binary);
+ CacheRequest<MessageType::CACHE_CLEAR> req(id, binary);
Response rsp;
TransactionalSyncMessage(req, rsp);
@@ -285,7 +284,7 @@ namespace ignite
void CacheClientImpl::ClearAll(const Writable& keys)
{
- CacheValueRequest<RequestType::CACHE_CLEAR_KEYS> req(id, binary, keys);
+ CacheValueRequest<MessageType::CACHE_CLEAR_KEYS> req(id, binary, keys);
Response rsp;
TransactionalSyncMessage(req, rsp);
@@ -293,7 +292,7 @@ namespace ignite
void CacheClientImpl::LocalPeek(const WritableKey& key, Readable& value)
{
- CacheValueRequest<RequestType::CACHE_LOCAL_PEEK> req(id, binary, key);
+ CacheValueRequest<MessageType::CACHE_LOCAL_PEEK> req(id, binary, key);
CacheValueResponse rsp(value);
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -301,7 +300,7 @@ namespace ignite
bool CacheClientImpl::Replace(const WritableKey& key, const Writable& oldVal, const Writable& newVal)
{
- Cache3ValueRequest<RequestType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal);
+ Cache3ValueRequest<MessageType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal);
BoolResponse rsp;
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -311,7 +310,7 @@ namespace ignite
void CacheClientImpl::GetAndPut(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
- Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT> req(id, binary, key, valIn);
+ Cache2ValueRequest<MessageType::CACHE_GET_AND_PUT> req(id, binary, key, valIn);
CacheValueResponse rsp(valOut);
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -319,7 +318,7 @@ namespace ignite
void CacheClientImpl::GetAndRemove(const WritableKey& key, Readable& valOut)
{
- CacheValueRequest<RequestType::CACHE_GET_AND_REMOVE> req(id, binary, key);
+ CacheValueRequest<MessageType::CACHE_GET_AND_REMOVE> req(id, binary, key);
CacheValueResponse rsp(valOut);
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -327,7 +326,7 @@ namespace ignite
void CacheClientImpl::GetAndReplace(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
- Cache2ValueRequest<RequestType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn);
+ Cache2ValueRequest<MessageType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn);
CacheValueResponse rsp(valOut);
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -335,7 +334,7 @@ namespace ignite
bool CacheClientImpl::PutIfAbsent(const WritableKey& key, const Writable& val)
{
- Cache2ValueRequest<RequestType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val);
+ Cache2ValueRequest<MessageType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val);
BoolResponse rsp;
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -345,7 +344,7 @@ namespace ignite
void CacheClientImpl::GetAndPutIfAbsent(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
- Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn);
+ Cache2ValueRequest<MessageType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn);
CacheValueResponse rsp(valOut);
TransactionalSyncCacheKeyMessage(key, req, rsp);
@@ -369,6 +368,25 @@ namespace ignite
return cursorImpl;
}
+
+ query::continuous::SP_ContinuousQueryHandleClientImpl CacheClientImpl::QueryContinuous(
+     const query::continuous::SP_ContinuousQueryClientHolderBase& continuousQuery)
+ {
+     // The request is built from the settings exposed by the query holder.
+     const query::continuous::ContinuousQueryClientHolderBase& holder = *continuousQuery.Get();
+
+     ContinuousQueryRequest request(id, holder.GetBufferSize(), holder.GetTimeInterval(),
+         holder.GetIncludeExpired());
+     ContinuousQueryResponse response;
+
+     // Notifications are registered on the channel returned by the synchronous exchange.
+     SP_DataChannel channel = SyncMessage(request, response);
+     DataChannel& channelRef = *channel.Get();
+
+     query::continuous::SP_ContinuousQueryNotificationHandler notificationHandler(
+         new query::continuous::ContinuousQueryNotificationHandler(channelRef, continuousQuery));
+
+     channelRef.RegisterNotificationHandler(response.GetQueryId(), notificationHandler);
+
+     // The handle keeps the channel and the query ID.
+     query::continuous::SP_ContinuousQueryHandleClientImpl handle(
+         new query::continuous::ContinuousQueryHandleClientImpl(channel, response.GetQueryId()));
+
+     return handle;
+ }
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
index 0df1ae3..4c82592 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
@@ -23,9 +23,12 @@
#include <ignite/thin/cache/query/query_sql_fields.h>
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
+
#include "impl/data_router.h"
#include "impl/transactions/transactions_impl.h"
#include "impl/cache/query/query_fields_cursor_impl.h"
+#include "impl/cache/query/continuous/continuous_query_handle_impl.h"
namespace ignite
{
@@ -298,6 +301,15 @@ namespace ignite
*/
query::SP_QueryFieldsCursorImpl Query(const ignite::thin::cache::query::SqlFieldsQuery &qry);
+ /**
+ * Starts the continuous query execution.
+ *
+ * @param continuousQuery Continuous query.
+ * @return Query handle. Query execution is stopped once all instances of the handle are destroyed.
+ */
+ query::continuous::SP_ContinuousQueryHandleClientImpl QueryContinuous(
+ const query::continuous::SP_ContinuousQueryClientHolderBase& continuousQuery);
+
private:
/**
* Synchronously send request message and receive response.
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp
index a5795ee..eab6ba0 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp
@@ -15,12 +15,18 @@
* limitations under the License.
*/
+#include <ignite/thin/cache/query/continuous/continuous_query_handle.h>
+
#include <ignite/impl/thin/cache/cache_client_proxy.h>
#include <impl/cache/cache_client_impl.h>
using namespace ignite::impl::thin;
using namespace cache;
+using ignite::thin::cache::query::SqlFieldsQuery;
+using ignite::thin::cache::query::QueryFieldsCursor;
+using ignite::thin::cache::query::continuous::ContinuousQueryHandleClient;
+
namespace
{
using namespace ignite::common::concurrent;
@@ -150,13 +156,18 @@ namespace ignite
GetCacheImpl(impl).GetAndPutIfAbsent(key, valIn, valOut);
}
- ignite::thin::cache::query::QueryFieldsCursor CacheClientProxy::Query(
- const ignite::thin::cache::query::SqlFieldsQuery &qry)
+ QueryFieldsCursor CacheClientProxy::Query(const ignite::thin::cache::query::SqlFieldsQuery &qry)
{
query::SP_QueryFieldsCursorImpl cursorImpl = GetCacheImpl(impl).Query(qry);
return ignite::thin::cache::query::QueryFieldsCursor(cursorImpl);
}
+
+ ContinuousQueryHandleClient CacheClientProxy::QueryContinuous(
+     const query::continuous::SP_ContinuousQueryClientHolderBase &continuousQuery)
+ {
+     // Start the query on the implementation, then wrap its handle into the public type.
+     query::continuous::SP_ContinuousQueryHandleClientImpl handleImpl =
+         GetCacheImpl(impl).QueryContinuous(continuousQuery);
+
+     return ContinuousQueryHandleClient(handleImpl);
+ }
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_handle_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_handle_impl.h
new file mode 100644
index 0000000..2843d30
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_handle_impl.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
+#define _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
+
+#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
+
+#include "impl/data_channel.h"
+#include "impl/message.h"
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace thin
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+  * Continuous query handle client implementation.
+  *
+  * Once destructed, continuous query is stopped and event listener stops getting events: the
+  * destructor deregisters the notification handler from the channel and closes the query
+  * resource. Because of that the class is not copyable; share it through
+  * SP_ContinuousQueryHandleClientImpl instead.
+  */
+ class ContinuousQueryHandleClientImpl
+ {
+ public:
+     /**
+      * Constructor.
+      *
+      * @param channel Channel.
+      * @param queryId Query ID.
+      */
+     ContinuousQueryHandleClientImpl(SP_DataChannel channel, int64_t queryId) :
+         channel(channel),
+         queryId(queryId)
+     {
+         // No-op.
+     }
+
+     /**
+      * Destructor.
+      *
+      * Deregisters the notification handler registered under the query ID and closes the
+      * query resource on the channel.
+      */
+     virtual ~ContinuousQueryHandleClientImpl()
+     {
+         assert(channel.IsValid());
+
+         // assert() is compiled out when NDEBUG is defined, so guard the dereference as well.
+         if (!channel.IsValid())
+             return;
+
+         DataChannel& channel0 = *channel.Get();
+
+         channel0.DeregisterNotificationHandler(queryId);
+         channel0.CloseResource(queryId);
+     }
+
+ private:
+     /** Not copyable: a copy would close the same query a second time when destroyed. */
+     ContinuousQueryHandleClientImpl(const ContinuousQueryHandleClientImpl&);
+
+     /** Not assignable, for the same reason as copying. */
+     ContinuousQueryHandleClientImpl& operator=(const ContinuousQueryHandleClientImpl&);
+
+     /** Data channel. */
+     SP_DataChannel channel;
+
+     /** Query ID. */
+     int64_t queryId;
+ };
+
+ /** Shared pointer to ContinuousQueryHandleClientImpl. */
+ typedef common::concurrent::SharedPointer<ContinuousQueryHandleClientImpl> SP_ContinuousQueryHandleClientImpl;
+ }
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_CLIENT
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.cpp
new file mode 100644
index 0000000..96449dd
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "impl/data_channel.h"
+#include "impl/message.h"
+
+#include "impl/cache/query/continuous/continuous_query_notification_handler.h"
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace thin
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ ContinuousQueryNotificationHandler::ContinuousQueryNotificationHandler(DataChannel& channel,
+ const SP_ContinuousQueryClientHolderBase& continuousQuery) :
+ continuousQuery(continuousQuery),
+ channel(channel)
+ {
+ // No-op: both members are set in the initializer list; the channel is kept by reference.
+ }
+
+ ContinuousQueryNotificationHandler::~ContinuousQueryNotificationHandler()
+ {
+ // No-op: nothing is released explicitly here.
+ }
+
+ void ContinuousQueryNotificationHandler::OnNotification(const network::DataBuffer& msg)
+ {
+     // The notification object is built over the query holder.
+     ContinuousQueryClientHolderBase& holder = *continuousQuery.Get();
+
+     ClientCacheEntryEventNotification notification(holder);
+
+     // Let the channel deserialize the raw message into the notification.
+     channel.DeserializeMessage(msg, notification);
+ }
+
+ void ContinuousQueryNotificationHandler::OnDisconnected()
+ {
+     // Forward the disconnect to the query holder.
+     ContinuousQueryClientHolderBase& holder = *continuousQuery.Get();
+
+     holder.OnDisconnected();
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.h b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.h
new file mode 100644
index 0000000..b1bc955
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/query/continuous/continuous_query_notification_handler.h
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_NOTIFICATION_HANDLER
+#define _IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_NOTIFICATION_HANDLER
+
+#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
+
+#include <ignite/binary/binary_raw_reader.h>
+
+#include <ignite/impl/interop/interop_input_stream.h>
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
+
+#include "impl/notification_handler.h"
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace thin
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query notification handler.
+ *
+ * Notification handler bound to a continuous query holder and to the channel it is used with.
+ */
+ class ContinuousQueryNotificationHandler : public NotificationHandler
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param channel Channel. Stored by reference, so it has to outlive this handler.
+ * @param continuousQuery Continuous Query.
+ */
+ ContinuousQueryNotificationHandler(DataChannel& channel,
+ const SP_ContinuousQueryClientHolderBase& continuousQuery);
+
+ /**
+ * Destructor.
+ */
+ virtual ~ContinuousQueryNotificationHandler();
+
+ /**
+ * Handle notification.
+ *
+ * @param msg Message.
+ */
+ virtual void OnNotification(const network::DataBuffer& msg);
+
+ /**
+ * Disconnected callback.
+ *
+ * Called if channel was disconnected.
+ */
+ virtual void OnDisconnected();
+
+ private:
+ /** Query. */
+ SP_ContinuousQueryClientHolderBase continuousQuery;
+
+ /** Channel. */
+ DataChannel& channel;
+ };
+
+ /** Shared pointer to ContinuousQueryNotificationHandler. */
+ typedef common::concurrent::SharedPointer<ContinuousQueryNotificationHandler> SP_ContinuousQueryNotificationHandler;
+ }
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_THIN_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_NOTIFICATION_HANDLER
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h b/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
index 14b4712..e8954dc 100644
--- a/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
+++ b/modules/platforms/cpp/thin-client/src/impl/channel_state_handler.h
@@ -52,6 +52,14 @@ namespace ignite
* @param err Error.
*/
virtual void OnHandshakeError(uint64_t id, const IgniteError& err) = 0;
+
+ /**
+ * Called if notification handling failed.
+ *
+ * Pure virtual: every implementation of this interface has to provide it.
+ *
+ * @param id Channel ID.
+ * @param err Error.
+ */
+ virtual void OnNotificationHandlingError(uint64_t id, const IgniteError& err) = 0;
};
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
index c7904ed..53de78f 100644
--- a/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/compute/compute_client_impl.cpp
@@ -51,7 +51,7 @@ namespace
// No-op.
}
- virtual bool OnNotification(const network::DataBuffer& msg)
+ virtual void OnNotification(const network::DataBuffer& msg)
{
ComputeTaskFinishedNotification notification(res);
channel.DeserializeMessage(msg, notification);
@@ -59,12 +59,15 @@ namespace
if (notification.IsFailure())
{
promise.SetError(IgniteError(IgniteError::IGNITE_ERR_COMPUTE_EXECUTION_REJECTED,
- notification.GetErrorMessage().c_str()));
+ notification.GetError().c_str()));
}
else
promise.SetValue();
+ }
- return true;
+ virtual void OnDisconnected()
+ {
+ promise.SetError(IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Connection closed"));
}
/**
@@ -111,6 +114,8 @@ namespace ignite
channel.Get()->RegisterNotificationHandler(rsp.GetNotificationId(), handler);
handler.Get()->GetFuture().GetValue();
+
+ channel.Get()->DeregisterNotificationHandler(rsp.GetNotificationId());
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
index 2bdd6dc..9c7194a 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp
@@ -57,7 +57,8 @@ namespace ignite
const ignite::network::SP_AsyncClientPool& asyncPool,
const ignite::thin::IgniteClientConfiguration& cfg,
binary::BinaryTypeManager& typeMgr,
- ChannelStateHandler& stateHandler
+ ChannelStateHandler& stateHandler,
+ common::ThreadPool& userThreadPool
) :
stateHandler(stateHandler),
handshakePerformed(false),
@@ -68,14 +69,15 @@ namespace ignite
typeMgr(typeMgr),
currentVersion(VERSION_DEFAULT),
reqIdCounter(0),
- responseMutex()
+ responseMutex(),
+ userThreadPool(userThreadPool)
{
// No-op.
}
DataChannel::~DataChannel()
{
- Close();
+ Close(0);
}
void DataChannel::StartHandshake()
@@ -83,10 +85,9 @@ namespace ignite
DoHandshake(VERSION_DEFAULT);
}
- void DataChannel::Close()
+ void DataChannel::Close(const IgniteError* err)
{
- asyncPool.Get()->Close(id, 0);
- handlerMap.clear();
+ asyncPool.Get()->Close(id, err);
}
void DataChannel::SyncMessage(Request &req, Response &rsp, int32_t timeout)
@@ -187,13 +188,16 @@ namespace ignite
if (flags & Flag::NOTIFICATION)
{
- common::concurrent::CsLockGuard lock(handlerMutex);
+ common::SP_ThreadPoolTask task;
+ {
+ common::concurrent::CsLockGuard lock(handlerMutex);
- NotificationHandlerHolder& holder = handlerMap[rspId];
- holder.ProcessNotification(msg);
+ NotificationHandlerHolder& holder = handlerMap[rspId];
+ task = holder.ProcessNotification(msg, id, stateHandler);
+ }
- if (holder.IsProcessingComplete())
- handlerMap.erase(rspId);
+ if (task.IsValid())
+ userThreadPool.Dispatch(task);
}
else
{
@@ -217,9 +221,13 @@ namespace ignite
NotificationHandlerHolder& holder = handlerMap[notId];
holder.SetHandler(handler);
+ }
- if (holder.IsProcessingComplete())
- handlerMap.erase(notId);
+ void DataChannel::DeregisterNotificationHandler(int64_t notId)
+ {
+ common::concurrent::CsLockGuard lock(handlerMutex);
+
+ handlerMap.erase(notId);
}
bool DataChannel::DoHandshake(const ProtocolVersion& propVer)
@@ -241,7 +249,7 @@ namespace ignite
binary::BinaryWriterImpl writer(&outStream, 0);
int32_t lenPos = outStream.Reserve(4);
- writer.WriteInt8(RequestType::HANDSHAKE);
+ writer.WriteInt8(MessageType::HANDSHAKE);
writer.WriteInt16(propVer.GetMajor());
writer.WriteInt16(propVer.GetMinor());
@@ -337,6 +345,18 @@ namespace ignite
return supportedVersions.find(ver) != supportedVersions.end();
}
+ void DataChannel::DeserializeMessage(const network::DataBuffer &data, Response &msg)
+ {
+ interop::InteropInputStream inStream(data.GetInputStream());
+
+ // Skipping size (4 bytes) and reqId (8 bytes)
+ inStream.Ignore(12);
+
+ binary::BinaryReaderImpl reader(&inStream);
+
+ msg.Read(reader, currentVersion);
+ }
+
void DataChannel::FailPendingRequests(const IgniteError* err)
{
IgniteError defaultErr(IgniteError::IGNITE_ERR_NETWORK_FAILURE, "Connection was closed");
@@ -352,9 +372,39 @@ namespace ignite
responseMap.clear();
}
+ {
+ common::concurrent::CsLockGuard lock(handlerMutex);
+
+ for (NotificationHandlerMap::iterator it = handlerMap.begin(); it != handlerMap.end(); ++it)
+ {
+ common::SP_ThreadPoolTask task = it->second.ProcessClosed();
+
+ if (task.IsValid())
+ userThreadPool.Dispatch(task);
+ }
+ }
+
if (!handshakePerformed)
stateHandler.OnHandshakeError(id, *err);
}
+
+ void DataChannel::CloseResource(int64_t resourceId)
+ {
+ ResourceCloseRequest req(resourceId);
+ Response rsp;
+
+ try
+ {
+ SyncMessage(req, rsp, config.GetConnectionTimeout());
+ }
+ catch (const IgniteError& err)
+ {
+ // Network failure means connection is closed or broken, which means
+ // that all resources were freed automatically.
+ if (err.GetCode() != IgniteError::IGNITE_ERR_NETWORK_FAILURE)
+ throw;
+ }
+ }
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.h b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
index aef02d0..5e09b50 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_channel.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.h
@@ -26,6 +26,7 @@
#include <ignite/thin/ignite_client_configuration.h>
#include <ignite/common/concurrent.h>
+#include <ignite/common/thread_pool.h>
#include <ignite/network/socket_client.h>
#include <ignite/network/async_client_pool.h>
@@ -107,13 +108,15 @@ namespace ignite
* @param cfg Configuration.
* @param typeMgr Type manager.
* @param stateHandler State handler.
+ * @param userThreadPool Thread pool to use to dispatch tasks that can run user code.
*/
DataChannel(uint64_t id,
const network::EndPoint& addr,
const ignite::network::SP_AsyncClientPool& asyncPool,
const ignite::thin::IgniteClientConfiguration& cfg,
binary::BinaryTypeManager& typeMgr,
- ChannelStateHandler& stateHandler);
+ ChannelStateHandler& stateHandler,
+ common::ThreadPool& userThreadPool);
/**
* Destructor.
@@ -129,8 +132,10 @@ namespace ignite
/**
* Close connection.
+ *
+ * @param err Error.
*/
- void Close();
+ void Close(const IgniteError* err);
/**
* Synchronously send request message and receive response. Uses provided timeout.
@@ -151,12 +156,20 @@ namespace ignite
/**
* Register handler for the notification.
+ *
* @param notId Notification ID.
* @param handler Handler.
*/
void RegisterNotificationHandler(int64_t notId, const SP_NotificationHandler& handler);
/**
+ * Deregister handler for the notification.
+ *
+ * @param notId Notification ID.
+ */
+ void DeregisterNotificationHandler(int64_t notId);
+
+ /**
* Get remote node.
* @return Node.
*/
@@ -176,22 +189,10 @@ namespace ignite
/**
* Deserialize message received by this channel.
- * @tparam T Message type.
* @param data Data.
* @param msg Message.
*/
- template<typename T>
- void DeserializeMessage(const network::DataBuffer& data, T& msg)
- {
- interop::InteropInputStream inStream(data.GetInputStream());
-
- // Skipping size (4 bytes) and reqId (8 bytes)
- inStream.Ignore(12);
-
- binary::BinaryReaderImpl reader(&inStream);
-
- msg.Read(reader, currentVersion);
- }
+ void DeserializeMessage(const network::DataBuffer& data, Response& msg);
/**
* Fail all pending requests.
@@ -200,6 +201,13 @@ namespace ignite
*/
void FailPendingRequests(const IgniteError* err);
+ /**
+ * Close remote resource.
+ *
+ * @param resourceId Resource ID.
+ */
+ void CloseResource(int64_t resourceId);
+
private:
IGNITE_NO_COPY_ASSIGNMENT(DataChannel);
@@ -306,6 +314,9 @@ namespace ignite
/** Notification handlers. */
NotificationHandlerMap handlerMap;
+
+ /** Thread pool to dispatch user code execution. */
+ common::ThreadPool& userThreadPool;
};
/** Shared pointer type. */
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
index ea1d36b..78580ca 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp
@@ -41,7 +41,8 @@ namespace ignite
namespace thin
{
DataRouter::DataRouter(const ignite::thin::IgniteClientConfiguration& cfg) :
- config(cfg)
+ config(cfg),
+ userThreadPool(0)
{
srand(common::GetRandSeed());
@@ -93,6 +94,7 @@ namespace ignite
asyncPool.Get()->SetHandler(this);
}
+ userThreadPool.Start();
asyncPool.Get()->Start(ranges, config.GetConnectionsLimit());
bool connected = EnsureConnected(config.GetConnectionTimeout());
@@ -109,6 +111,8 @@ namespace ignite
asyncPool.Get()->SetHandler(0);
asyncPool.Get()->Stop();
}
+
+ userThreadPool.Stop();
}
bool DataRouter::EnsureConnected(int32_t timeout)
@@ -140,7 +144,7 @@ namespace ignite
void DataRouter::OnConnectionSuccess(const network::EndPoint& addr, uint64_t id)
{
- SP_DataChannel channel(new DataChannel(id, addr, asyncPool, config, typeMgr, *this));
+ SP_DataChannel channel(new DataChannel(id, addr, asyncPool, config, typeMgr, *this, userThreadPool));
{
common::concurrent::CsLockGuard lock(channelsMutex);
@@ -173,13 +177,9 @@ namespace ignite
{
common::concurrent::CsLockGuard lock(channelsMutex);
- connectedChannels.erase(id);
-
- ChannelsIdMap::iterator it = channels.find(id);
- if (it == channels.end())
- return;
+ channel = FindChannelLocked(id);
- channel = it->second;
+ connectedChannels.erase(id);
InvalidateChannelLocked(channel);
}
@@ -188,14 +188,7 @@ namespace ignite
void DataRouter::OnMessageReceived(uint64_t id, const network::DataBuffer& msg)
{
- SP_DataChannel channel;
- {
- common::concurrent::CsLockGuard lock(channelsMutex);
-
- ChannelsIdMap::iterator it = channels.find(id);
- if (it != channels.end())
- channel = it->second;
- }
+ SP_DataChannel channel = FindChannel(id);
if (channel.IsValid())
channel.Get()->ProcessMessage(msg);
@@ -214,12 +207,7 @@ namespace ignite
connectedChannels.insert(id);
channelsWaitPoint.NotifyAll();
- SP_DataChannel channel;
-
- ChannelsIdMap::iterator it = channels.find(id);
- if (it != channels.end())
- channel = it->second;
-
+ SP_DataChannel channel = FindChannelLocked(id);
if (channel.IsValid())
{
const IgniteNode& node = channel.Get()->GetNode();
@@ -238,6 +226,14 @@ namespace ignite
channelsWaitPoint.NotifyAll();
}
+ void DataRouter::OnNotificationHandlingError(uint64_t id, const IgniteError &err)
+ {
+ SP_DataChannel channel = FindChannel(id);
+
+ if (channel.IsValid())
+ channel.Get()->Close(&err);
+ }
+
SP_DataChannel DataRouter::SyncMessage(Request &req, Response &rsp)
{
SP_DataChannel channel = GetRandomChannel();
@@ -423,6 +419,21 @@ namespace ignite
std::random_shuffle(ranges.begin(), ranges.end());
}
+
+ SP_DataChannel DataRouter::FindChannel(uint64_t id)
+ {
+ common::concurrent::CsLockGuard lock(channelsMutex);
+ return FindChannelLocked(id);
+ }
+
+ SP_DataChannel DataRouter::FindChannelLocked(uint64_t id)
+ {
+ ChannelsIdMap::iterator it = channels.find(id);
+ if (it != channels.end())
+ return it->second;
+
+ return SP_DataChannel();
+ }
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h
index d11cfc5..aa31f39 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h
@@ -29,6 +29,7 @@
#include <ignite/thin/ignite_client_configuration.h>
#include <ignite/common/concurrent.h>
+#include <ignite/common/thread_pool.h>
#include <ignite/common/promise.h>
#include <ignite/network/end_point.h>
#include <ignite/network/tcp_range.h>
@@ -148,6 +149,14 @@ namespace ignite
virtual void OnHandshakeError(uint64_t id, const IgniteError& err);
/**
+ * Called if notification handling failed.
+ *
+ * @param id Channel ID.
+ * @param err Error.
+ */
+ virtual void OnNotificationHandlingError(uint64_t id, const IgniteError& err);
+
+ /**
* Synchronously send request message and receive response.
*
* @param req Request message.
@@ -313,6 +322,23 @@ namespace ignite
*/
void CheckHandshakeErrorLocked();
+ /**
+ * Find channel by ID.
+ *
+ * @param id Channel ID
+ * @return Channel or null if is not present.
+ */
+ SP_DataChannel FindChannel(uint64_t id);
+
+ /**
+ * Find channel by ID.
+ * @warning May only be called when lock is held!
+ *
+ * @param id Channel ID
+ * @return Channel or null if is not present.
+ */
+ SP_DataChannel FindChannelLocked(uint64_t id);
+
/** Configuration. */
ignite::thin::IgniteClientConfiguration config;
@@ -348,6 +374,9 @@ namespace ignite
/** Cache affinity manager. */
affinity::AffinityManager affinityManager;
+
+ /** Thread pool to dispatch user code execution. */
+ common::ThreadPool userThreadPool;
};
/** Shared pointer type. */
diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
index 6ca9ab4..f153579 100644
--- a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp
@@ -41,7 +41,9 @@ namespace ignite
IgniteClientImpl::~IgniteClientImpl()
{
- // No-op.
+ DataRouter* router0 = router.Get();
+ if (router0)
+ router0->Close();
}
void IgniteClientImpl::Start()
@@ -109,7 +111,7 @@ namespace ignite
void IgniteClientImpl::GetCacheNames(std::vector<std::string>& cacheNames)
{
- RequestAdapter<RequestType::CACHE_GET_NAMES> req;
+ RequestAdapter<MessageType::CACHE_GET_NAMES> req;
GetCacheNamesResponse rsp(cacheNames);
router.Get()->SyncMessage(req, rsp);
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.cpp b/modules/platforms/cpp/thin-client/src/impl/message.cpp
index 0c0e6ee..8390e2d 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/message.cpp
@@ -20,6 +20,7 @@
#include <ignite/impl/thin/writable.h>
#include <ignite/impl/thin/readable.h>
+#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
#include "impl/response_status.h"
#include "impl/data_channel.h"
@@ -37,6 +38,11 @@ namespace ignite
// No-op.
}
+ void ResourceCloseRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
+ {
+ writer.WriteInt64(id);
+ }
+
void CachePartitionsRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
{
writer.WriteInt32(static_cast<int32_t>(cacheIds.size()));
@@ -116,30 +122,6 @@ namespace ignite
return (flags & Flag::FAILURE) != 0;
}
- ClientCacheNodePartitionsResponse::ClientCacheNodePartitionsResponse(
- std::vector<NodePartitions>& nodeParts):
- nodeParts(nodeParts)
- {
- // No-op.
- }
-
- ClientCacheNodePartitionsResponse::~ClientCacheNodePartitionsResponse()
- {
- // No-op.
- }
-
- void ClientCacheNodePartitionsResponse::ReadOnSuccess(
- binary::BinaryReaderImpl& reader, const ProtocolVersion&)
- {
- int32_t num = reader.ReadInt32();
-
- nodeParts.clear();
- nodeParts.resize(static_cast<size_t>(num));
-
- for (int32_t i = 0; i < num; ++i)
- nodeParts[i].Read(reader);
- }
-
CachePartitionsResponse::CachePartitionsResponse(std::vector<PartitionAwarenessGroup>& groups) :
groups(groups)
{
@@ -274,7 +256,7 @@ namespace ignite
}
CacheGetSizeRequest::CacheGetSizeRequest(int32_t cacheId, bool binary, int32_t peekModes) :
- CacheRequest<RequestType::CACHE_GET_SIZE>(cacheId, binary),
+ CacheRequest<MessageType::CACHE_GET_SIZE>(cacheId, binary),
peekModes(peekModes)
{
// No-op.
@@ -282,7 +264,7 @@ namespace ignite
void CacheGetSizeRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const
{
- CacheRequest<RequestType::CACHE_GET_SIZE>::Write(writer, ver);
+ CacheRequest<MessageType::CACHE_GET_SIZE>::Write(writer, ver);
if (peekModes & ignite::thin::cache::CachePeekMode::ALL)
{
@@ -335,7 +317,7 @@ namespace ignite
int32_t cacheId,
const ignite::thin::cache::query::SqlFieldsQuery &qry
) :
- CacheRequest<RequestType::QUERY_SQL_FIELDS>(cacheId, false),
+ CacheRequest<MessageType::QUERY_SQL_FIELDS>(cacheId, false),
qry(qry)
{
// No-op.
@@ -343,7 +325,7 @@ namespace ignite
void SqlFieldsQueryRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const
{
- CacheRequest<RequestType::QUERY_SQL_FIELDS>::Write(writer, ver);
+ CacheRequest<MessageType::QUERY_SQL_FIELDS>::Write(writer, ver);
if (qry.schema.empty())
writer.WriteNull();
@@ -395,11 +377,28 @@ namespace ignite
writer.WriteInt64(cursorId);
}
- void SqlFieldsCursorGetPageResponse::ReadOnSuccess(binary::BinaryReaderImpl&reader, const ProtocolVersion&)
+ void SqlFieldsCursorGetPageResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
{
cursorPage.Get()->Read(reader);
}
+ void ContinuousQueryRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const
+ {
+ CacheRequest<MessageType::QUERY_CONTINUOUS>::Write(writer, ver);
+
+ writer.WriteInt32(pageSize);
+ writer.WriteInt64(timeInterval);
+ writer.WriteBool(includeExpired);
+
+ // TODO: IGNITE-16291: Implement remote filters for Continuous Queries.
+ writer.WriteNull();
+ }
+
+ void ContinuousQueryResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+ {
+ queryId = reader.ReadInt64();
+ }
+
void ComputeTaskExecuteRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
{
// To be changed when Cluster API is implemented.
@@ -417,31 +416,15 @@ namespace ignite
taskId = reader.ReadInt64();
}
- void ComputeTaskFinishedNotification::Read(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+ void ComputeTaskFinishedNotification::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
{
- int16_t flags = reader.ReadInt16();
- if (!(flags & Flag::NOTIFICATION))
- {
- IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Was expecting notification but got "
- "different kind of message", "flags", flags)
- }
-
- int16_t opCode = reader.ReadInt16();
- if (opCode != RequestType::COMPUTE_TASK_FINISHED)
- {
- IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification type",
- "expected", (int)RequestType::COMPUTE_TASK_FINISHED, "actual", opCode)
- }
+ result.Read(reader);
+ }
- if (flags & Flag::FAILURE)
- {
- status = reader.ReadInt32();
- reader.ReadString(errorMessage);
- }
- else
- {
- result.Read(reader);
- }
+ void ClientCacheEntryEventNotification::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+ {
+ ignite::binary::BinaryRawReader reader0(&reader);
+ query.ReadAndProcessEvents(reader0);
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h
index 4f9e920..dad00c7 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.h
+++ b/modules/platforms/cpp/thin-client/src/impl/message.h
@@ -53,6 +53,18 @@ namespace ignite
/* Forward declaration. */
class Writable;
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /* Forward declaration. */
+ class ContinuousQueryClientHolderBase;
+ }
+ }
+ }
+
struct ClientType
{
enum Type
@@ -61,7 +73,7 @@ namespace ignite
};
};
- struct RequestType
+ struct MessageType
{
enum Type
{
@@ -158,6 +170,12 @@ namespace ignite
/** SQL fields query get next cursor page request. */
QUERY_SQL_FIELDS_CURSOR_GET_PAGE = 2005,
+ /** Continuous query. */
+ QUERY_CONTINUOUS = 2006,
+
+ /** Continuous query notification event. */
+ QUERY_CONTINUOUS_EVENT_NOTIFICATION = 2007,
+
/** Get binary type info. */
GET_BINARY_TYPE = 3002,
@@ -292,7 +310,44 @@ namespace ignite
/**
* Cache partitions request.
*/
- class CachePartitionsRequest : public RequestAdapter<RequestType::CACHE_PARTITIONS>
+ class ResourceCloseRequest : public RequestAdapter<MessageType::RESOURCE_CLOSE>
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param id Resource ID.
+ */
+ ResourceCloseRequest(int64_t id) :
+ id(id)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ResourceCloseRequest()
+ {
+ // No-op.
+ }
+
+ /**
+ * Write request using provided writer.
+ *
+ * @param writer Writer.
+ */
+ virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const;
+
+ private:
+ /** Resource ID. */
+ const int64_t id;
+ };
+
+ /**
+ * Cache partitions request.
+ */
+ class CachePartitionsRequest : public RequestAdapter<MessageType::CACHE_PARTITIONS>
{
public:
/**
@@ -324,7 +379,7 @@ namespace ignite
/**
* Get or create cache request.
*/
- class GetOrCreateCacheWithNameRequest : public RequestAdapter<RequestType::CACHE_GET_OR_CREATE_WITH_NAME>
+ class GetOrCreateCacheWithNameRequest : public RequestAdapter<MessageType::CACHE_GET_OR_CREATE_WITH_NAME>
{
public:
/**
@@ -357,7 +412,7 @@ namespace ignite
/**
* Get or create cache request.
*/
- class CreateCacheWithNameRequest : public RequestAdapter<RequestType::CACHE_CREATE_WITH_NAME>
+ class CreateCacheWithNameRequest : public RequestAdapter<MessageType::CACHE_CREATE_WITH_NAME>
{
public:
/**
@@ -390,7 +445,7 @@ namespace ignite
/**
* Destroy cache request.
*/
- class DestroyCacheRequest : public RequestAdapter<RequestType::CACHE_DESTROY>
+ class DestroyCacheRequest : public RequestAdapter<MessageType::CACHE_DESTROY>
{
public:
/**
@@ -504,7 +559,7 @@ namespace ignite
/**
* Cache get size request.
*/
- class CacheGetSizeRequest : public CacheRequest<RequestType::CACHE_GET_SIZE>
+ class CacheGetSizeRequest : public CacheRequest<MessageType::CACHE_GET_SIZE>
{
public:
/**
@@ -698,7 +753,7 @@ namespace ignite
/**
* Tx start request.
*/
- class TxStartRequest : public RequestAdapter<RequestType::OP_TX_START>
+ class TxStartRequest : public RequestAdapter<MessageType::OP_TX_START>
{
public:
/**
@@ -755,7 +810,7 @@ namespace ignite
/**
* Tx end request.
*/
- class TxEndRequest : public RequestAdapter<RequestType::OP_TX_END>
+ class TxEndRequest : public RequestAdapter<MessageType::OP_TX_END>
{
public:
/**
@@ -800,7 +855,7 @@ namespace ignite
/**
* Cache get binary type request.
*/
- class BinaryTypeGetRequest : public RequestAdapter<RequestType::GET_BINARY_TYPE>
+ class BinaryTypeGetRequest : public RequestAdapter<MessageType::GET_BINARY_TYPE>
{
public:
/**
@@ -837,7 +892,7 @@ namespace ignite
/**
* Cache put binary type request.
*/
- class BinaryTypePutRequest : public RequestAdapter<RequestType::PUT_BINARY_TYPE>
+ class BinaryTypePutRequest : public RequestAdapter<MessageType::PUT_BINARY_TYPE>
{
public:
/**
@@ -874,7 +929,7 @@ namespace ignite
/**
* Cache SQL fields query request.
*/
- class SqlFieldsQueryRequest : public CacheRequest<RequestType::QUERY_SQL_FIELDS>
+ class SqlFieldsQueryRequest : public CacheRequest<MessageType::QUERY_SQL_FIELDS>
{
public:
/**
@@ -908,7 +963,7 @@ namespace ignite
/**
* Cache SQL fields cursor get page request.
*/
- class SqlFieldsCursorGetPageRequest : public RequestAdapter<RequestType::QUERY_SQL_FIELDS_CURSOR_GET_PAGE>
+ class SqlFieldsCursorGetPageRequest : public RequestAdapter<MessageType::QUERY_SQL_FIELDS_CURSOR_GET_PAGE>
{
public:
/**
@@ -943,9 +998,58 @@ namespace ignite
};
/**
+ * Continuous query request.
+ */
+ class ContinuousQueryRequest : public CacheRequest<MessageType::QUERY_CONTINUOUS>
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
+ * @param pageSize Page size.
+ * @param timeInterval Time interval.
+ * @param includeExpired Include expired.
+ */
+ explicit ContinuousQueryRequest(int32_t cacheId, int32_t pageSize, int64_t timeInterval, bool includeExpired) :
+ CacheRequest(cacheId, false),
+ pageSize(pageSize),
+ timeInterval(timeInterval),
+ includeExpired(includeExpired)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ContinuousQueryRequest()
+ {
+ // No-op.
+ }
+
+ /**
+ * Write request using provided writer.
+ * @param writer Writer.
+ * @param ver Version.
+ */
+ virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const;
+
+ private:
+ /** Page size. */
+ const int32_t pageSize;
+
+ /** Time interval. */
+ const int64_t timeInterval;
+
+ /** Include expired. */
+ const bool includeExpired;
+ };
+
+ /**
* Compute task execute request.
*/
- class ComputeTaskExecuteRequest : public RequestAdapter<RequestType::COMPUTE_TASK_EXECUTE>
+ class ComputeTaskExecuteRequest : public RequestAdapter<MessageType::COMPUTE_TASK_EXECUTE>
{
public:
/**
@@ -1016,7 +1120,7 @@ namespace ignite
* @param reader Reader.
* @param ver Protocol version.
*/
- void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
+ virtual void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
/**
* Get request processing status.
@@ -1072,7 +1176,6 @@ namespace ignite
// No-op.
}
- private:
/** Flags. */
int16_t flags;
@@ -1089,36 +1192,6 @@ namespace ignite
/**
* Cache node list request.
*/
- class ClientCacheNodePartitionsResponse : public Response
- {
- public:
- /**
- * Constructor.
- *
- * @param nodeParts Node partitions.
- */
- ClientCacheNodePartitionsResponse(std::vector<NodePartitions>& nodeParts);
-
- /**
- * Destructor.
- */
- virtual ~ClientCacheNodePartitionsResponse();
-
- /**
- * Read data if response status is ResponseStatus::SUCCESS.
- *
- * @param reader Reader.
- */
- virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&);
-
- private:
- /** Node partitions. */
- std::vector<NodePartitions>& nodeParts;
- };
-
- /**
- * Cache node list request.
- */
class CachePartitionsResponse : public Response
{
public:
@@ -1524,6 +1597,50 @@ namespace ignite
};
/**
+ * Cache Continuous Query response.
+ */
+ class ContinuousQueryResponse : public Response
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ ContinuousQueryResponse() :
+ queryId(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~ContinuousQueryResponse()
+ {
+ // No-op.
+ }
+
+ /**
+ * Get query ID.
+ * @return Query ID.
+ */
+ int64_t GetQueryId() const
+ {
+ return queryId;
+ }
+
+ /**
+ * Read data if response status is ResponseStatus::SUCCESS.
+ *
+ * @param reader Reader.
+ */
+ virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&);
+
+ private:
+ /** Query ID. */
+ int64_t queryId;
+ };
+
+ /**
* Compute task execute response.
*/
class ComputeTaskExecuteResponse : public Response
@@ -1570,17 +1687,107 @@ namespace ignite
/**
* Compute task finished notification.
*/
- class ComputeTaskFinishedNotification
+ class Notification : public Response
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ Notification()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~Notification()
+ {
+ // No-op.
+ }
+
+ /**
+ * Read notification data.
+ *
+ * @param reader Reader.
+ */
+ virtual void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver)
+ {
+ flags = reader.ReadInt16();
+
+ int16_t readOpCode = reader.ReadInt16();
+ if (readOpCode != GetOperationCode())
+ {
+ IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_GENERIC, "Unexpected notification type",
+ "expected", GetOperationCode(), "actual", readOpCode)
+ }
+
+ if (IsFailure())
+ {
+ status = reader.ReadInt32();
+ reader.ReadString(error);
+
+ return;
+ }
+
+ ReadOnSuccess(reader, ver);
+ }
+
+ /**
+ * Get operation code.
+ *
+ * @return Operation code.
+ */
+ virtual int16_t GetOperationCode() const = 0;
+
+ protected:
+ /**
+ * Read data if response status is ResponseStatus::SUCCESS.
+ *
+ * @param reader Reader.
+ * @param ver Version.
+ */
+ virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver) = 0;
+ };
+
+ /**
+ * Notification adapter.
+ *
+ * @tparam OpCode Operation code.
+ */
+ template<int16_t OpCode>
+ class NotificationAdapter : public Notification
{
public:
- typedef ComputeTaskExecuteResponse ResponseType;
+ /**
+ * Destructor.
+ */
+ virtual ~NotificationAdapter()
+ {
+ // No-op.
+ }
/**
+ * Get operation code.
+ *
+ * @return Operation code.
+ */
+ virtual int16_t GetOperationCode() const
+ {
+ return OpCode;
+ }
+ };
+
+ /**
+ * Compute task finished notification.
+ */
+ class ComputeTaskFinishedNotification : public NotificationAdapter<MessageType::COMPUTE_TASK_FINISHED>
+ {
+ public:
+ /**
* Constructor.
*/
- ComputeTaskFinishedNotification(Readable& result) :
- status(0),
- errorMessage(),
+ explicit ComputeTaskFinishedNotification(Readable& result) :
result(result)
{
// No-op.
@@ -1595,39 +1802,52 @@ namespace ignite
}
/**
- * Check if the message is failure.
- * @return @c true on failure.
+ * Read data if response status is ResponseStatus::SUCCESS.
+ *
+ * @param reader Reader.
+ * @param ver Version.
+ */
+ virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
+
+ private:
+ /** Result. */
+ Readable& result;
+ };
+
+ /**
+ * Continuous query notification.
+ */
+ class ClientCacheEntryEventNotification : public NotificationAdapter<MessageType::QUERY_CONTINUOUS_EVENT_NOTIFICATION>
+ {
+ public:
+ /**
+ * Constructor.
*/
- bool IsFailure() const
+ explicit ClientCacheEntryEventNotification(cache::query::continuous::ContinuousQueryClientHolderBase& query) :
+ query(query)
{
- return !errorMessage.empty();
+ // No-op.
}
/**
- * Get error message.
- * @return Error message.
+ * Destructor.
*/
- const std::string& GetErrorMessage() const
+ virtual ~ClientCacheEntryEventNotification()
{
- return errorMessage;
+ // No-op.
}
/**
- * Read response using provided reader.
+ * Read data if response status is ResponseStatus::SUCCESS.
+ *
* @param reader Reader.
- * @param ver Protocol version.
+ * @param ver Version.
*/
- void Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
+ virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver);
private:
- /** Status. */
- int32_t status;
-
- /** Error message. */
- std::string errorMessage;
-
/** Result. */
- Readable& result;
+ cache::query::continuous::ContinuousQueryClientHolderBase& query;
};
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/notification_handler.h b/modules/platforms/cpp/thin-client/src/impl/notification_handler.h
index c2acc5b..de9e88c 100644
--- a/modules/platforms/cpp/thin-client/src/impl/notification_handler.h
+++ b/modules/platforms/cpp/thin-client/src/impl/notification_handler.h
@@ -23,6 +23,7 @@
#include <vector>
#include <ignite/ignite_error.h>
+#include <ignite/common/thread_pool.h>
#include <ignite/network/data_buffer.h>
#include <ignite/impl/interop/interop_memory.h>
@@ -51,12 +52,137 @@ namespace ignite
* @param msg Message.
* @return @c true if processing complete.
*/
- virtual bool OnNotification(const network::DataBuffer& msg) = 0;
+ virtual void OnNotification(const network::DataBuffer& msg) = 0;
+
+ /**
+ * Disconnected callback.
+ *
+ * Called if channel was disconnected.
+ */
+ virtual void OnDisconnected() = 0;
};
/** Shared pointer to notification handler. */
typedef common::concurrent::SharedPointer<NotificationHandler> SP_NotificationHandler;
+ /**
+ * Task that handles notification
+ */
+ class HandleNotificationTask : public common::ThreadPoolTask
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param msg Message.
+ * @param handler Notification handler.
+ * @param channelId Channel ID.
+ * @param channelStateHandler Channel state handler.
+ */
+ HandleNotificationTask(
+ const network::DataBuffer& msg,
+ const SP_NotificationHandler& handler,
+ uint64_t channelId,
+ ChannelStateHandler& channelStateHandler
+ ) :
+ msg(msg),
+ handler(handler),
+ channelId(channelId),
+ channelStateHandler(channelStateHandler)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~HandleNotificationTask()
+ {
+ // No-op.
+ }
+
+ /**
+ * Execute task.
+ */
+ virtual void Execute()
+ {
+ handler.Get()->OnNotification(msg);
+ }
+
+ /**
+ * Called if error occurred during task processing.
+ *
+ * @param err Error.
+ */
+ virtual void OnError(const IgniteError& err)
+ {
+ channelStateHandler.OnNotificationHandlingError(channelId, err);
+ }
+
+ private:
+ /** Message. */
+ network::DataBuffer msg;
+
+ /** Handler. */
+ SP_NotificationHandler handler;
+
+ /** Channel ID. */
+ uint64_t channelId;
+
+ /** Channel state handler. */
+ ChannelStateHandler& channelStateHandler;
+ };
+
+ /**
+ * Task that handles connection closing
+ */
+ class DisconnectedTask : public common::ThreadPoolTask
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param handler Notification handler.
+ */
+ explicit DisconnectedTask(const SP_NotificationHandler& handler) :
+ handler(handler)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~DisconnectedTask()
+ {
+ // No-op.
+ }
+
+ /**
+ * Execute task.
+ */
+ virtual void Execute()
+ {
+ handler.Get()->OnDisconnected();
+ }
+
+ /**
+ * Called if error occurred during task processing.
+ *
+ * @param err Error.
+ */
+ virtual void OnError(const IgniteError&)
+ {
+ // No-op. Connection already closed so there is not much we can do.
+ // TODO: Add logging here once it's implemented.
+ }
+
+ private:
+ /** Handler. */
+ SP_NotificationHandler handler;
+ };
+
+
/** Notification handler. */
class NotificationHandlerHolder
{
@@ -68,9 +194,9 @@ namespace ignite
* Default constructor.
*/
NotificationHandlerHolder() :
+ disconnected(false),
queue(),
- handler(),
- complete(false)
+ handler()
{
// No-op: members are set by the initializer list above.
}
@@ -87,16 +213,37 @@ namespace ignite
* Process notification.
*
* @param msg Notification message to process.
+ * @param channelId Channel ID.
+ * @param channelStateHandler Channel state handler.
+ * @return Task for dispatching if handler is present and null otherwise.
*/
- void ProcessNotification(const network::DataBuffer& msg)
+ common::SP_ThreadPoolTask ProcessNotification(const network::DataBuffer& msg,
+ uint64_t channelId, ChannelStateHandler& channelStateHandler)
{
- if (complete)
- return;
+ network::DataBuffer notification(msg.Clone()); // Clone up front: either the returned task or the queue keeps it.
if (handler.IsValid())
- complete = handler.Get()->OnNotification(msg);
- else
- queue.push_back(msg.Clone());
+ return common::SP_ThreadPoolTask(
+ new HandleNotificationTask(notification, handler, channelId, channelStateHandler));
+
+ queue.push_back(notification); // No handler yet: hold the message until SetHandler() replays the queue.
+
+ return common::SP_ThreadPoolTask(); // Empty task pointer: nothing to dispatch yet.
+ }
+
+ /**
+  * Record that the connection was closed.
+  *
+  * @return Task that notifies the handler of the disconnect, or an empty
+  *     task pointer when no handler has been set yet.
+  */
+ common::SP_ThreadPoolTask ProcessClosed()
+ {
+     // Remember the disconnect so that a handler set later still learns about it.
+     disconnected = true;
+
+     // Without a handler there is nobody to notify right now.
+     if (!handler.IsValid())
+         return common::SP_ThreadPoolTask();
+
+     return common::SP_ThreadPoolTask(new DisconnectedTask(handler));
}
/**
@@ -104,38 +251,31 @@ namespace ignite
*
* @param handler0 Notification handler.
*/
- void SetHandler(const SP_NotificationHandler& handler)
+ void SetHandler(const SP_NotificationHandler& handler0)
{
- if (this->handler.IsValid())
+ if (handler.IsValid()) // A handler may be installed only once.
throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
"Internal error: handler is already set for the notification");
- this->handler = handler;
+ handler = handler0;
for (MessageQueue::iterator it = queue.begin(); it != queue.end(); ++it)
- complete = complete || this->handler.Get()->OnNotification(*it);
+ handler.Get()->OnNotification(*it); // Replay the messages queued before a handler existed, directly rather than via a task.
queue.clear();
- }
- /**
- * Check whether processing complete.
- *
- * @return @c true if processing complete.
- */
- bool IsProcessingComplete() const
- {
- return complete;
+ if (disconnected) // ProcessClosed() already ran without a handler: report the disconnect now.
+ handler.Get()->OnDisconnected();
}
private:
+ /** Disconnected flag. Set by ProcessClosed() and checked by SetHandler(). */
+ bool disconnected;
+
/** Notification queue. Holds messages received while no handler is set. */
MessageQueue queue;
/** Notification handler. */
SP_NotificationHandler handler;
-
- /** Processing complete. */
- bool complete;
};
}
}