You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/06 13:31:23 UTC

[16/50] [abbrv] ignite git commit: IGNITE-3575 CPP: Added support for continuous queries remote filters.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
index dfef8e4..858ee77 100644
--- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
@@ -17,8 +17,11 @@
 
 #include <ignite/common/utils.h>
 
-#include "ignite/impl/cache/cache_impl.h"
-#include "ignite/impl/binary/binary_type_updater_impl.h"
+#include <ignite/impl/cache/cache_impl.h>
+#include <ignite/impl/binary/binary_type_updater_impl.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
+
+#include <ignite/cache/query/continuous/continuous_query_handle.h>
 
 using namespace ignite::common::concurrent;
 using namespace ignite::jni::java;
@@ -381,14 +384,93 @@ namespace ignite
                 IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
             }
 
-            struct DummyQry { void Write(BinaryRawWriter&) const { }};
+            struct Dummy // Placeholder passed as the "initial query" when a continuous query has none.
+            {
+                void Write(BinaryRawWriter&) const
+                {
+                    // No-op: the placeholder has nothing to serialize.
+                }
+            };
 
             ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
                 IgniteError& err)
             {
-                DummyQry dummy;
+                Dummy dummy; // Stand-in initial query; the type -1 passed below means "no initial query".
                 return QueryContinuous(qry, dummy, -1, OP_QRY_CONTINUOUS, err);
             }
+
+            template <typename T> // T must provide Write(BinaryRawWriter&) const.
+            QueryCursorImpl* CacheImpl::QueryInternal(const T& qry, int32_t typ, IgniteError& err)
+            {
+                JniErrorInfo jniErr;
+
+                SharedPointer<InteropMemory> mem = GetEnvironment().AllocateMemory();
+                InteropMemory* mem0 = mem.Get();
+                InteropOutputStream out(mem0);
+                BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+                BinaryRawWriter rawWriter(&writer);
+
+                qry.Write(rawWriter); // Serialize the query through the writer backed by the allocated memory.
+
+                out.Synchronize();
+
+                jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpQueryCursor(GetTarget(),
+                    typ, mem.Get()->PointerLong(), &jniErr);
+
+                IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); // Report the JNI outcome via err.
+
+                if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+                    return new QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef); // Caller receives a raw pointer.
+
+                return 0; // JNI call failed: no cursor is created, details are in err.
+            }
+
+            template <typename T> // T must provide Write(BinaryRawWriter&) const; it is only called when typ != -1.
+            ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+                const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err)
+            {
+                JniErrorInfo jniErr;
+
+                SharedPointer<InteropMemory> mem = GetEnvironment().AllocateMemory();
+                InteropMemory* mem0 = mem.Get();
+                InteropOutputStream out(mem0);
+                BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+                BinaryRawWriter rawWriter(&writer);
+
+                const ContinuousQueryImplBase& qry0 = *qry.Get(); // NOTE(review): qry is dereferenced without a null check.
+
+                int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry); // Query is registered under this handle.
+
+                rawWriter.WriteInt64(handle);
+                rawWriter.WriteBool(qry0.GetLocal());
+
+                event::CacheEntryEventFilterHolderBase& filterOp = qry0.GetFilterHolder();
+
+                filterOp.Write(writer); // The filter holder writes itself using the non-raw writer.
+
+                rawWriter.WriteInt32(qry0.GetBufferSize());
+                rawWriter.WriteInt64(qry0.GetTimeInterval());
+
+                // Autounsubscribe is a filter feature.
+                rawWriter.WriteBool(false);
+
+                // Write the initial query type; -1 means there is no initial query and nothing follows it.
+                rawWriter.WriteInt32(typ);
+                if (typ != -1)
+                    initialQry.Write(rawWriter);
+
+                out.Synchronize();
+
+                jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(),
+                    cmd, mem.Get()->PointerLong(), &jniErr);
+
+                IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); // Report the JNI outcome via err.
+
+                if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+                    return new ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef);
+
+                return 0; // NOTE(review): the handle allocated above is not released on this failure path - confirm.
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
index b2fa1fd..b15183b 100644
--- a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
@@ -84,11 +84,6 @@ namespace ignite
 
                         return new QueryCursorImpl(env, res);
                     }
