You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/06 13:31:23 UTC
[16/50] [abbrv] ignite git commit: IGNITE-3575 CPP: Added support for
continuous queries remote filters.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
index dfef8e4..858ee77 100644
--- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
@@ -17,8 +17,11 @@
#include <ignite/common/utils.h>
-#include "ignite/impl/cache/cache_impl.h"
-#include "ignite/impl/binary/binary_type_updater_impl.h"
+#include <ignite/impl/cache/cache_impl.h>
+#include <ignite/impl/binary/binary_type_updater_impl.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
+
+#include <ignite/cache/query/continuous/continuous_query_handle.h>
using namespace ignite::common::concurrent;
using namespace ignite::jni::java;
@@ -381,14 +384,93 @@ namespace ignite
IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
}
- struct DummyQry { void Write(BinaryRawWriter&) const { }};
+ /**
+  * Stand-in query object whose Write() emits nothing.
+  */
+ struct Dummy
+ {
+     /** Intentionally leaves the raw writer untouched. */
+     void Write(BinaryRawWriter&) const { }
+ };
ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
IgniteError& err)
{
- DummyQry dummy;
+ Dummy dummy;
return QueryContinuous(qry, dummy, -1, OP_QRY_CONTINUOUS, err);
}
+
+ /**
+  * Write the query into freshly allocated interop memory and hand it to
+  * the Java side through CacheOutOpQueryCursor.
+  *
+  * @param qry Query object; only its Write(BinaryRawWriter&) is used here.
+  * @param typ Operation type forwarded to CacheOutOpQueryCursor.
+  * @param err Filled from the JNI error info of the call.
+  * @return Newly allocated cursor on JNI success, 0 otherwise.
+  */
+ template <typename T>
+ QueryCursorImpl* CacheImpl::QueryInternal(const T& qry, int32_t typ, IgniteError& err)
+ {
+     JniErrorInfo jniErr;
+
+     SharedPointer<InteropMemory> memory = GetEnvironment().AllocateMemory();
+
+     InteropOutputStream stream(memory.Get());
+     BinaryWriterImpl writerImpl(&stream, GetEnvironment().GetTypeManager());
+     BinaryRawWriter queryWriter(&writerImpl);
+
+     qry.Write(queryWriter);
+
+     stream.Synchronize();
+
+     jobject cursorRef = GetEnvironment().Context()->CacheOutOpQueryCursor(
+         GetTarget(), typ, memory.Get()->PointerLong(), &jniErr);
+
+     IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+     // Failure has already been reported through err; no cursor to return.
+     if (jniErr.code != IGNITE_JNI_ERR_SUCCESS)
+         return 0;
+
+     return new QueryCursorImpl(GetEnvironmentPointer(), cursorRef);
+ }
+
+ /**
+  * Serialize a continuous query (and an optional initial query) into
+  * interop memory and start it through CacheOutOpContinuousQuery.
+  *
+  * @param qry Continuous query implementation; registered in the handle
+  *     registry so it can be looked up by the handle written to the stream.
+  * @param initialQry Initial query; written only when typ is not -1.
+  * @param typ Initial query type, or -1 when there is no initial query.
+  * @param cmd Operation code forwarded to CacheOutOpContinuousQuery.
+  * @param err Filled from the JNI error info of the call.
+  * @return Newly allocated query handle on JNI success, 0 otherwise.
+  */
+ template <typename T>
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+     const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err)
+ {
+     JniErrorInfo jniErr;
+
+     SharedPointer<InteropMemory> mem = GetEnvironment().AllocateMemory();
+     InteropMemory* mem0 = mem.Get();
+     InteropOutputStream out(mem0);
+     BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+     BinaryRawWriter rawWriter(&writer);
+
+     const ContinuousQueryImplBase& qry0 = *qry.Get();
+
+     int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry);
+
+     rawWriter.WriteInt64(handle);
+     rawWriter.WriteBool(qry0.GetLocal());
+
+     event::CacheEntryEventFilterHolderBase& filterOp = qry0.GetFilterHolder();
+
+     filterOp.Write(writer);
+
+     rawWriter.WriteInt32(qry0.GetBufferSize());
+     rawWriter.WriteInt64(qry0.GetTimeInterval());
+
+     // Autounsubscribe is a filter feature.
+     rawWriter.WriteBool(false);
+
+     // Initial query section: the type is always written, the query body
+     // only when a type other than -1 was supplied.
+     rawWriter.WriteInt32(typ);
+     if (typ != -1)
+         initialQry.Write(rawWriter);
+
+     out.Synchronize();
+
+     jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(),
+         cmd, mem.Get()->PointerLong(), &jniErr);
+
+     IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+     if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+         return new ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef);
+
+     // No ContinuousQueryHandleImpl is created on failure, so nothing else
+     // would ever give the handle back: release it here instead of leaving
+     // the entry allocated above behind in the registry.
+     GetEnvironment().GetHandleRegistry().Release(handle);
+
+     return 0;
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
index b2fa1fd..b15183b 100644
--- a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
@@ -84,11 +84,6 @@ namespace ignite
return new QueryCursorImpl(env, res);
}
-
- void ContinuousQueryHandleImpl::SetQuery(SP_ContinuousQueryImplBase query)
- {
- qry = query;
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
new file mode 100644
index 0000000..2e09de2
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/ignite_environment.h>
+#include <ignite/impl/ignite_binding_impl.h>
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{
+ namespace impl
+ {
+ /**
+  * Create a binding tied to the given environment with an empty
+  * callback table.
+  *
+  * @param env Environment stored in the binding and later passed to callbacks.
+  */
+ IgniteBindingImpl::IgniteBindingImpl(IgniteEnvironment &env) :
+     env(env),
+     callbacks()
+ {
+     // Nothing to do beyond the initializer list.
+ }
+
+ /**
+  * Look up the callback registered for (type, id) and run it.
+  *
+  * @param found Set to true when a callback exists for the key, false otherwise.
+  * @param type Callback type, combined with id by makeKey().
+  * @param id Callback identifier.
+  * @param reader Reader handed to the callback.
+  * @param writer Writer handed to the callback.
+  * @return Value returned by the callback, or 0 when none is registered.
+  */
+ int64_t IgniteBindingImpl::InvokeCallback(bool& found, int32_t type, int32_t id,
+     binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer)
+ {
+     const int64_t callbackKey = makeKey(type, id);
+
+     CsLockGuard guard(lock);
+
+     std::map<int64_t, Callback*>::iterator entry = callbacks.find(callbackKey);
+
+     found = entry != callbacks.end();
+
+     if (!found)
+         return 0;
+
+     Callback* target = entry->second;
+
+     // The pointer has been copied out of the map, so the lock is given
+     // up before the callback itself runs.
+     guard.Reset();
+
+     return target(reader, writer, env);
+ }
+
+ /**
+  * Register a callback under (type, id), reporting a duplicate through err.
+  *
+  * @param type Callback type, combined with id by makeKey().
+  * @param id Callback identifier.
+  * @param proc Callback to store.
+  * @param err Assigned an IGNITE_ERR_ENTRY_PROCESSOR error when the key is
+  *     already taken; left untouched otherwise.
+  */
+ void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* proc, IgniteError& err)
+ {
+     const int64_t callbackKey = makeKey(type, id);
+
+     bool added;
+
+     {
+         // Hold the lock only for the map insertion itself.
+         CsLockGuard guard(lock);
+
+         added = callbacks.insert(std::make_pair(callbackKey, proc)).second;
+     }
+
+     if (added)
+         return;
+
+     std::stringstream message;
+
+     message << "Trying to register multiple PRC callbacks with the same ID. [type="
+             << type << ", id=" << id << ']';
+
+     err = IgniteError(IgniteError::IGNITE_ERR_ENTRY_PROCESSOR, message.str().c_str());
+ }
+
+ /**
+  * Register a callback under (type, id); any error produced by the
+  * error-reporting overload is passed to IgniteError::ThrowIfNeeded().
+  *
+  * @param type Callback type.
+  * @param id Callback identifier.
+  * @param callback Callback to store.
+  */
+ void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* callback)
+ {
+     IgniteError registrationError;
+
+     RegisterCallback(type, id, callback, registrationError);
+
+     IgniteError::ThrowIfNeeded(registrationError);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index b37fa8f..4e2a1f2 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -15,14 +15,18 @@
* limitations under the License.
*/
-#include "ignite/impl/interop/interop_external_memory.h"
-#include "ignite/impl/binary/binary_reader_impl.h"
-#include "ignite/impl/ignite_environment.h"
-#include "ignite/cache/query/continuous/continuous_query.h"
-#include "ignite/binary/binary.h"
-#include "ignite/impl/binary/binary_type_updater_impl.h"
-#include "ignite/impl/module_manager.h"
-#include "ignite/ignite_binding.h"
+#include <ignite/impl/interop/interop_external_memory.h>
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/binary/binary_type_updater_impl.h>
+#include <ignite/impl/module_manager.h>
+#include <ignite/impl/ignite_binding_impl.h>
+
+#include <ignite/binary/binary.h>
+#include <ignite/cache/query/continuous/continuous_query.h>
+#include <ignite/ignite_binding.h>
+#include <ignite/ignite_binding_context.h>
+
+#include <ignite/impl/ignite_environment.h>
using namespace ignite::common::concurrent;
using namespace ignite::jni::java;
@@ -42,6 +46,8 @@ namespace ignite
{
CACHE_INVOKE = 8,
CONTINUOUS_QUERY_LISTENER_APPLY = 18,
+ CONTINUOUS_QUERY_FILTER_CREATE = 19,
+ CONTINUOUS_QUERY_FILTER_APPLY = 20,
CONTINUOUS_QUERY_FILTER_RELEASE = 21,
REALLOC = 36,
ON_START = 49,
@@ -57,6 +63,7 @@ namespace ignite
*/
long long IGNITE_CALL InLongOutLong(void* target, int type, long long val)
{
+ int64_t res = 0;
SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
switch (type)
@@ -77,6 +84,24 @@ namespace ignite
break;
}
+ case CONTINUOUS_QUERY_FILTER_CREATE:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+ res = env->Get()->OnContinuousQueryFilterCreate(mem);
+
+ break;
+ }
+
+ case CONTINUOUS_QUERY_FILTER_APPLY:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+ res = env->Get()->OnContinuousQueryFilterApply(mem);
+
+ break;
+ }
+
case CONTINUOUS_QUERY_FILTER_RELEASE:
{
// No-op.
@@ -98,7 +123,7 @@ namespace ignite
}
}
- return 0;
+ return res;
}
/**
@@ -152,10 +177,14 @@ namespace ignite
registry(DEFAULT_FAST_PATH_CONTAINERS_CAP, DEFAULT_SLOW_PATH_CONTAINERS_CAP),
metaMgr(new BinaryTypeManager()),
metaUpdater(0),
- binding(new IgniteBindingImpl()),
- moduleMgr(new ModuleManager(GetBindingContext()))
+ binding(),
+ moduleMgr()
{
- // No-op.
+ binding = SharedPointer<IgniteBindingImpl>(new IgniteBindingImpl(*this));
+
+ IgniteBindingContext bindingContext(cfg, GetBinding());
+
+ moduleMgr = SharedPointer<ModuleManager>(new ModuleManager(bindingContext));
}
IgniteEnvironment::~IgniteEnvironment()
@@ -263,14 +292,9 @@ namespace ignite
return metaUpdater;
}
- IgniteBinding IgniteEnvironment::GetBinding() const
- {
- return IgniteBinding(binding);
- }
-
- IgniteBindingContext IgniteEnvironment::GetBindingContext() const
+ SharedPointer<IgniteBindingImpl> IgniteEnvironment::GetBinding() const
{
- return IgniteBindingContext(*cfg, GetBinding());
+ return binding;
}
void IgniteEnvironment::ProcessorReleaseStart()
@@ -321,6 +345,62 @@ namespace ignite
}
}
+ /**
+  * Handle a remote-filter creation request: read the filter's binary object
+  * from the memory chunk and dispatch to the CACHE_ENTRY_FILTER_CREATE
+  * callback registered for its type ID.
+  *
+  * @param mem Interop memory used both as callback input and output.
+  * @return Value returned by the invoked callback.
+  */
+ int64_t IgniteEnvironment::OnContinuousQueryFilterCreate(SharedPointer<InteropMemory>& mem)
+ {
+     IgniteBindingImpl* bindingImpl = binding.Get();
+
+     if (!bindingImpl)
+         throw IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "IgniteBinding is not initialized.");
+
+     InteropMemory* memory = mem.Get();
+
+     InteropInputStream input(memory);
+     BinaryReaderImpl reader(&input);
+
+     InteropOutputStream output(memory);
+     BinaryWriterImpl writer(&output, GetTypeManager());
+
+     // The callback is selected by the type ID of the serialized filter.
+     BinaryObjectImpl filterObject = BinaryObjectImpl::FromMemory(*memory, input.Position());
+
+     int32_t filterId = filterObject.GetTypeId();
+
+     bool dispatched = false;
+
+     int64_t res = bindingImpl->InvokeCallback(dispatched,
+         IgniteBindingImpl::CACHE_ENTRY_FILTER_CREATE, filterId, reader, writer);
+
+     if (!dispatched)
+     {
+         IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+             "C++ remote filter is not registered on the node (did you compile your program without -rdynamic?).",
+             "filterId", filterId);
+     }
+
+     output.Synchronize();
+
+     return res;
+ }
+
+ /**
+  * Apply a continuous query filter to an event: read the query handle from
+  * the memory chunk, resolve the query in the handle registry and let its
+  * filter read and process the event.
+  *
+  * @param mem Interop memory holding the handle followed by the event data.
+  * @return 1 when the filter returned true, 0 otherwise.
+  */
+ int64_t IgniteEnvironment::OnContinuousQueryFilterApply(SharedPointer<InteropMemory>& mem)
+ {
+     InteropInputStream input(mem.Get());
+     BinaryReaderImpl reader(&input);
+     BinaryRawReader eventReader(&reader);
+
+     int64_t handle = eventReader.ReadInt64();
+
+     SharedPointer<ContinuousQueryImplBase> query =
+         StaticPointerCast<ContinuousQueryImplBase>(registry.Get(handle));
+
+     if (!query.Get())
+     {
+         IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null query for handle.", "handle", handle);
+     }
+
+     cache::event::CacheEntryEventFilterBase* entryFilter = query.Get()->GetFilterHolder().GetFilter();
+
+     if (!entryFilter)
+     {
+         IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null filter for handle.", "handle", handle);
+     }
+
+     // The boolean verdict is mapped to 1/0 for the int64_t return value.
+     if (entryFilter->ReadAndProcessEvent(eventReader))
+         return 1;
+
+     return 0;
+ }
+
void IgniteEnvironment::CacheInvokeCallback(SharedPointer<InteropMemory>& mem)
{
if (!binding.Get())
@@ -340,9 +420,11 @@ namespace ignite
BinaryObjectImpl binProcHolder = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position(), 0);
BinaryObjectImpl binProc = binProcHolder.GetField(0);
- int64_t procId = binProc.GetTypeId();
+ int32_t procId = binProc.GetTypeId();
+
+ bool invoked = false;
- bool invoked = binding.Get()->InvokeCallbackById(procId, reader, writer);
+ binding.Get()->InvokeCallback(invoked, IgniteBindingImpl::CACHE_ENTRY_PROCESSOR_APPLY, procId, reader, writer);
if (!invoked)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
index fd9bf45..546cd01 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
@@ -59,7 +59,7 @@ namespace ignite
return env.Get()->Context();
}
- IgniteBinding IgniteImpl::GetBinding()
+ SharedPointer<IgniteBindingImpl> IgniteImpl::GetBinding()
{
return env.Get()->GetBinding();
}