You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2017/03/31 13:41:43 UTC
[1/2] ignite git commit: IGNITE-3575 CPP: Added support for
continuous queries remote filters.
Repository: ignite
Updated Branches:
refs/heads/master 79bac4f87 -> d4da92b7a
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
index dfef8e4..858ee77 100644
--- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
@@ -17,8 +17,11 @@
#include <ignite/common/utils.h>
-#include "ignite/impl/cache/cache_impl.h"
-#include "ignite/impl/binary/binary_type_updater_impl.h"
+#include <ignite/impl/cache/cache_impl.h>
+#include <ignite/impl/binary/binary_type_updater_impl.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
+
+#include <ignite/cache/query/continuous/continuous_query_handle.h>
using namespace ignite::common::concurrent;
using namespace ignite::jni::java;
@@ -381,14 +384,93 @@ namespace ignite
IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
}
- struct DummyQry { void Write(BinaryRawWriter&) const { }};
+ struct Dummy
+ {
+ void Write(BinaryRawWriter&) const
+ {
+ // No-op.
+ }
+ };
ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
IgniteError& err)
{
- DummyQry dummy;
+ Dummy dummy;
return QueryContinuous(qry, dummy, -1, OP_QRY_CONTINUOUS, err);
}
+
+ template <typename T>
+ QueryCursorImpl* CacheImpl::QueryInternal(const T& qry, int32_t typ, IgniteError& err)
+ {
+ JniErrorInfo jniErr;
+
+ SharedPointer<InteropMemory> mem = GetEnvironment().AllocateMemory();
+ InteropMemory* mem0 = mem.Get();
+ InteropOutputStream out(mem0);
+ BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+ BinaryRawWriter rawWriter(&writer);
+
+ qry.Write(rawWriter);
+
+ out.Synchronize();
+
+ jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpQueryCursor(GetTarget(),
+ typ, mem.Get()->PointerLong(), &jniErr);
+
+ IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+ if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+ return new QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef);
+
+ return 0;
+ }
+
+ template <typename T>
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err)
+ {
+ JniErrorInfo jniErr;
+
+ SharedPointer<InteropMemory> mem = GetEnvironment().AllocateMemory();
+ InteropMemory* mem0 = mem.Get();
+ InteropOutputStream out(mem0);
+ BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+ BinaryRawWriter rawWriter(&writer);
+
+ const ContinuousQueryImplBase& qry0 = *qry.Get();
+
+ int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry);
+
+ rawWriter.WriteInt64(handle);
+ rawWriter.WriteBool(qry0.GetLocal());
+
+ event::CacheEntryEventFilterHolderBase& filterOp = qry0.GetFilterHolder();
+
+ filterOp.Write(writer);
+
+ rawWriter.WriteInt32(qry0.GetBufferSize());
+ rawWriter.WriteInt64(qry0.GetTimeInterval());
+
+ // Autounsubscribe is a filter feature.
+ rawWriter.WriteBool(false);
+
+ // Writing initial query. When there is not initial query writing -1.
+ rawWriter.WriteInt32(typ);
+ if (typ != -1)
+ initialQry.Write(rawWriter);
+
+ out.Synchronize();
+
+ jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(),
+ cmd, mem.Get()->PointerLong(), &jniErr);
+
+ IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+ if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+ return new ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef);
+
+ return 0;
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
index b2fa1fd..b15183b 100644
--- a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
@@ -84,11 +84,6 @@ namespace ignite
return new QueryCursorImpl(env, res);
}
-
- void ContinuousQueryHandleImpl::SetQuery(SP_ContinuousQueryImplBase query)
- {
- qry = query;
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
new file mode 100644
index 0000000..2e09de2
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/ignite_environment.h>
+#include <ignite/impl/ignite_binding_impl.h>
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{
+ namespace impl
+ {
+ IgniteBindingImpl::IgniteBindingImpl(IgniteEnvironment &env) :
+ env(env),
+ callbacks()
+ {
+ // No-op.
+ }
+
+ int64_t IgniteBindingImpl::InvokeCallback(bool& found, int32_t type, int32_t id,
+ binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer)
+ {
+ int64_t key = makeKey(type, id);
+
+ CsLockGuard guard(lock);
+
+ std::map<int64_t, Callback*>::iterator it = callbacks.find(key);
+
+ found = it != callbacks.end();
+
+ if (found)
+ {
+ Callback* callback = it->second;
+
+ // We have found callback and does not need lock here anymore.
+ guard.Reset();
+
+ return callback(reader, writer, env);
+ }
+
+ return 0;
+ }
+
+ void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* proc, IgniteError& err)
+ {
+ int64_t key = makeKey(type, id);
+
+ CsLockGuard guard(lock);
+
+ bool inserted = callbacks.insert(std::make_pair(key, proc)).second;
+
+ guard.Reset();
+
+ if (!inserted)
+ {
+ std::stringstream builder;
+
+ builder << "Trying to register multiple PRC callbacks with the same ID. [type="
+ << type << ", id=" << id << ']';
+
+ err = IgniteError(IgniteError::IGNITE_ERR_ENTRY_PROCESSOR, builder.str().c_str());
+ }
+ }
+
+ void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* callback)
+ {
+ IgniteError err;
+
+ RegisterCallback(type, id, callback, err);
+
+ IgniteError::ThrowIfNeeded(err);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index b37fa8f..4e2a1f2 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -15,14 +15,18 @@
* limitations under the License.
*/
-#include "ignite/impl/interop/interop_external_memory.h"
-#include "ignite/impl/binary/binary_reader_impl.h"
-#include "ignite/impl/ignite_environment.h"
-#include "ignite/cache/query/continuous/continuous_query.h"
-#include "ignite/binary/binary.h"
-#include "ignite/impl/binary/binary_type_updater_impl.h"
-#include "ignite/impl/module_manager.h"
-#include "ignite/ignite_binding.h"
+#include <ignite/impl/interop/interop_external_memory.h>
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/binary/binary_type_updater_impl.h>
+#include <ignite/impl/module_manager.h>
+#include <ignite/impl/ignite_binding_impl.h>
+
+#include <ignite/binary/binary.h>
+#include <ignite/cache/query/continuous/continuous_query.h>
+#include <ignite/ignite_binding.h>
+#include <ignite/ignite_binding_context.h>
+
+#include <ignite/impl/ignite_environment.h>
using namespace ignite::common::concurrent;
using namespace ignite::jni::java;
@@ -42,6 +46,8 @@ namespace ignite
{
CACHE_INVOKE = 8,
CONTINUOUS_QUERY_LISTENER_APPLY = 18,
+ CONTINUOUS_QUERY_FILTER_CREATE = 19,
+ CONTINUOUS_QUERY_FILTER_APPLY = 20,
CONTINUOUS_QUERY_FILTER_RELEASE = 21,
REALLOC = 36,
ON_START = 49,
@@ -57,6 +63,7 @@ namespace ignite
*/
long long IGNITE_CALL InLongOutLong(void* target, int type, long long val)
{
+ int64_t res = 0;
SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
switch (type)
@@ -77,6 +84,24 @@ namespace ignite
break;
}
+ case CONTINUOUS_QUERY_FILTER_CREATE:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+ res = env->Get()->OnContinuousQueryFilterCreate(mem);
+
+ break;
+ }
+
+ case CONTINUOUS_QUERY_FILTER_APPLY:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+ res = env->Get()->OnContinuousQueryFilterApply(mem);
+
+ break;
+ }
+
case CONTINUOUS_QUERY_FILTER_RELEASE:
{
// No-op.
@@ -98,7 +123,7 @@ namespace ignite
}
}
- return 0;
+ return res;
}
/**
@@ -152,10 +177,14 @@ namespace ignite
registry(DEFAULT_FAST_PATH_CONTAINERS_CAP, DEFAULT_SLOW_PATH_CONTAINERS_CAP),
metaMgr(new BinaryTypeManager()),
metaUpdater(0),
- binding(new IgniteBindingImpl()),
- moduleMgr(new ModuleManager(GetBindingContext()))
+ binding(),
+ moduleMgr()
{
- // No-op.
+ binding = SharedPointer<IgniteBindingImpl>(new IgniteBindingImpl(*this));
+
+ IgniteBindingContext bindingContext(cfg, GetBinding());
+
+ moduleMgr = SharedPointer<ModuleManager>(new ModuleManager(bindingContext));
}
IgniteEnvironment::~IgniteEnvironment()
@@ -263,14 +292,9 @@ namespace ignite
return metaUpdater;
}
- IgniteBinding IgniteEnvironment::GetBinding() const
- {
- return IgniteBinding(binding);
- }
-
- IgniteBindingContext IgniteEnvironment::GetBindingContext() const
+ SharedPointer<IgniteBindingImpl> IgniteEnvironment::GetBinding() const
{
- return IgniteBindingContext(*cfg, GetBinding());
+ return binding;
}
void IgniteEnvironment::ProcessorReleaseStart()
@@ -321,6 +345,62 @@ namespace ignite
}
}
+ int64_t IgniteEnvironment::OnContinuousQueryFilterCreate(SharedPointer<InteropMemory>& mem)
+ {
+ if (!binding.Get())
+ throw IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "IgniteBinding is not initialized.");
+
+ InteropInputStream inStream(mem.Get());
+ BinaryReaderImpl reader(&inStream);
+
+ InteropOutputStream outStream(mem.Get());
+ BinaryWriterImpl writer(&outStream, GetTypeManager());
+
+ BinaryObjectImpl binFilter = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position());
+
+ int32_t filterId = binFilter.GetTypeId();
+
+ bool invoked = false;
+
+ int64_t res = binding.Get()->InvokeCallback(invoked,
+ IgniteBindingImpl::CACHE_ENTRY_FILTER_CREATE, filterId, reader, writer);
+
+ if (!invoked)
+ {
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+ "C++ remote filter is not registered on the node (did you compile your program without -rdynamic?).",
+ "filterId", filterId);
+ }
+
+ outStream.Synchronize();
+
+ return res;
+ }
+
+ int64_t IgniteEnvironment::OnContinuousQueryFilterApply(SharedPointer<InteropMemory>& mem)
+ {
+ InteropInputStream inStream(mem.Get());
+ BinaryReaderImpl reader(&inStream);
+ BinaryRawReader rawReader(&reader);
+
+ int64_t handle = rawReader.ReadInt64();
+
+ SharedPointer<ContinuousQueryImplBase> qry =
+ StaticPointerCast<ContinuousQueryImplBase>(registry.Get(handle));
+
+ if (!qry.Get())
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null query for handle.", "handle", handle);
+
+ cache::event::CacheEntryEventFilterBase* filter = qry.Get()->GetFilterHolder().GetFilter();
+
+ if (!filter)
+ IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null filter for handle.", "handle", handle);
+
+ bool res = filter->ReadAndProcessEvent(rawReader);
+
+ return res ? 1 : 0;
+ }
+
void IgniteEnvironment::CacheInvokeCallback(SharedPointer<InteropMemory>& mem)
{
if (!binding.Get())
@@ -340,9 +420,11 @@ namespace ignite
BinaryObjectImpl binProcHolder = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position(), 0);
BinaryObjectImpl binProc = binProcHolder.GetField(0);
- int64_t procId = binProc.GetTypeId();
+ int32_t procId = binProc.GetTypeId();
+
+ bool invoked = false;
- bool invoked = binding.Get()->InvokeCallbackById(procId, reader, writer);
+ binding.Get()->InvokeCallback(invoked, IgniteBindingImpl::CACHE_ENTRY_PROCESSOR_APPLY, procId, reader, writer);
if (!invoked)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
index fd9bf45..546cd01 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
@@ -59,7 +59,7 @@ namespace ignite
return env.Get()->Context();
}
- IgniteBinding IgniteImpl::GetBinding()
+ SharedPointer<IgniteBindingImpl> IgniteImpl::GetBinding()
{
return env.Get()->GetBinding();
}
[2/2] ignite git commit: IGNITE-3575 CPP: Added support for
continuous queries remote filters.
Posted by is...@apache.org.
IGNITE-3575 CPP: Added support for continuous queries remote filters.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4da92b7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4da92b7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4da92b7
Branch: refs/heads/master
Commit: d4da92b7ab2625c1e09d8ae06bf1eb9393162c8e
Parents: 79bac4f
Author: Igor Sapego <is...@gridgain.com>
Authored: Fri Mar 31 16:40:25 2017 +0300
Committer: Igor Sapego <is...@gridgain.com>
Committed: Fri Mar 31 16:40:25 2017 +0300
----------------------------------------------------------------------
.../ignite/impl/binary/binary_type_impl.h | 2 +-
.../common/include/ignite/common/concurrent.h | 30 +++
.../cpp/common/include/ignite/reference.h | 14 +-
.../cpp/core-test/config/cache-test.xml | 2 +-
.../project/vs/core-test.vcxproj.filters | 3 +
.../cpp/core-test/src/cache_invoke_test.cpp | 6 +-
.../platforms/cpp/core-test/src/cache_test.cpp | 23 ++-
.../cpp/core-test/src/continuous_query_test.cpp | 202 ++++++++++++++++++-
.../cpp/core-test/src/reference_test.cpp | 12 +-
modules/platforms/cpp/core/Makefile.am | 1 +
modules/platforms/cpp/core/include/Makefile.am | 70 ++++---
.../cpp/core/include/ignite/cache/cache.h | 22 +-
.../ignite/cache/cache_entry_processor.h | 42 +---
.../cache/event/cache_entry_event_filter.h | 109 ++++++++++
.../cache/query/continuous/continuous_query.h | 35 +++-
.../cpp/core/include/ignite/ignite_binding.h | 39 +++-
.../include/ignite/ignite_binding_context.h | 2 +-
.../cpp/core/include/ignite/impl/bindings.h | 95 +++++++++
.../impl/cache/cache_entry_processor_holder.h | 15 --
.../core/include/ignite/impl/cache/cache_impl.h | 81 +-------
.../cache/event/cache_entry_event_filter_base.h | 66 ++++++
.../event/cache_entry_event_filter_holder.h | 185 +++++++++++++++++
.../continuous/continuous_query_handle_impl.h | 10 -
.../query/continuous/continuous_query_impl.h | 60 +++++-
.../include/ignite/impl/ignite_binding_impl.h | 101 +++++-----
.../include/ignite/impl/ignite_environment.h | 37 ++--
.../cpp/core/include/ignite/impl/ignite_impl.h | 10 +-
.../cpp/core/include/ignite/impl/operations.h | 2 +-
.../platforms/cpp/core/project/vs/core.vcxproj | 5 +
.../cpp/core/project/vs/core.vcxproj.filters | 18 ++
.../cpp/core/src/impl/cache/cache_impl.cpp | 90 ++++++++-
.../continuous/continuous_query_handle_impl.cpp | 5 -
.../cpp/core/src/impl/ignite_binding_impl.cpp | 88 ++++++++
.../cpp/core/src/impl/ignite_environment.cpp | 124 ++++++++++--
.../platforms/cpp/core/src/impl/ignite_impl.cpp | 2 +-
35 files changed, 1284 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h
index d0cbb86..08c60c0 100644
--- a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h
+++ b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h
@@ -103,7 +103,7 @@ namespace ignite
ignite::binary::BinaryType<T> bt;
ignite::Reference<ignite::binary::BinaryIdentityResolver> resolver = bt.GetIdentityResolver();
- return resolver.Get().GetHashCode(obj);
+ return resolver.Get()->GetHashCode(obj);
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/common/include/ignite/common/concurrent.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/common/concurrent.h b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
index 84a1f0e..69b8eda 100644
--- a/modules/platforms/cpp/common/include/ignite/common/concurrent.h
+++ b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
@@ -30,6 +30,11 @@ namespace ignite
namespace concurrent
{
/**
+ * Type tag for static pointer cast.
+ */
+ struct StaticTag {};
+
+ /**
* Default deleter implementation.
*
* @param obj Object to be deleted.
@@ -198,6 +203,20 @@ namespace ignite
}
/**
+ * Static-cast constructor.
+ *
+ * @param other Instance to copy.
+ */
+ template<typename T2>
+ SharedPointer(const SharedPointer<T2>& other, StaticTag) :
+ ptr(static_cast<T*>(other.ptr)),
+ impl(other.impl)
+ {
+ if (impl)
+ impl->Increment();
+ }
+
+ /**
* Assignment operator.
*
* @param other Other instance.
@@ -313,6 +332,17 @@ namespace ignite
};
/**
+ * Enables static-cast semantics for SharedPointer.
+ *
+ * @param val Value to cast.
+ */
+ template<class T1, class T2>
+ SharedPointer<T1> StaticPointerCast(const SharedPointer<T2>& val)
+ {
+ return SharedPointer<T1>(val, StaticTag());
+ }
+
+ /**
* The class provides functionality that allows objects of derived
* classes to create instances of shared_ptr pointing to themselves
* and sharing ownership with existing shared_ptr objects.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/common/include/ignite/reference.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/reference.h b/modules/platforms/cpp/common/include/ignite/reference.h
index b026ad7..08cccec 100644
--- a/modules/platforms/cpp/common/include/ignite/reference.h
+++ b/modules/platforms/cpp/common/include/ignite/reference.h
@@ -160,9 +160,9 @@ namespace ignite
*
* @return Constant reference to underlying value.
*/
- const T& Get() const
+ const T* Get() const
{
- return *reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+ return reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
}
/**
@@ -326,11 +326,11 @@ namespace ignite
* If the pointer is null then this operation causes undefined
* behaviour.
*
- * @return Constant reference to underlying value.
+ * @return Constant pointer to underlying value.
*/
- const T& Get() const
+ const T* Get() const
{
- return *reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+ return reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
}
/**
@@ -341,9 +341,9 @@ namespace ignite
*
* @return Reference to underlying value.
*/
- T& Get()
+ T* Get()
{
- return *reinterpret_cast<T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+ return reinterpret_cast<T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/config/cache-test.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/config/cache-test.xml b/modules/platforms/cpp/core-test/config/cache-test.xml
index 0ea5876..10300ba 100644
--- a/modules/platforms/cpp/core-test/config/cache-test.xml
+++ b/modules/platforms/cpp/core-test/config/cache-test.xml
@@ -55,7 +55,7 @@
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
</bean>
-
+
<bean parent="cache-template">
<property name="name" value="partitioned2"/>
<property name="cacheMode" value="PARTITIONED"/>
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
index fb0be1b..5181f96 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
@@ -142,5 +142,8 @@
<None Include="..\..\config\cache-store.xml">
<Filter>Configs</Filter>
</None>
+ <None Include="..\..\config\cache-query-continuous.xml">
+ <Filter>Configs</Filter>
+ </None>
</ItemGroup>
</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp b/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp
index db304e2..4f1f30a 100644
--- a/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp
+++ b/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp
@@ -44,7 +44,7 @@ using namespace ignite::common;
/**
* CacheEntryModifier class for invoke tests.
*/
-class CacheEntryModifier : public CacheEntryProcessor<CacheEntryModifier, int, int, int, int>
+class CacheEntryModifier : public CacheEntryProcessor<int, int, int, int>
{
public:
/**
@@ -151,7 +151,7 @@ namespace ignite
/**
* Divisor class for invoke tests.
*/
-class Divisor : public CacheEntryProcessor<Divisor, int, int, double, double>
+class Divisor : public CacheEntryProcessor<int, int, double, double>
{
public:
/**
@@ -262,7 +262,7 @@ namespace ignite
/**
* Character remover class for invoke tests.
*/
-class CharRemover : public CacheEntryProcessor<CharRemover, std::string, std::string, int, bool>
+class CharRemover : public CacheEntryProcessor<std::string, std::string, int, bool>
{
public:
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/cache_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/cache_test.cpp b/modules/platforms/cpp/core-test/src/cache_test.cpp
index 437ed234..d57b757 100644
--- a/modules/platforms/cpp/core-test/src/cache_test.cpp
+++ b/modules/platforms/cpp/core-test/src/cache_test.cpp
@@ -29,16 +29,6 @@
using namespace ignite;
using namespace boost::unit_test;
-/* Nodes started during the test. */
-Ignite grid0 = Ignite();
-Ignite grid1 = Ignite();
-
-/** Cache accessor. */
-cache::Cache<int, int> Cache()
-{
- return grid0.GetCache<int, int>("partitioned");
-}
-
struct Person
{
std::string name;
@@ -88,7 +78,18 @@ namespace ignite
/*
* Test setup fixture.
*/
-struct CacheTestSuiteFixture {
+struct CacheTestSuiteFixture
+{
+ /* Nodes started during the test. */
+ Ignite grid0;
+ Ignite grid1;
+
+ /** Cache accessor. */
+ cache::Cache<int, int> Cache()
+ {
+ return grid0.GetCache<int, int>("partitioned");
+ }
+
/*
* Constructor.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/continuous_query_test.cpp b/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
index 1be21c1..f81eb5d 100644
--- a/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
+++ b/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
@@ -175,6 +175,61 @@ private:
ConcurrentQueue< CacheEntryEvent<K, V> > eventQueue;
};
+/**
+ * Only lets through keys from the range.
+ */
+template<typename K, typename V>
+struct RangeFilter : CacheEntryEventFilter<K, V>
+{
+ /**
+ * Default constructor.
+ */
+ RangeFilter() :
+ rangeBegin(0),
+ rangeEnd(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param from Range beginning. Inclusive.
+ * @param to Range end. Not inclusive.
+ */
+ RangeFilter(const K& from, const K& to) :
+ rangeBegin(from),
+ rangeEnd(to)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~RangeFilter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Event callback.
+ *
+ * @param event Event.
+ * @return True if the event passes filter.
+ */
+ virtual bool Process(const CacheEntryEvent<K, V>& event)
+ {
+ return event.GetKey() >= rangeBegin && event.GetKey() < rangeEnd;
+ }
+
+ /** Beginning of the range. */
+ K rangeBegin;
+
+ /** End of the range. */
+ K rangeEnd;
+};
+
/*
* Test entry.
*/
@@ -204,10 +259,9 @@ namespace ignite
{
namespace binary
{
- /**
- * Binary type definition.
- */
- IGNITE_BINARY_TYPE_START(TestEntry)
+ template<>
+ struct BinaryType<TestEntry>
+ {
IGNITE_BINARY_GET_TYPE_ID_AS_HASH(TestEntry)
IGNITE_BINARY_GET_TYPE_NAME_AS_IS(TestEntry)
IGNITE_BINARY_GET_FIELD_ID_AS_HASH
@@ -227,8 +281,52 @@ namespace ignite
return res;
}
+ };
- IGNITE_BINARY_TYPE_END
+ template<typename K, typename V>
+ struct BinaryType< RangeFilter<K,V> >
+ {
+ int32_t GetTypeId()
+ {
+ return GetBinaryStringHashCode("RangeFilter");
+ }
+
+ std::string GetTypeName()
+ {
+ return "RangeFilter";
+
+ }
+ IGNITE_BINARY_GET_FIELD_ID_AS_HASH
+
+ int32_t GetHashCode(const RangeFilter<K,V>&)
+ {
+ return 0;
+ }
+
+ bool IsNull(const RangeFilter<K,V>&)
+ {
+ return false;
+ }
+
+ RangeFilter<K,V> GetNull()
+ {
+ return RangeFilter<K,V>();
+ }
+
+ void Write(BinaryWriter& writer, const RangeFilter<K,V>& obj)
+ {
+ writer.WriteObject("rangeBegin", obj.rangeBegin);
+ writer.WriteObject("rangeEnd", obj.rangeEnd);
+ }
+
+ RangeFilter<K,V> Read(BinaryReader& reader)
+ {
+ K begin = reader.ReadObject<K>("rangeBegin");
+ K end = reader.ReadObject<K>("rangeEnd");
+
+ return RangeFilter<K,V>(begin, end);
+ }
+ };
}
}
@@ -237,7 +335,7 @@ namespace ignite
*/
struct ContinuousQueryTestSuiteFixture
{
- Ignite grid;
+ Ignite node;
Cache<int, TestEntry> cache;
@@ -245,8 +343,8 @@ struct ContinuousQueryTestSuiteFixture
* Constructor.
*/
ContinuousQueryTestSuiteFixture() :
- grid(ignite_test::StartNode("cache-query-continuous.xml", "node-01")),
- cache(grid.GetCache<int, TestEntry>("transactional_no_backup"))
+ node(ignite_test::StartNode("cache-query-continuous.xml", "node-01")),
+ cache(node.GetCache<int, TestEntry>("transactional_no_backup"))
{
// No-op.
}
@@ -258,7 +356,7 @@ struct ContinuousQueryTestSuiteFixture
{
Ignition::StopAll(false);
- grid = Ignite();
+ node = Ignite();
}
};
@@ -581,4 +679,90 @@ BOOST_AUTO_TEST_CASE(TestPublicPrivateConstantsConsistence)
static_cast<int>(QueryType::DEFAULT_BUFFER_SIZE));
}
+BOOST_AUTO_TEST_CASE(TestFilterSingleNode)
+{
+ node.GetBinding().RegisterCacheEntryEventFilter< RangeFilter<int, TestEntry> >();
+
+ Listener<int, TestEntry> lsnr;
+ RangeFilter<int, TestEntry> filter(100, 150);
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr), MakeReference(filter));
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry);
+
+ cache.Put(1, TestEntry(10));
+ cache.Put(1, TestEntry(11));
+
+ cache.Put(2, TestEntry(20));
+ cache.Remove(2);
+
+ cache.Put(100, TestEntry(1000));
+ cache.Put(101, TestEntry(1010));
+
+ cache.Put(142, TestEntry(1420));
+ cache.Put(142, TestEntry(1421));
+ cache.Remove(142);
+
+ cache.Put(149, TestEntry(1490));
+ cache.Put(150, TestEntry(1500));
+ cache.Put(150, TestEntry(1502));
+ cache.Remove(150);
+
+ lsnr.CheckNextEvent(100, boost::none, TestEntry(1000));
+ lsnr.CheckNextEvent(101, boost::none, TestEntry(1010));
+
+ lsnr.CheckNextEvent(142, boost::none, TestEntry(1420));
+ lsnr.CheckNextEvent(142, TestEntry(1420), TestEntry(1421));
+ lsnr.CheckNextEvent(142, TestEntry(1421), boost::none);
+
+ lsnr.CheckNextEvent(149, boost::none, TestEntry(1490));
+}
+
+BOOST_AUTO_TEST_CASE(TestFilterMultipleNodes)
+{
+ Ignite node2 = ignite_test::StartNode("cache-query-continuous.xml", "node-02");
+ Ignite node3 = ignite_test::StartNode("cache-query-continuous.xml", "node-03");
+
+ node.GetBinding().RegisterCacheEntryEventFilter< RangeFilter<int, TestEntry> >();
+
+ Listener<int, TestEntry> lsnr;
+ RangeFilter<int, TestEntry> filter(100, 150);
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr), MakeReference(filter));
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry);
+
+ Cache<int, TestEntry> cache2 = node2.GetCache<int, TestEntry>("transactional_no_backup");
+
+ cache2.Put(1, TestEntry(10));
+ cache2.Put(1, TestEntry(11));
+
+ cache2.Put(2, TestEntry(20));
+ cache2.Remove(2);
+
+ cache2.Put(100, TestEntry(1000));
+ cache2.Put(101, TestEntry(1010));
+
+ cache2.Put(142, TestEntry(1420));
+ cache2.Put(142, TestEntry(1421));
+ cache2.Remove(142);
+
+ cache2.Put(149, TestEntry(1490));
+ cache2.Put(150, TestEntry(1500));
+ cache2.Put(150, TestEntry(1502));
+ cache2.Remove(150);
+
+ for (int i = 200; i < 250; ++i)
+ cache2.Put(i, TestEntry(i * 10));
+
+ lsnr.CheckNextEvent(100, boost::none, TestEntry(1000));
+ lsnr.CheckNextEvent(101, boost::none, TestEntry(1010));
+
+ lsnr.CheckNextEvent(142, boost::none, TestEntry(1420));
+ lsnr.CheckNextEvent(142, TestEntry(1420), TestEntry(1421));
+ lsnr.CheckNextEvent(142, TestEntry(1421), boost::none);
+
+ lsnr.CheckNextEvent(149, boost::none, TestEntry(1490));
+}
+
BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/reference_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/reference_test.cpp b/modules/platforms/cpp/core-test/src/reference_test.cpp
index a5ac559..ec445c7 100644
--- a/modules/platforms/cpp/core-test/src/reference_test.cpp
+++ b/modules/platforms/cpp/core-test/src/reference_test.cpp
@@ -118,32 +118,32 @@ struct C3 : C1, C2
void TestFunction1(Reference<C1> c1, int expected)
{
- BOOST_CHECK_EQUAL(c1.Get().c1, expected);
+ BOOST_CHECK_EQUAL(c1.Get()->c1, expected);
}
void TestFunction2(Reference<C2> c2, int expected)
{
- BOOST_CHECK_EQUAL(c2.Get().c2, expected);
+ BOOST_CHECK_EQUAL(c2.Get()->c2, expected);
}
void TestFunction3(Reference<C3> c3, int expected)
{
- BOOST_CHECK_EQUAL(c3.Get().c3, expected);
+ BOOST_CHECK_EQUAL(c3.Get()->c3, expected);
}
void TestFunctionConst1(ConstReference<C1> c1, int expected)
{
- BOOST_CHECK_EQUAL(c1.Get().c1, expected);
+ BOOST_CHECK_EQUAL(c1.Get()->c1, expected);
}
void TestFunctionConst2(ConstReference<C2> c2, int expected)
{
- BOOST_CHECK_EQUAL(c2.Get().c2, expected);
+ BOOST_CHECK_EQUAL(c2.Get()->c2, expected);
}
void TestFunctionConst3(ConstReference<C3> c3, int expected)
{
- BOOST_CHECK_EQUAL(c3.Get().c3, expected);
+ BOOST_CHECK_EQUAL(c3.Get()->c3, expected);
}
BOOST_AUTO_TEST_SUITE(ReferenceTestSuite)
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/Makefile.am b/modules/platforms/cpp/core/Makefile.am
index 46d6bc9..4de45d3 100644
--- a/modules/platforms/cpp/core/Makefile.am
+++ b/modules/platforms/cpp/core/Makefile.am
@@ -69,6 +69,7 @@ libignite_la_SOURCES = \
src/impl/transactions/transactions_impl.cpp \
src/impl/cluster/cluster_group_impl.cpp \
src/impl/ignite_impl.cpp \
+ src/impl/ignite_binding_impl.cpp \
src/transactions/transaction.cpp \
src/transactions/transactions.cpp
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/Makefile.am b/modules/platforms/cpp/core/include/Makefile.am
index 21d3062..0e9a7ec 100644
--- a/modules/platforms/cpp/core/include/Makefile.am
+++ b/modules/platforms/cpp/core/include/Makefile.am
@@ -18,45 +18,55 @@
ACLOCAL_AMFLAGS =-I m4
nobase_include_HEADERS = \
- ignite/ignite_configuration.h \
- ignite/ignite.h \
- ignite/impl/binary/binary_type_updater_impl.h \
- ignite/impl/operations.h \
- ignite/impl/ignite_environment.h \
- ignite/impl/ignite_impl.h \
- ignite/impl/cache/query/query_fields_row_impl.h \
- ignite/impl/cache/query/query_argument.h \
- ignite/impl/cache/query/query_impl.h \
- ignite/impl/cache/cache_impl.h \
- ignite/impl/cache/cache_entry_processor_holder.h \
- ignite/impl/cache/query/query_batch.h \
- ignite/impl/interop/interop_target.h \
- ignite/impl/interop/interop_external_memory.h \
- ignite/impl/handle_registry.h \
- ignite/impl/transactions/transaction_impl.h \
- ignite/impl/transactions/transactions_impl.h \
- ignite/impl/cluster/cluster_group_impl.h \
- ignite/impl/ignite_binding_impl.h \
- ignite/impl/module_manager.h \
- ignite/cache/query/query_fields_row.h \
+ ignite/cache/cache.h \
+ ignite/cache/cache_entry.h \
+ ignite/cache/cache_entry_processor.h \
+ ignite/cache/cache_peek_mode.h \
+ ignite/cache/event/cache_entry_event.h \
+ ignite/cache/event/cache_entry_event_filter.h \
+ ignite/cache/event/cache_entry_event_listener.h \
+ ignite/cache/mutable_cache_entry.h \
+ ignite/cache/query/continuous/continuous_query.h \
+ ignite/cache/query/continuous/continuous_query_handle.h \
+ ignite/cache/query/query.h \
+ ignite/cache/query/query_cursor.h \
ignite/cache/query/query_fields_cursor.h \
+ ignite/cache/query/query_fields_row.h \
ignite/cache/query/query_scan.h \
- ignite/cache/query/query_cursor.h \
ignite/cache/query/query_sql.h \
- ignite/cache/query/query.h \
ignite/cache/query/query_sql_fields.h \
ignite/cache/query/query_text.h \
- ignite/cache/cache.h \
- ignite/cache/cache_entry.h \
- ignite/cache/cache_peek_mode.h \
- ignite/cache/cache_entry_processor.h \
- ignite/cache/mutable_cache_entry.h \
- ignite/ignition.h \
+ ignite/ignite.h \
ignite/ignite_binding.h \
ignite/ignite_binding_context.h \
+ ignite/ignite_configuration.h \
+ ignite/ignition.h \
+ ignite/impl/binary/binary_type_updater_impl.h \
+ ignite/impl/bindings.h \
+ ignite/impl/cache/cache_entry_processor_holder.h \
+ ignite/impl/cache/cache_impl.h \
+ ignite/impl/cache/event/cache_entry_event_filter_base.h \
+ ignite/impl/cache/event/cache_entry_event_filter_holder.h \
+ ignite/impl/cache/query/continuous/continuous_query_handle_impl.h \
+ ignite/impl/cache/query/continuous/continuous_query_impl.h \
+ ignite/impl/cache/query/query_argument.h \
+ ignite/impl/cache/query/query_batch.h \
+ ignite/impl/cache/query/query_fields_row_impl.h \
+ ignite/impl/cache/query/query_impl.h \
+ ignite/impl/cluster/cluster_group_impl.h \
+ ignite/impl/handle_registry.h \
+ ignite/impl/ignite_binding_impl.h \
+ ignite/impl/ignite_environment.h \
+ ignite/impl/ignite_impl.h \
+ ignite/impl/interop/interop_external_memory.h \
+ ignite/impl/interop/interop_target.h \
+ ignite/impl/module_manager.h \
+ ignite/impl/operations.h \
+ ignite/impl/transactions/transactions_impl.h \
+ ignite/impl/transactions/transaction_impl.h \
ignite/transactions/transaction.h \
- ignite/transactions/transaction_consts.h \
ignite/transactions/transactions.h \
+ ignite/transactions/transaction_consts.h \
ignite/transactions/transaction_metrics.h
uninstall-hook:
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/cache.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache.h b/modules/platforms/cpp/core/include/ignite/cache/cache.h
index f9c442c..00d1c81 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache.h
@@ -1485,8 +1485,11 @@ namespace ignite
const query::continuous::ContinuousQuery<K, V>& qry, IgniteError& err)
{
using namespace impl::cache::query::continuous;
+ using namespace common::concurrent;
- if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener())
+ const SharedPointer<ContinuousQueryImpl<K, V> >& qryImpl = qry.impl;
+
+ if (!qryImpl.IsValid() || !qryImpl.Get()->HasListener())
{
err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
"Event listener is not set for ContinuousQuery instance");
@@ -1494,11 +1497,7 @@ namespace ignite
return query::continuous::ContinuousQueryHandle<K, V>();
}
- ContinuousQueryHandleImpl* cqImpl;
- cqImpl = impl.Get()->QueryContinuous(qry.impl, err);
-
- if (cqImpl)
- cqImpl->SetQuery(qry.impl);
+ ContinuousQueryHandleImpl* cqImpl = impl.Get()->QueryContinuous(qryImpl, err);
return query::continuous::ContinuousQueryHandle<K, V>(cqImpl);
}
@@ -1538,8 +1537,11 @@ namespace ignite
const Q& initialQry, IgniteError& err)
{
using namespace impl::cache::query::continuous;
+ using namespace common::concurrent;
- if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener())
+ const SharedPointer<ContinuousQueryImpl<K, V> >& qryImpl = qry.impl;
+
+ if (!qryImpl.IsValid() || !qryImpl.Get()->HasListener())
{
err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
"Event listener is not set for ContinuousQuery instance");
@@ -1547,11 +1549,7 @@ namespace ignite
return query::continuous::ContinuousQueryHandle<K, V>();
}
- ContinuousQueryHandleImpl* cqImpl;
- cqImpl = impl.Get()->QueryContinuous(qry.impl, initialQry, err);
-
- if (cqImpl)
- cqImpl->SetQuery(qry.impl);
+ ContinuousQueryHandleImpl* cqImpl = impl.Get()->QueryContinuous(qryImpl, initialQry, err);
return query::continuous::ContinuousQueryHandle<K, V>(cqImpl);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h b/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h
index 7fa1550..e0bb694 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h
@@ -42,17 +42,21 @@ namespace ignite
* All templated types should be default-constructable,
* copy-constructable and assignable.
*
- * @tparam P The processor itself which inherits from CacheEntryProcessor.
* @tparam K Key type.
* @tparam V Value type.
* @tparam R Process method return type.
* @tparam A Process method argument type.
*/
- template<typename P, typename K, typename V, typename R, typename A>
+ template<typename K, typename V, typename R, typename A>
class CacheEntryProcessor
{
friend class ignite::IgniteBinding;
+ typedef A ArgumentType;
+ typedef K KeyType;
+ typedef V ValueType;
+ typedef R ReturnType;
+
public:
/**
* Destructor.
@@ -70,40 +74,6 @@ namespace ignite
* @return Processing result.
*/
virtual R Process(MutableCacheEntry<K, V>& entry, const A& arg) = 0;
-
- private:
- /**
- * Process input streaming data to produce output streaming data.
- *
- * Deserializes cache entry and processor using provided reader, invokes
- * cache entry processor, gets result and serializes it using provided
- * writer.
- *
- * @param reader Reader.
- * @param writer Writer.
- */
- static void InternalProcess(impl::binary::BinaryReaderImpl& reader, impl::binary::BinaryWriterImpl& writer)
- {
- typedef impl::cache::CacheEntryProcessorHolder<P, A> ProcessorHolder;
-
- ProcessorHolder procHolder = reader.ReadObject<ProcessorHolder>();
-
- K key = reader.ReadObject<K>();
-
- V value;
- bool exists = reader.TryReadObject<V>(value);
-
- impl::cache::MutableCacheEntryState entryState;
-
- R res = procHolder.template Process<R, K, V>(key, value, exists, entryState);
-
- writer.WriteInt8(static_cast<int8_t>(entryState));
-
- if (entryState == impl::cache::ENTRY_STATE_VALUE_SET)
- writer.WriteTopObject(value);
-
- writer.WriteTopObject(res);
- }
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h
new file mode 100644
index 0000000..3a4fc74
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::event::CacheEntryEventFilter class.
+ */
+
+#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER
+#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER
+
+#include <ignite/cache/event/cache_entry_event.h>
+#include <ignite/impl/cache/event/cache_entry_event_filter_base.h>
+
+namespace ignite
+{
+ class IgniteBinding;
+
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace event
+ {
+ template<typename T>
+ class CacheEntryEventFilterHolder;
+ }
+ }
+ }
+
+ namespace cache
+ {
+ namespace event
+ {
+ /**
+ * Cache entry event filter.
+ *
+ * All templated types should be default-constructable,
+ * copy-constructable and assignable.
+ *
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+ template<typename K, typename V>
+ class CacheEntryEventFilter : private impl::cache::event::CacheEntryEventFilterBase
+ {
+ template<typename T>
+ friend class impl::cache::event::CacheEntryEventFilterHolder;
+
+ public:
+ /**
+ * Default constructor.
+ */
+ CacheEntryEventFilter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntryEventFilter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Event callback.
+ *
+ * @param event Event.
+ * @return True if the event passes filter.
+ */
+ virtual bool Process(const CacheEntryEvent<K, V>& event) = 0;
+
+ private:
+ /**
+ * Process serialized events.
+ *
+ * @param reader Reader for a serialized event.
+ * @return Filter evaluation result.
+ */
+ virtual bool ReadAndProcessEvent(binary::BinaryRawReader& reader)
+ {
+ CacheEntryEvent<K, V> event;
+
+ event.Read(reader);
+
+ return Process(event);
+ }
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
index 82bb125..0c1146b 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
@@ -24,7 +24,9 @@
#define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY
#include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
+
#include <ignite/cache/event/cache_entry_event_listener.h>
+#include <ignite/cache/event/cache_entry_event_filter.h>
namespace ignite
{
@@ -83,7 +85,7 @@ namespace ignite
* continuous query execution has been started.
*/
ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr) :
- impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr))
+ impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, false))
{
// No-op.
}
@@ -102,6 +104,37 @@ namespace ignite
}
/**
+ * Constructor.
+ *
+ * @param lsnr Event listener. Invoked on the node where
+ * continuous query execution has been started.
+ * @param remoteFilter Remote filter.
+ */
+ template<typename F>
+ ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr,
+ const Reference<F>& remoteFilter) :
+ impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, false, remoteFilter))
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param lsnr Event listener Invoked on the node where
+ * continuous query execution has been started.
+ * @param remoteFilter Remote filter.
+ * @param loc Whether query should be executed locally.
+ */
+ template<typename F>
+ ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr,
+ const Reference<F>& remoteFilter, bool loc) :
+ impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, loc, remoteFilter))
+ {
+ // No-op.
+ }
+
+ /**
* Set local flag.
*
* @param val Value of the flag. If true, query will be
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/ignite_binding.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/ignite_binding.h b/modules/platforms/cpp/core/include/ignite/ignite_binding.h
index a8decf9..a84a1c1 100644
--- a/modules/platforms/cpp/core/include/ignite/ignite_binding.h
+++ b/modules/platforms/cpp/core/include/ignite/ignite_binding.h
@@ -22,6 +22,7 @@
#include <ignite/common/concurrent.h>
#include <ignite/impl/ignite_binding_impl.h>
+#include <ignite/impl/bindings.h>
namespace ignite
{
@@ -53,12 +54,10 @@ namespace ignite
}
/**
- * Register Type as Cache Entry Processor.
+ * Register type as Cache Entry Processor.
*
* Registred type should be a child of ignite::cache::CacheEntryProcessor
* class.
- *
- * This method should only be used on the valid instance.
*/
template<typename P>
void RegisterCacheEntryProcessor()
@@ -76,8 +75,6 @@ namespace ignite
* Registred type should be a child of ignite::cache::CacheEntryProcessor
* class.
*
- * This method should only be used on the valid instance.
- *
* @param err Error.
*/
template<typename P>
@@ -87,7 +84,11 @@ namespace ignite
impl::IgniteBindingImpl *im = impl.Get();
if (im)
- im->RegisterCallback(bt.GetTypeId(), &P::CacheEntryProcessor::InternalProcess, err);
+ {
+ im->RegisterCallback(impl::IgniteBindingImpl::CACHE_ENTRY_PROCESSOR_APPLY,
+ bt.GetTypeId(), impl::binding::ListenerApply<P, typename P::KeyType,
+ typename P::ValueType, typename P::ReturnType, typename P::ArgumentType>, err);
+ }
else
{
err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
@@ -96,6 +97,32 @@ namespace ignite
}
/**
+ * Register type as Cache Entry Event Filter.
+ *
+ * Registred type should be a child of ignite::cache::event::CacheEntryEventFilter
+ * class.
+ */
+ template<typename F>
+ void RegisterCacheEntryEventFilter()
+ {
+ binary::BinaryType<F> bt;
+ impl::IgniteBindingImpl *im = impl.Get();
+
+ int32_t typeId = bt.GetTypeId();
+
+ if (im)
+ {
+ im->RegisterCallback(impl::IgniteBindingImpl::CACHE_ENTRY_FILTER_CREATE,
+ typeId, impl::binding::FilterCreate<F>);
+ }
+ else
+ {
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "Instance is not usable (did you check for error?).");
+ }
+ }
+
+ /**
* Check if the instance is valid.
*
* Invalid instance can be returned if some of the previous operations
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h b/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h
index 1a6d26d..4d8a7a7 100644
--- a/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h
+++ b/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h
@@ -70,7 +70,7 @@ namespace ignite
* @param cfg Configuration.
* @param binding Binding.
*/
- IgniteBindingContext(const IgniteConfiguration& cfg, IgniteBinding binding) :
+ IgniteBindingContext(const IgniteConfiguration& cfg, const IgniteBinding& binding) :
cfg(cfg),
binding(binding)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/bindings.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/bindings.h b/modules/platforms/cpp/core/include/ignite/impl/bindings.h
new file mode 100644
index 0000000..ce77672
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/bindings.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_BINDINGS
+#define _IGNITE_IMPL_BINDINGS
+
+#include <stdint.h>
+
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/ignite_environment.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
+#include <ignite/impl/cache/cache_entry_processor_holder.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace binding
+ {
+ /**
+ * Binding for filter creation.
+ *
+ * @tparam F The filter which inherits from CacheEntryEventFilter.
+ *
+ * @param reader Reader.
+ * @param env Environment.
+ * @return Handle for the filter.
+ */
+ template<typename F>
+ int64_t FilterCreate(binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl&, IgniteEnvironment& env)
+ {
+ using namespace common::concurrent;
+ using namespace cache::query::continuous;
+
+ F filter = reader.ReadObject<F>();
+
+ SharedPointer<ContinuousQueryImplBase> qry(new RemoteFilterHolder(MakeReferenceFromCopy(filter)));
+
+ return env.GetHandleRegistry().Allocate(qry);
+ }
+
+ /**
+ * Process input streaming data to produce output streaming data.
+ *
+ * Deserializes cache entry and processor using provided reader, invokes
+ * cache entry processor, gets result and serializes it using provided
+ * writer.
+ *
+ * @param reader Reader.
+ * @param writer Writer.
+ */
+ template<typename P, typename K, typename V, typename R, typename A>
+ int64_t ListenerApply(binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer, IgniteEnvironment&)
+ {
+ typedef cache::CacheEntryProcessorHolder<P, A> ProcessorHolder;
+
+ ProcessorHolder procHolder = reader.ReadObject<ProcessorHolder>();
+
+ K key = reader.ReadObject<K>();
+
+ V value;
+ bool exists = reader.TryReadObject<V>(value);
+
+ cache::MutableCacheEntryState entryState;
+
+ R res = procHolder.template Process<R, K, V>(key, value, exists, entryState);
+
+ writer.WriteInt8(static_cast<int8_t>(entryState));
+
+ if (entryState == cache::ENTRY_STATE_VALUE_SET)
+ writer.WriteTopObject(value);
+
+ writer.WriteTopObject(res);
+
+ return 0;
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_BINDINGS
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h
index 23b57c3..c979b4a 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h
@@ -75,20 +75,6 @@ namespace ignite
/**
* Holder for the Cache Entry Processor and its argument. Used as a convenient way to
* transmit Cache Entry Processor between nodes.
- *
- * Both key and value types should be default-constructable,
- * copy-constructable and assignable.
- *
- * Additionally, for the processor class public methods with the
- * following signatures should be defined:
- * @code{.cpp}
- * // Should return unique ID for every class.
- * static int64_t GetJobId();
- *
- * // Main processing method. Takes cache entry and argument and
- * // returns processing result.
- * R Process(ignite::cache::MutableCacheEntry<K, V>&, const A&);
- * @endcode
*/
template<typename P, typename A>
class CacheEntryProcessorHolder
@@ -202,7 +188,6 @@ namespace ignite
typedef impl::cache::CacheEntryProcessorHolder<P, A> UnderlyingType;
IGNITE_BINARY_GET_FIELD_ID_AS_HASH
- IGNITE_BINARY_GET_HASH_CODE_ZERO(UnderlyingType)
IGNITE_BINARY_IS_NULL_FALSE(UnderlyingType)
IGNITE_BINARY_GET_NULL_DEFAULT_CTOR(UnderlyingType)
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
index e6cfbab..4599522 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
@@ -22,9 +22,7 @@
#include <ignite/cache/query/query_sql.h>
#include <ignite/cache/query/query_text.h>
#include <ignite/cache/query/query_sql_fields.h>
-#include <ignite/cache/query/continuous/continuous_query_handle.h>
#include <ignite/impl/cache/query/query_impl.h>
-#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
#include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
#include <ignite/impl/interop/interop_target.h>
@@ -35,6 +33,15 @@ namespace ignite
{
namespace cache
{
+ namespace query
+ {
+ namespace continuous
+ {
+ /* Forward declaration. */
+ class ContinuousQueryHandleImpl;
+ }
+ }
+
/**
* Cache implementation.
*/
@@ -402,30 +409,7 @@ namespace ignite
* @param err Error.
*/
template<typename T>
- query::QueryCursorImpl* QueryInternal(const T& qry, int32_t typ, IgniteError& err)
- {
- ignite::jni::java::JniErrorInfo jniErr;
-
- ignite::common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
- interop::InteropMemory* mem0 = mem.Get();
- interop::InteropOutputStream out(mem0);
- binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
- ignite::binary::BinaryRawWriter rawWriter(&writer);
-
- qry.Write(rawWriter);
-
- out.Synchronize();
-
- jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpQueryCursor(GetTarget(),
- typ, mem.Get()->PointerLong(), &jniErr);
-
- IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
-
- if (jniErr.code == ignite::java::IGNITE_JNI_ERR_SUCCESS)
- return new query::QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef);
- else
- return 0;
- }
+ query::QueryCursorImpl* QueryInternal(const T& qry, int32_t typ, IgniteError& err);
/**
* Start continuous query execution with the initial query.
@@ -438,50 +422,7 @@ namespace ignite
template<typename T>
query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
- const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err)
- {
- jni::java::JniErrorInfo jniErr;
-
- common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
- interop::InteropMemory* mem0 = mem.Get();
- interop::InteropOutputStream out(mem0);
- binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
- ignite::binary::BinaryRawWriter rawWriter(&writer);
-
- const query::continuous::ContinuousQueryImplBase& qry0 = *qry.Get();
-
- int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry);
-
- rawWriter.WriteInt64(handle);
- rawWriter.WriteBool(qry0.GetLocal());
-
- // Filters are not supported for now.
- rawWriter.WriteBool(false);
- rawWriter.WriteNull();
-
- rawWriter.WriteInt32(qry0.GetBufferSize());
- rawWriter.WriteInt64(qry0.GetTimeInterval());
-
- // Autounsubscribe is a filter feature.
- rawWriter.WriteBool(false);
-
- // Writing initial query. When there is not initial query writing -1.
- rawWriter.WriteInt32(typ);
- if (typ != -1)
- initialQry.Write(rawWriter);
-
- out.Synchronize();
-
- jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(),
- cmd, mem.Get()->PointerLong(), &jniErr);
-
- IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
-
- if (jniErr.code == java::IGNITE_JNI_ERR_SUCCESS)
- return new query::continuous::ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef);
-
- return 0;
- }
+ const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err);
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h
new file mode 100644
index 0000000..a0e1cb6
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE
+#define _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE
+
+#include <ignite/binary/binary_raw_reader.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace event
+ {
+ /**
+ * Base for the Cache Entry Event Filter.
+ */
+ class CacheEntryEventFilterBase
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ CacheEntryEventFilterBase()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntryEventFilterBase()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process serialized events.
+ *
+ * @param reader Reader for a serialized event.
+ * @return Filter evaluation result.
+ */
+ virtual bool ReadAndProcessEvent(ignite::binary::BinaryRawReader& reader) = 0;
+ };
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h
new file mode 100644
index 0000000..4256f2b
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER
+#define _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER
+
+#include <ignite/reference.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace event
+ {
+ /* Forward declaration. */
+ class CacheEntryEventFilterBase;
+
+ class CacheEntryEventFilterHolderBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntryEventFilterHolderBase()
+ {
+ // No-op.
+ }
+
+ /**
+ * Write.
+ *
+ * @param writer Writer.
+ */
+ virtual void Write(binary::BinaryWriterImpl& writer) = 0;
+
+ /**
+ * Get filter pointer.
+ *
+ * @return Filter.
+ */
+ virtual CacheEntryEventFilterBase* GetFilter() = 0;
+ };
+
+ /**
+ * Holder for the Cache Entry Event Filter.
+ */
+ template<typename F>
+ class CacheEntryEventFilterHolder : public CacheEntryEventFilterHolderBase
+ {
+ public:
+ typedef F FilterType;
+
+ /**
+ * Default constructor.
+ */
+ CacheEntryEventFilterHolder() :
+ filter()
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param filter Filter.
+ */
+ CacheEntryEventFilterHolder(const Reference<FilterType>& filter) :
+ filter(filter)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntryEventFilterHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process input.
+ *
+ * @param writer Writer.
+ */
+ virtual void Write(binary::BinaryWriterImpl& writer)
+ {
+ if (!filter.IsNull())
+ {
+ writer.WriteBool(true);
+ writer.WriteObject<FilterType>(*filter.Get());
+ }
+ else
+ {
+ writer.WriteBool(false);
+ writer.WriteNull();
+ }
+ }
+
+ /**
+ * Get filter pointer.
+ *
+ * @return Filter.
+ */
+ virtual CacheEntryEventFilterBase* GetFilter()
+ {
+ return filter.Get();
+ }
+
+ private:
+ /** Stored filter. */
+ Reference<FilterType> filter;
+ };
+
+ template<>
+ class CacheEntryEventFilterHolder<void> : public CacheEntryEventFilterHolderBase
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ CacheEntryEventFilterHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ */
+ CacheEntryEventFilterHolder(const Reference<void>&)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntryEventFilterHolder()
+ {
+ // No-op.
+ }
+
+ /**
+ * Process input.
+ *
+ * @param writer Writer.
+ */
+ virtual void Write(binary::BinaryWriterImpl& writer)
+ {
+ writer.WriteBool(false);
+ writer.WriteNull();
+ }
+
+ /**
+ * Get filter pointer.
+ *
+ * @return Filter.
+ */
+ virtual CacheEntryEventFilterBase* GetFilter()
+ {
+ return 0;
+ }
+ };
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
index 75504b1..07facff 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
@@ -66,13 +66,6 @@ namespace ignite
*/
QueryCursorImpl* GetInitialQueryCursor(IgniteError& err);
- /**
- * Set query to keep pointer to.
- *
- * @param query Query.
- */
- void SetQuery(SP_ContinuousQueryImplBase query);
-
private:
/** Environment. */
SP_IgniteEnvironment env;
@@ -83,9 +76,6 @@ namespace ignite
/** Handle to Java object. */
jobject javaRef;
- /** Shared pointer to query. Kept for query to live long enough. */
- SP_ContinuousQueryImplBase qry;
-
/** Mutex. */
common::concurrent::CriticalSection mutex;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
index 2a24e5f..d2bf241 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
@@ -24,11 +24,13 @@
#define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL
#include <stdint.h>
+#include <memory>
#include <ignite/reference.h>
#include <ignite/cache/event/cache_entry_event_listener.h>
#include <ignite/binary/binary_raw_reader.h>
+#include <ignite/impl/cache/event/cache_entry_event_filter_holder.h>
namespace ignite
{
@@ -80,10 +82,11 @@ namespace ignite
*
* @param loc Whether query should be executed locally.
*/
- explicit ContinuousQueryImplBase(bool loc) :
+ explicit ContinuousQueryImplBase(bool loc, event::CacheEntryEventFilterHolderBase* filterOp) :
local(loc),
bufferSize(DEFAULT_BUFFER_SIZE),
- timeInterval(DEFAULT_TIME_INTERVAL)
+ timeInterval(DEFAULT_TIME_INTERVAL),
+ filterOp(filterOp)
{
// No-op.
}
@@ -183,6 +186,16 @@ namespace ignite
}
/**
+ * Get remote filter holder.
+ *
+ * @return Filter holder.
+ */
+ event::CacheEntryEventFilterHolderBase& GetFilterHolder() const
+ {
+ return *filterOp;
+ }
+
+ /**
* Callback that reads and processes cache events.
*
* @param reader Reader to use.
@@ -221,6 +234,9 @@ namespace ignite
* sent only when buffer is full.
*/
int64_t timeInterval;
+
+ /** Cache entry event filter holder. */
+ std::auto_ptr<event::CacheEntryEventFilterHolderBase> filterOp;
};
/**
@@ -252,11 +268,13 @@ namespace ignite
/**
* Constructor.
*
- * @param lsnr Event listener. Invoked on the node where
+ * @param lsnr Event listener Invoked on the node where
* continuous query execution has been started.
+ * @param loc Whether query should be executed locally.
*/
- ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr) :
- ContinuousQueryImplBase(false),
+ ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr,
+ bool loc) :
+ ContinuousQueryImplBase(loc, new event::CacheEntryEventFilterHolder<void>()),
lsnr(lsnr)
{
// No-op.
@@ -269,8 +287,10 @@ namespace ignite
* continuous query execution has been started.
* @param loc Whether query should be executed locally.
*/
- ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr, bool loc) :
- ContinuousQueryImplBase(loc),
+ template<typename F>
+ ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr,
+ bool loc, const Reference<F>& filter) :
+ ContinuousQueryImplBase(loc, new event::CacheEntryEventFilterHolder<F>(filter)),
lsnr(lsnr)
{
// No-op.
@@ -335,13 +355,37 @@ namespace ignite
for (int32_t i = 0; i < cnt; ++i)
events[i].Read(reader);
- lsnr.Get().OnEvent(events.data(), cnt);
+ lsnr.Get()->OnEvent(events.data(), cnt);
}
private:
/** Cache entry event listener. */
Reference<ignite::cache::event::CacheEntryEventListener<K, V> > lsnr;
};
+
+ /**
+ * Used to store filter on remote nodes where no
+ * ContinuousQuery instance were really created.
+ */
+ class RemoteFilterHolder : public ContinuousQueryImplBase
+ {
+ public:
+ /**
+ * Constructor.
+ */
+ template<typename F>
+ RemoteFilterHolder(const Reference<F>& filter):
+ ContinuousQueryImplBase(false, new event::CacheEntryEventFilterHolder<F>(filter))
+ {
+ // No-op.
+ }
+
+ virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader&)
+ {
+ throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "No listener is registered for the ContinuousQuery instance");
+ }
+ };
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
index 32de2cb..7b20c50 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
@@ -18,6 +18,7 @@
#ifndef _IGNITE_IMPL_IGNITE_BINDING_IMPL
#define _IGNITE_IMPL_IGNITE_BINDING_IMPL
+#include <stdint.h>
#include <map>
#include <ignite/common/common.h>
@@ -29,6 +30,9 @@ namespace ignite
{
namespace impl
{
+ /* Forward declaration. */
+ class IgniteEnvironment;
+
/**
* Ignite binding implementation.
*
@@ -36,16 +40,24 @@ namespace ignite
*/
class IgniteBindingImpl
{
- typedef void (Callback)(binary::BinaryReaderImpl&, binary::BinaryWriterImpl&);
+ typedef int64_t(Callback)(binary::BinaryReaderImpl&, binary::BinaryWriterImpl&, IgniteEnvironment&);
public:
+ enum CallbackType
+ {
+ CACHE_ENTRY_PROCESSOR_APPLY = 1,
+
+ CACHE_ENTRY_FILTER_CREATE = 2,
+
+ CACHE_ENTRY_FILTER_APPLY = 3,
+ };
+
/**
- * Default constructor.
+ * Constructor.
+ *
+ * @param env Environment.
*/
- IgniteBindingImpl() : callbacks()
- {
- // No-op.
- }
+ IgniteBindingImpl(IgniteEnvironment &env);
/**
* Invoke callback using provided ID.
@@ -53,31 +65,15 @@ namespace ignite
* Deserializes data and callback itself, invokes callback and
* serializes processing result using providede reader and writer.
*
- * @param id Processor ID.
+ * @param type Callback Type.
+ * @param id Callback ID.
* @param reader Reader.
* @param writer Writer.
- * @return True if callback is registered and false otherwise.
+ * @param found Output param. True if callback was found and false otherwise.
+ * @return Callback return value.
*/
- bool InvokeCallbackById(int64_t id, binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer)
- {
- common::concurrent::CsLockGuard guard(lock);
-
- std::map<int64_t, Callback*>::iterator it = callbacks.find(id);
-
- if (it != callbacks.end())
- {
- Callback* callback = it->second;
-
- // We have found callback and does not need lock here anymore.
- guard.Reset();
-
- callback(reader, writer);
-
- return true;
- }
-
- return false;
- }
+ IGNITE_IMPORT_EXPORT int64_t InvokeCallback(bool& found, int32_t type, int32_t id, binary::BinaryReaderImpl& reader,
+ binary::BinaryWriterImpl& writer);
/**
* Register cache entry processor and associate it with provided ID.
@@ -85,29 +81,42 @@ namespace ignite
* @throw IgniteError another processor is already associated with
* the given ID.
*
- * @param id Identifier for processor to be associated with.
- * @param proc Callback.
+ * @param type Callback type.
+ * @param id Callback identifier.
+ * @param callback Callback.
+ * @param err Error.
*/
- void RegisterCallback(int64_t id, Callback* proc, IgniteError& err)
- {
- common::concurrent::CsLockGuard guard(lock);
-
- bool inserted = callbacks.insert(std::make_pair(id, proc)).second;
-
- guard.Reset();
-
- if (!inserted)
- {
- std::stringstream builder;
+ IGNITE_IMPORT_EXPORT void RegisterCallback(int32_t type, int32_t id, Callback* callback, IgniteError& err);
+
+ /**
+ * Register cache entry processor and associate it with provided ID.
+ *
+ * @throw IgniteError another processor is already associated with
+ * the given ID.
+ *
+ * @param type Callback type.
+ * @param id Callback identifier.
+ * @param callback Callback.
+ */
+ IGNITE_IMPORT_EXPORT void RegisterCallback(int32_t type, int32_t id, Callback* callback);
- builder << "Trying to register multiple PRC callbacks with the same ID. [id=" << id << ']';
+ private:
+ IGNITE_NO_COPY_ASSIGNMENT(IgniteBindingImpl);
- err = IgniteError(IgniteError::IGNITE_ERR_ENTRY_PROCESSOR, builder.str().c_str());
- }
+ /**
+ * Make key out of callback's type and ID.
+ *
+ * @param type Callback Type.
+ * @param id Callback ID.
+ * @return Key for callback.
+ */
+ int64_t makeKey(int32_t type, int32_t id)
+ {
+ return (static_cast<int64_t>(type) << 32) | id;
}
- private:
- IGNITE_NO_COPY_ASSIGNMENT(IgniteBindingImpl);
+ /** Ignite environment. */
+ IgniteEnvironment& env;
/** Registered callbacks. */
std::map<int64_t, Callback*> callbacks;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
index 5fc9a27..e3cb859 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
@@ -21,19 +21,20 @@
#include <ignite/common/concurrent.h>
#include <ignite/jni/java.h>
#include <ignite/jni/utils.h>
-#include <ignite/ignite_binding_context.h>
#include <ignite/ignite_configuration.h>
-#include "ignite/impl/interop/interop_memory.h"
-#include "ignite/impl/binary/binary_type_manager.h"
-#include "ignite/impl/handle_registry.h"
-#include "ignite/impl/module_manager.h"
-#include "ignite/impl/ignite_binding_impl.h"
+#include <ignite/impl/interop/interop_memory.h>
+#include <ignite/impl/binary/binary_type_manager.h>
+#include <ignite/impl/handle_registry.h>
namespace ignite
{
namespace impl
{
+ /* Forward declarations. */
+ class IgniteBindingImpl;
+ class ModuleManager;
+
/**
* Defines environment in which Ignite operates.
*/
@@ -110,6 +111,21 @@ namespace ignite
void OnContinuousQueryListenerApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
/**
+ * Continuous query filter create callback.
+ *
+ * @param mem Memory with data.
+ * @return Filter handle.
+ */
+ int64_t OnContinuousQueryFilterCreate(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+ /**
+ * Continuous query filter apply callback.
+ *
+ * @param mem Memory with data.
+ */
+ int64_t OnContinuousQueryFilterApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+ /**
* Cache Invoke callback.
*
* @param mem Input-output memory.
@@ -191,14 +207,7 @@ namespace ignite
*
* @return IgniteBinding instance.
*/
- IgniteBinding GetBinding() const;
-
- /**
- * Get binding context.
- *
- * @return Binding context.
- */
- IgniteBindingContext GetBindingContext() const;
+ common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding() const;
private:
/** Node configuration. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
index 24fc989..5b1f527 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
@@ -22,10 +22,10 @@
#include <ignite/jni/java.h>
#include <ignite/common/utils.h>
-#include "ignite/impl/cache/cache_impl.h"
-#include "ignite/impl/transactions/transactions_impl.h"
-#include "ignite/impl/cluster/cluster_group_impl.h"
-#include "ignite/impl/ignite_environment.h"
+#include <ignite/impl/cache/cache_impl.h>
+#include <ignite/impl/transactions/transactions_impl.h>
+#include <ignite/impl/cluster/cluster_group_impl.h>
+#include <ignite/impl/ignite_environment.h>
namespace ignite
{
@@ -154,7 +154,7 @@ namespace ignite
*
* @return IgniteBinding class instance.
*/
- IgniteBinding GetBinding();
+ common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding();
/**
* Get instance of the implementation from the proxy class.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/operations.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/operations.h b/modules/platforms/cpp/core/include/ignite/impl/operations.h
index dfaa4e8..fff8a86 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/operations.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/operations.h
@@ -79,7 +79,7 @@ namespace ignite
}
private:
/** Value. */
- const T val;
+ const T& val;
IGNITE_NO_COPY_ASSIGNMENT(In1Operation)
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj
index b490887..b5a95bd 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -195,6 +195,7 @@
<ClInclude Include="..\..\include\ignite\cache\cache_entry_processor.h" />
<ClInclude Include="..\..\include\ignite\cache\cache_peek_mode.h" />
<ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h" />
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_filter.h" />
<ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h" />
<ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h" />
<ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h" />
@@ -212,9 +213,12 @@
<ClInclude Include="..\..\include\ignite\ignite_configuration.h" />
<ClInclude Include="..\..\include\ignite\ignition.h" />
<ClInclude Include="..\..\include\ignite\impl\binary\binary_type_updater_impl.h" />
+ <ClInclude Include="..\..\include\ignite\impl\bindings.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\cache_entry_processor_holder.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_argument.h" />
+ <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_base.h" />
+ <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_holder.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h" />
@@ -246,6 +250,7 @@
<ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\query\query_impl.cpp" />
<ClCompile Include="..\..\src\impl\cluster\cluster_group_impl.cpp" />
+ <ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp" />
<ClCompile Include="..\..\src\impl\ignite_environment.cpp" />
<ClCompile Include="..\..\src\impl\ignite_impl.cpp" />
<ClCompile Include="..\..\src\impl\handle_registry.cpp" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index b75b3b2..3b17d53 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -52,6 +52,9 @@
<ClCompile Include="..\..\src\impl\cluster\cluster_group_impl.cpp">
<Filter>Code\impl\cluster</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp">
+ <Filter>Code\impl</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h">
@@ -192,6 +195,18 @@
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_argument.h">
<Filter>Code\impl\cache\query</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_filter.h">
+ <Filter>Code\cache\event</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_base.h">
+ <Filter>Code\impl\cache\event</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_holder.h">
+ <Filter>Code\impl\cache\event</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\bindings.h">
+ <Filter>Code\impl</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Code">
@@ -236,5 +251,8 @@
<Filter Include="Code\impl\cluster">
<UniqueIdentifier>{f5b54635-91a1-447e-923a-1b4608d7e5bc}</UniqueIdentifier>
</Filter>
+ <Filter Include="Code\impl\cache\event">
+ <UniqueIdentifier>{9c5e9732-755a-4553-8926-b4cf3b6abaf3}</UniqueIdentifier>
+ </Filter>
</ItemGroup>
</Project>
\ No newline at end of file