-
-                    void ContinuousQueryHandleImpl::SetQuery(SP_ContinuousQueryImplBase query)
-                    {
-                        qry = query;
-                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
new file mode 100644
index 0000000..2e09de2
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/ignite_environment.h>
+#include <ignite/impl/ignite_binding_impl.h>
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{
+    namespace impl
+    {
+        IgniteBindingImpl::IgniteBindingImpl(IgniteEnvironment &env) :
+            env(env),
+            callbacks()
+        {
+            // No-op: all members are set up by the initializer list.
+        }
+
+        int64_t IgniteBindingImpl::InvokeCallback(bool& found, int32_t type, int32_t id,
+            binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer)
+        {
+            int64_t key = makeKey(type, id); // Callbacks are looked up by a single key built from (type, id).
+
+            CsLockGuard guard(lock);
+
+            std::map<int64_t, Callback*>::iterator it = callbacks.find(key);
+
+            found = it != callbacks.end(); // Tells the caller whether a callback exists for the key.
+
+            if (found)
+            {
+                Callback* callback = it->second;
+
+                // The callback pointer has been copied out, so the lock is no longer needed.
+                guard.Reset();
+
+                return callback(reader, writer, env);
+            }
+
+            return 0; // Nothing registered for (type, id); found is false.
+        }
+
+        void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* proc, IgniteError& err)
+        {
+            int64_t key = makeKey(type, id);
+
+            CsLockGuard guard(lock);
+
+            bool inserted = callbacks.insert(std::make_pair(key, proc)).second; // false: key is taken, old entry is kept.
+
+            guard.Reset(); // The map is not touched below.
+
+            if (!inserted)
+            {
+                std::stringstream builder; // NOTE(review): "PRC" in the message below may be a typo - confirm.
+
+                builder << "Trying to register multiple PRC callbacks with the same ID. [type="
+                        << type << ", id=" << id << ']';
+
+                // NOTE(review): the entry-processor error code is used for every callback type - confirm intent.
+                err = IgniteError(IgniteError::IGNITE_ERR_ENTRY_PROCESSOR, builder.str().c_str());
+            }
+        }
+
+        void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* callback)
+        {
+            IgniteError err;
+
+            RegisterCallback(type, id, callback, err); // Delegate to the error-returning overload.
+
+            IgniteError::ThrowIfNeeded(err); // Presumably throws when err was set by the call above - confirm.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index b37fa8f..4e2a1f2 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -15,14 +15,18 @@
  * limitations under the License.
  */
 
-#include "ignite/impl/interop/interop_external_memory.h"
-#include "ignite/impl/binary/binary_reader_impl.h"
-#include "ignite/impl/ignite_environment.h"
-#include "ignite/cache/query/continuous/continuous_query.h"
-#include "ignite/binary/binary.h"
-#include "ignite/impl/binary/binary_type_updater_impl.h"
-#include "ignite/impl/module_manager.h"
-#include "ignite/ignite_binding.h"
+#include <ignite/impl/interop/interop_external_memory.h>
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/binary/binary_type_updater_impl.h>
+#include <ignite/impl/module_manager.h>
+#include <ignite/impl/ignite_binding_impl.h>
+
+#include <ignite/binary/binary.h>
+#include <ignite/cache/query/continuous/continuous_query.h>
+#include <ignite/ignite_binding.h>
+#include <ignite/ignite_binding_context.h>
+
+#include <ignite/impl/ignite_environment.h>
 
 using namespace ignite::common::concurrent;
 using namespace ignite::jni::java;
@@ -42,6 +46,8 @@ namespace ignite
         {
             CACHE_INVOKE = 8,
             CONTINUOUS_QUERY_LISTENER_APPLY = 18,
+            CONTINUOUS_QUERY_FILTER_CREATE = 19,
+            CONTINUOUS_QUERY_FILTER_APPLY = 20,
             CONTINUOUS_QUERY_FILTER_RELEASE = 21,
             REALLOC = 36,
             ON_START = 49,
@@ -57,6 +63,7 @@ namespace ignite
          */
         long long IGNITE_CALL InLongOutLong(void* target, int type, long long val)
         {
+            int64_t res = 0;
             SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
 
             switch (type)
@@ -77,6 +84,24 @@ namespace ignite
                     break;
                 }
 
+                case CONTINUOUS_QUERY_FILTER_CREATE:
+                {
+                    SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+                    res = env->Get()->OnContinuousQueryFilterCreate(mem);
+
+                    break;
+                }
+
+                case CONTINUOUS_QUERY_FILTER_APPLY:
+                {
+                    SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+                    res = env->Get()->OnContinuousQueryFilterApply(mem);
+
+                    break;
+                }
+
                 case CONTINUOUS_QUERY_FILTER_RELEASE:
                 {
                     // No-op.
@@ -98,7 +123,7 @@ namespace ignite
                 }
             }
 
-            return 0;
+            return res;
         }
 
         /**
@@ -152,10 +177,14 @@ namespace ignite
             registry(DEFAULT_FAST_PATH_CONTAINERS_CAP, DEFAULT_SLOW_PATH_CONTAINERS_CAP),
             metaMgr(new BinaryTypeManager()),
             metaUpdater(0),
-            binding(new IgniteBindingImpl()),
-            moduleMgr(new ModuleManager(GetBindingContext()))
+            binding(),
+            moduleMgr()
         {
-            // No-op.
+            binding = SharedPointer<IgniteBindingImpl>(new IgniteBindingImpl(*this));
+
+            IgniteBindingContext bindingContext(cfg, GetBinding());
+
+            moduleMgr = SharedPointer<ModuleManager>(new ModuleManager(bindingContext));
         }
 
         IgniteEnvironment::~IgniteEnvironment()
@@ -263,14 +292,9 @@ namespace ignite
             return metaUpdater;
         }
 
-        IgniteBinding IgniteEnvironment::GetBinding() const
-        {
-            return IgniteBinding(binding);
-        }
-
-        IgniteBindingContext IgniteEnvironment::GetBindingContext() const
+        SharedPointer<IgniteBindingImpl> IgniteEnvironment::GetBinding() const
         {
-            return IgniteBindingContext(*cfg, GetBinding());
+            return binding; // Shared pointer to the binding implementation held by this environment.
         }
 
         void IgniteEnvironment::ProcessorReleaseStart()
@@ -321,6 +345,62 @@ namespace ignite
             }
         }
 
+        int64_t IgniteEnvironment::OnContinuousQueryFilterCreate(SharedPointer<InteropMemory>& mem)
+        {
+            if (!binding.Get())
+                throw IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "IgniteBinding is not initialized.");
+
+            InteropInputStream inStream(mem.Get());
+            BinaryReaderImpl reader(&inStream);
+
+            InteropOutputStream outStream(mem.Get()); // Output is written to the same memory the input is read from.
+            BinaryWriterImpl writer(&outStream, GetTypeManager());
+
+            BinaryObjectImpl binFilter = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position());
+
+            int32_t filterId = binFilter.GetTypeId(); // The filter's type ID selects the registered callback.
+
+            bool invoked = false;
+
+            int64_t res = binding.Get()->InvokeCallback(invoked,
+                IgniteBindingImpl::CACHE_ENTRY_FILTER_CREATE, filterId, reader, writer);
+
+            if (!invoked) // No callback was found for this filter type.
+            {
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                    "C++ remote filter is not registered on the node (did you compile your program without -rdynamic?).",
+                    "filterId", filterId);
+            }
+
+            outStream.Synchronize();
+
+            return res; // Value produced by the invoked callback.
+        }
+
+        int64_t IgniteEnvironment::OnContinuousQueryFilterApply(SharedPointer<InteropMemory>& mem)
+        {
+            InteropInputStream inStream(mem.Get());
+            BinaryReaderImpl reader(&inStream);
+            BinaryRawReader rawReader(&reader);
+
+            int64_t handle = rawReader.ReadInt64(); // The message starts with the handle of the target query.
+
+            SharedPointer<ContinuousQueryImplBase> qry =
+                StaticPointerCast<ContinuousQueryImplBase>(registry.Get(handle));
+
+            if (!qry.Get()) // Nothing is registered under this handle.
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null query for handle.", "handle", handle);
+
+            cache::event::CacheEntryEventFilterBase* filter = qry.Get()->GetFilterHolder().GetFilter();
+
+            if (!filter) // The query exists but carries no filter.
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null filter for handle.", "handle", handle);
+
+            bool res = filter->ReadAndProcessEvent(rawReader); // The filter consumes the rest of the message.
+
+            return res ? 1 : 0; // The boolean verdict is returned as 1 or 0.
+        }
+
         void IgniteEnvironment::CacheInvokeCallback(SharedPointer<InteropMemory>& mem)
         {
             if (!binding.Get())
@@ -340,9 +420,11 @@ namespace ignite
             BinaryObjectImpl binProcHolder = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position(), 0);
             BinaryObjectImpl binProc = binProcHolder.GetField(0);
 
-            int64_t procId = binProc.GetTypeId();
+            int32_t procId = binProc.GetTypeId();
+
+            bool invoked = false;
 
-            bool invoked = binding.Get()->InvokeCallbackById(procId, reader, writer);
+            binding.Get()->InvokeCallback(invoked, IgniteBindingImpl::CACHE_ENTRY_PROCESSOR_APPLY, procId, reader, writer);
 
             if (!invoked)
             {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
index fd9bf45..546cd01 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
@@ -59,7 +59,7 @@ namespace ignite
             return env.Get()->Context();
         }
 
-        IgniteBinding IgniteImpl::GetBinding()
+        SharedPointer<IgniteBindingImpl> IgniteImpl::GetBinding() // Forwards to the environment's GetBinding().
         {
             return env.Get()->GetBinding();
         }