You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/12/20 16:38:58 UTC
[1/3] ignite git commit: IGNITE-1443: Implemented ContinuousQuery for
C++
Repository: ignite
Updated Branches:
refs/heads/master 700529a6e -> 598b464f8
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index 4947b94..b058f7c 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -18,6 +18,7 @@
#include "ignite/impl/interop/interop_external_memory.h"
#include "ignite/impl/binary/binary_reader_impl.h"
#include "ignite/impl/ignite_environment.h"
+#include "ignite/cache/query/continuous/continuous_query.h"
#include "ignite/binary/binary.h"
#include "ignite/impl/binary/binary_type_updater_impl.h"
@@ -26,6 +27,7 @@ using namespace ignite::jni::java;
using namespace ignite::impl::interop;
using namespace ignite::impl::binary;
using namespace ignite::binary;
+using namespace ignite::impl::cache::query::continuous;
namespace ignite
{
@@ -36,6 +38,8 @@ namespace ignite
*/
enum CallbackOp
{
+ CONTINUOUS_QUERY_LISTENER_APPLY = 18,
+ CONTINUOUS_QUERY_FILTER_RELEASE = 21,
REALLOC = 36,
ON_START = 49,
ON_STOP = 50
@@ -50,11 +54,36 @@ namespace ignite
*/
long long IGNITE_CALL InLongOutLong(void* target, int type, long long val)
{
- if (type == ON_STOP)
+ SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
+
+ switch (type)
{
- SharedPointer<IgniteEnvironment>* ptr = static_cast<SharedPointer<IgniteEnvironment>*>(target);
+ case ON_STOP:
+ {
+ delete env;
+
+ break;
+ }
+
+ case CONTINUOUS_QUERY_LISTENER_APPLY:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+ env->Get()->OnContinuousQueryListenerApply(mem);
+
+ break;
+ }
+
+ case CONTINUOUS_QUERY_FILTER_RELEASE:
+ {
+ // No-op.
+ break;
+ }
- delete ptr;
+ default:
+ {
+ break;
+ }
}
return 0;
@@ -73,26 +102,43 @@ namespace ignite
long long IGNITE_CALL InLongLongLongObjectOutLong(void* target, int type, long long val1, long long val2,
long long val3, void* arg)
{
- if (type == ON_START)
- {
- SharedPointer<IgniteEnvironment>* ptr = static_cast<SharedPointer<IgniteEnvironment>*>(target);
+ SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
- ptr->Get()->OnStartCallback(val1, reinterpret_cast<jobject>(arg));
- }
- else if (type == REALLOC)
+ switch (type)
{
- SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
+ case ON_START:
+ {
+ env->Get()->OnStartCallback(val1, reinterpret_cast<jobject>(arg));
+
+ break;
+ }
- SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val1);
+ case REALLOC:
+ {
+ SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val1);
- mem.Get()->Reallocate(static_cast<int32_t>(val2));
+ mem.Get()->Reallocate(static_cast<int32_t>(val2));
+
+ break;
+ }
+
+ default:
+ {
+ break;
+ }
}
return 0;
}
- IgniteEnvironment::IgniteEnvironment() : ctx(SharedPointer<JniContext>()), latch(new SingleLatch), name(0),
- proc(), metaMgr(new BinaryTypeManager()), metaUpdater(0)
+ IgniteEnvironment::IgniteEnvironment() :
+ ctx(SharedPointer<JniContext>()),
+ latch(new SingleLatch),
+ name(0),
+ proc(),
+ metaMgr(new BinaryTypeManager()),
+ metaUpdater(0),
+ registry(DEFAULT_FAST_PATH_CONTAINERS_CAP, DEFAULT_SLOW_PATH_CONTAINERS_CAP)
{
// No-op.
}
@@ -107,7 +153,7 @@ namespace ignite
JniHandlers IgniteEnvironment::GetJniHandlers(SharedPointer<IgniteEnvironment>* target)
{
- JniHandlers hnds = JniHandlers();
+ JniHandlers hnds;
hnds.target = target;
@@ -193,6 +239,11 @@ namespace ignite
ctx.Get()->ProcessorReleaseStart(proc.Get());
}
+ HandleRegistry& IgniteEnvironment::GetHandleRegistry()
+ {
+ return registry;
+ }
+
void IgniteEnvironment::OnStartCallback(long long memPtr, jobject proc)
{
this->proc = jni::JavaGlobalRef(*ctx.Get(), proc);
@@ -212,6 +263,23 @@ namespace ignite
else
name = 0;
}
+
+ void IgniteEnvironment::OnContinuousQueryListenerApply(SharedPointer<InteropMemory>& mem)
+ {
+ InteropInputStream stream(mem.Get());
+ BinaryReaderImpl reader(&stream);
+
+ int64_t qryHandle = reader.ReadInt64();
+
+ ContinuousQueryImplBase* contQry = reinterpret_cast<ContinuousQueryImplBase*>(registry.Get(qryHandle).Get());
+
+ if (contQry)
+ {
+ BinaryRawReader rawReader(&reader);
+
+ contQry->ReadAndProcessEvents(rawReader);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/Makefile.am b/modules/platforms/cpp/examples/Makefile.am
index cda5132..e1c7905 100644
--- a/modules/platforms/cpp/examples/Makefile.am
+++ b/modules/platforms/cpp/examples/Makefile.am
@@ -21,5 +21,6 @@ SUBDIRS = \
putget-example \
odbc-example \
query-example \
+ continuous-query-example \
include
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/configure.ac
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/configure.ac b/modules/platforms/cpp/examples/configure.ac
index 6ddd7c8..d167cf0 100644
--- a/modules/platforms/cpp/examples/configure.ac
+++ b/modules/platforms/cpp/examples/configure.ac
@@ -56,6 +56,7 @@ AC_CONFIG_FILES([ \
putget-example/Makefile \
odbc-example/Makefile \
query-example/Makefile \
+ continuous-query-example/Makefile \
])
AC_OUTPUT
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/continuous-query-example/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/continuous-query-example/Makefile.am b/modules/platforms/cpp/examples/continuous-query-example/Makefile.am
new file mode 100644
index 0000000..2566689
--- /dev/null
+++ b/modules/platforms/cpp/examples/continuous-query-example/Makefile.am
@@ -0,0 +1,58 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+
+ACLOCAL_AMFLAGS =-I m4
+
+noinst_PROGRAMS = ignite-continuous-query-example
+
+AM_CPPFLAGS = \
+ -I@top_srcdir@/include \
+ -I@top_srcdir@/../core/include \
+ -I@top_srcdir@/../core/os/linux/include \
+ -I@top_srcdir@/../common/include \
+ -I@top_srcdir@/../common/os/linux/include \
+ -I@top_srcdir@/../binary/include \
+ -I@top_srcdir@/../jni/include \
+ -I@top_srcdir@/../jni/os/linux/include \
+ -I$(JAVA_HOME)/include \
+ -I$(JAVA_HOME)/include/linux \
+ -DIGNITE_IMPL \
+ -D__STDC_LIMIT_MACROS \
+ -D__STDC_CONSTANT_MACROS
+
+AM_CXXFLAGS = \
+ -Wall \
+ -std=c++0x
+
+ignite_continuous_query_example_LDADD = \
+ @top_srcdir@/../core/libignite.la \
+ -lpthread
+
+ignite_continuous_query_example_LDFLAGS = \
+ -static-libtool-libs
+
+ignite_continuous_query_example_SOURCES = \
+ src/continuous_query_example.cpp
+
+run-check: check
+ ./ignite-continuous-query-example -p
+
+clean-local: clean-check
+ $(RM) *.gcno *.gcda
+
+clean-check:
+ $(RM) $(ignite_continuous_query_example_OBJECTS)
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/continuous-query-example/config/continuous-query-example.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/continuous-query-example/config/continuous-query-example.xml b/modules/platforms/cpp/examples/continuous-query-example/config/continuous-query-example.xml
new file mode 100644
index 0000000..bdc1e92
--- /dev/null
+++ b/modules/platforms/cpp/examples/continuous-query-example/config/continuous-query-example.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+
+ <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <!--
+ Ignite provides several options for automatic discovery that can be used
+ instead os static IP based discovery.
+ -->
+ <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
+ <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47550..47551</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/continuous-query-example/project/vs/continuous-query-example.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/continuous-query-example/project/vs/continuous-query-example.vcxproj b/modules/platforms/cpp/examples/continuous-query-example/project/vs/continuous-query-example.vcxproj
new file mode 100644
index 0000000..dfc5f9d
--- /dev/null
+++ b/modules/platforms/cpp/examples/continuous-query-example/project/vs/continuous-query-example.vcxproj
@@ -0,0 +1,110 @@
+\ufeff<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <RootNamespace>igniteexamples</RootNamespace>
+ <ProjectGuid>{73BB124A-0CD4-4961-A6CD-61F9C71028A6}</ProjectGuid>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <PlatformToolset>v100</PlatformToolset>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <PlatformToolset>v100</PlatformToolset>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <LinkIncremental>false</LinkIncremental>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <LinkIncremental>false</LinkIncremental>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <WarningLevel>Level3</WarningLevel>
+ <PrecompiledHeader>
+ </PrecompiledHeader>
+ <Optimization>MaxSpeed</Optimization>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <AdditionalIncludeDirectories>$(JAVA_HOME)\include;$(JAVA_HOME)\include\win32;..\..\..\include;..\..\..\..\jni\os\win\include;..\..\..\..\jni\include;..\..\..\..\common\os\win\include;..\..\..\..\common\include;..\..\..\..\binary\include;..\..\..\..\core\os\win\include;..\..\..\..\core\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ <AdditionalDependencies>jvm.lib;ignite.jni.lib;ignite.binary.lib;ignite.core.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalLibraryDirectories>..\..\..\..\project\vs\$(Platform)\$(Configuration)\;$(JAVA_HOME)\lib;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
+ </Link>
+ <PostBuildEvent>
+ <Command>copy "$(ProjectDir)..\..\..\..\project\vs\$(Platform)\$(Configuration)\ignite.common.dll" "$(OutDir)"
+copy "$(ProjectDir)..\..\..\..\project\vs\$(Platform)\$(Configuration)\ignite.core.dll" "$(OutDir)"</Command>
+ </PostBuildEvent>
+ </ItemDefinitionGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <WarningLevel>Level3</WarningLevel>
+ <PrecompiledHeader>
+ </PrecompiledHeader>
+ <Optimization>MaxSpeed</Optimization>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <AdditionalIncludeDirectories>$(JAVA_HOME)\include;$(JAVA_HOME)\include\win32;..\..\..\include;..\..\..\..\jni\os\win\include;..\..\..\..\jni\include;..\..\..\..\common\os\win\include;..\..\..\..\common\include;..\..\..\..\binary\include;..\..\..\..\core\os\win\include;..\..\..\..\core\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ <AdditionalDependencies>jvm.lib;ignite.jni.lib;ignite.binary.lib;ignite.core.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalLibraryDirectories>..\..\..\..\project\vs\$(Platform)\$(Configuration)\;$(JAVA_HOME)\lib;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
+ </Link>
+ <PostBuildEvent>
+ <Command>copy "$(ProjectDir)..\..\..\..\project\vs\$(Platform)\$(Configuration)\ignite.common.dll" "$(OutDir)"
+copy "$(ProjectDir)..\..\..\..\project\vs\$(Platform)\$(Configuration)\ignite.core.dll" "$(OutDir)"</Command>
+ </PostBuildEvent>
+ </ItemDefinitionGroup>
+ <ItemGroup>
+ <ClCompile Include="..\..\src\continuous_query_example.cpp" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="..\..\..\include\ignite\examples\address.h" />
+ <ClInclude Include="..\..\..\include\ignite\examples\organization.h" />
+ <ClInclude Include="..\..\..\include\ignite\examples\person.h" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="..\..\config\continuous-query-example.xml" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ </ImportGroup>
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/continuous-query-example/project/vs/continuous-query-example.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/continuous-query-example/project/vs/continuous-query-example.vcxproj.filters b/modules/platforms/cpp/examples/continuous-query-example/project/vs/continuous-query-example.vcxproj.filters
new file mode 100644
index 0000000..cf3bca9
--- /dev/null
+++ b/modules/platforms/cpp/examples/continuous-query-example/project/vs/continuous-query-example.vcxproj.filters
@@ -0,0 +1,35 @@
+\ufeff<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClInclude Include="..\..\..\include\ignite\examples\address.h">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\..\include\ignite\examples\organization.h">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\..\include\ignite\examples\person.h">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ </ItemGroup>
+ <ItemGroup>
+ <Filter Include="Source Files">
+ <UniqueIdentifier>{35cb32b7-bf2e-440f-9b32-80d392d81847}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="Header Files">
+ <UniqueIdentifier>{b355095f-b4e2-4324-9516-854828c876ff}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="Config">
+ <UniqueIdentifier>{3799efd0-3cfe-47e2-9e9e-a51b25bf40ef}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
+ <ItemGroup>
+ <ClCompile Include="..\..\src\continuous_query_example.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="..\..\config\continuous-query-example.xml">
+ <Filter>Config</Filter>
+ </None>
+ </ItemGroup>
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/continuous-query-example/src/continuous_query_example.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/continuous-query-example/src/continuous_query_example.cpp b/modules/platforms/cpp/examples/continuous-query-example/src/continuous_query_example.cpp
new file mode 100644
index 0000000..b08d4b8
--- /dev/null
+++ b/modules/platforms/cpp/examples/continuous-query-example/src/continuous_query_example.cpp
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdint.h>
+#include <iostream>
+
+#include "ignite/ignition.h"
+#include "ignite/cache/query/continuous/continuous_query.h"
+
+#include "ignite/examples/organization.h"
+#include "ignite/examples/person.h"
+
+using namespace ignite;
+using namespace cache;
+using namespace query;
+
+using namespace examples;
+
+/** Cache name. */
+const char* CACHE_NAME = "cpp_cache_continuous_query";
+
+/*
+ * Listener class.
+ */
+template<typename K, typename V>
+class Listener : public event::CacheEntryEventListener<K, V>
+{
+public:
+ /*
+ * Default constructor.
+ */
+ Listener()
+ {
+ // No-op.
+ }
+
+ /**
+ * Event callback.
+ *
+ * @param evts Events.
+ * @param num Events number.
+ */
+ virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num)
+ {
+ for (uint32_t i = 0; i < num; ++i)
+ {
+ std::cout << "Queried entry [key=" << (evts[i].HasValue() ? evts[i].GetKey() : K())
+ << ", val=" << (evts[i].HasValue() ? evts[i].GetValue() : V()) << ']'
+ << std::endl;
+ }
+ }
+};
+
+int main()
+{
+ IgniteConfiguration cfg;
+
+ cfg.springCfgPath = "platforms/cpp/examples/continuous-query-example/config/continuous-query-example.xml";
+
+ try
+ {
+ // Start a node.
+ Ignite ignite = Ignition::Start(cfg);
+
+ std::cout << std::endl;
+ std::cout << ">>> Cache continuous query example started." << std::endl;
+ std::cout << std::endl;
+
+ // Get cache instance.
+ Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>(CACHE_NAME);
+
+ cache.Clear();
+
+ const int32_t keyCnt = 20;
+
+ for (int32_t i = 0; i < keyCnt; ++i)
+ {
+ std::stringstream builder;
+
+ builder << i;
+
+ cache.Put(i, builder.str());
+ }
+
+ // Declaring listener.
+ Listener<int, std::string> listener;
+
+ // Declaring continuous query.
+ continuous::ContinuousQuery<int, std::string> qry(MakeReference(listener));
+
+ {
+ // Continous query scope. Query is closed when scope is left.
+ continuous::ContinuousQueryHandle<int, std::string> handle = cache.QueryContinuous(qry);
+
+ // Add a few more keys and watch more query notifications.
+ for (int32_t i = keyCnt; i < keyCnt + 5; ++i)
+ {
+ std::stringstream builder;
+
+ builder << i;
+
+ cache.Put(i, builder.str());
+ }
+
+ // Let user wait while callback is notified about remaining puts.
+ std::cout << std::endl;
+ std::cout << ">>> Press 'Enter' to continue..." << std::endl;
+ std::cout << std::endl;
+
+ std::cin.get();
+ }
+
+ // Stop node.
+ Ignition::StopAll(false);
+ }
+ catch (IgniteError& err)
+ {
+ std::cout << "An error occurred: " << err.GetText() << std::endl;
+ }
+
+ std::cout << std::endl;
+ std::cout << ">>> Example finished, press 'Enter' to exit ..." << std::endl;
+ std::cout << std::endl;
+
+ std::cin.get();
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/include/ignite/examples/person.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/include/ignite/examples/person.h b/modules/platforms/cpp/examples/include/ignite/examples/person.h
index 2c92660..86c51c8 100644
--- a/modules/platforms/cpp/examples/include/ignite/examples/person.h
+++ b/modules/platforms/cpp/examples/include/ignite/examples/person.h
@@ -46,7 +46,7 @@ namespace ignite
// No-op.
}
- std::string ToString()
+ std::string ToString() const
{
std::ostringstream oss;
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/examples/project/vs/ignite-examples.sln
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/examples/project/vs/ignite-examples.sln b/modules/platforms/cpp/examples/project/vs/ignite-examples.sln
index 89f609f..d839f09 100644
--- a/modules/platforms/cpp/examples/project/vs/ignite-examples.sln
+++ b/modules/platforms/cpp/examples/project/vs/ignite-examples.sln
@@ -7,6 +7,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "odbc-example", "..\..\odbc-
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "query-example", "..\..\query-example\project\vs\query-example.vcxproj", "{9FB34AB4-01DD-4C6F-99BF-681019D0E4DD}"
EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "continuous-query-example", "..\..\continuous-query-example\project\vs\continuous-query-example.vcxproj", "{73BB124A-0CD4-4961-A6CD-61F9C71028A6}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Release|x64 = Release|x64
@@ -25,6 +27,10 @@ Global
{9FB34AB4-01DD-4C6F-99BF-681019D0E4DD}.Release|x64.Build.0 = Release|x64
{9FB34AB4-01DD-4C6F-99BF-681019D0E4DD}.Release|x86.ActiveCfg = Release|Win32
{9FB34AB4-01DD-4C6F-99BF-681019D0E4DD}.Release|x86.Build.0 = Release|Win32
+ {73BB124A-0CD4-4961-A6CD-61F9C71028A6}.Release|x64.ActiveCfg = Release|x64
+ {73BB124A-0CD4-4961-A6CD-61F9C71028A6}.Release|x64.Build.0 = Release|x64
+ {73BB124A-0CD4-4961-A6CD-61F9C71028A6}.Release|x86.ActiveCfg = Release|Win32
+ {73BB124A-0CD4-4961-A6CD-61F9C71028A6}.Release|x86.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index 442cc10..6289d73 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -389,7 +389,7 @@ namespace ignite
void TargetListenFutureForOperation(jobject obj, long long futId, int typ, int opId);
jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
- jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr);
+ jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject Acquire(jobject obj);
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 698263d..e6f2f88 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -1357,13 +1357,13 @@ namespace ignite
return LocalToGlobal(env, res);
}
- jobject JniContext::CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr) {
+ jobject JniContext::CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr, JniErrorInfo* err) {
JNIEnv* env = Attach();
jobject res = env->CallObjectMethod(
obj, jvm->GetMembers().m_PlatformTarget_inStreamOutObject, type, memPtr);
- ExceptionCheck(env);
+ ExceptionCheck(env, err);
return LocalToGlobal(env, res);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj
index 753ae4c..6f85896 100644
--- a/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj
+++ b/modules/platforms/cpp/odbc-test/project/vs/odbc-test.vcxproj
@@ -97,7 +97,7 @@
</ClCompile>
<Link>
<GenerateDebugInformation>true</GenerateDebugInformation>
- <AdditionalDependencies>$(JAVA_HOME)\lib\jvm.lib;$(BOOST_HOME)\lib64-msvc-10.0\libboost_unit_test_framework-vc100-mt-gd-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalDependencies>$(JAVA_HOME)\lib\jvm.lib;$(BOOST_HOME)\lib32-msvc-10.0\libboost_unit_test_framework-vc100-mt-gd-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
@@ -130,7 +130,7 @@
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
- <AdditionalDependencies>$(JAVA_HOME)\lib\jvm.lib;$(BOOST_HOME)\lib64-msvc-10.0\libboost_unit_test_framework-vc100-mt-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalDependencies>$(JAVA_HOME)\lib\jvm.lib;$(BOOST_HOME)\lib32-msvc-10.0\libboost_unit_test_framework-vc100-mt-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/project/vs/ignite.slnrel
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/project/vs/ignite.slnrel b/modules/platforms/cpp/project/vs/ignite.slnrel
index 1d874a8..6a37bf4 100644
--- a/modules/platforms/cpp/project/vs/ignite.slnrel
+++ b/modules/platforms/cpp/project/vs/ignite.slnrel
@@ -2,6 +2,9 @@
Microsoft Visual Studio Solution File, Format Version 11.00
# Visual Studio 2010
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "core", "..\..\core\project\vs\core.vcxproj", "{E2DEA693-F2EA-43C2-A813-053378F6E4DB}"
+ ProjectSection(ProjectDependencies) = postProject
+ {4F15669B-92EB-49F0-B774-8F19BAE0B960} = {4F15669B-92EB-49F0-B774-8F19BAE0B960}
+ EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "ignite", "..\..\ignite\project\vs\ignite.vcxproj", "{69688B4D-3EE0-43F5-A1C6-29B5D2DDE949}"
ProjectSection(ProjectDependencies) = postProject
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/project/vs/ignite_x86.slnrel
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/project/vs/ignite_x86.slnrel b/modules/platforms/cpp/project/vs/ignite_x86.slnrel
index 4785579..abd0ad1 100644
--- a/modules/platforms/cpp/project/vs/ignite_x86.slnrel
+++ b/modules/platforms/cpp/project/vs/ignite_x86.slnrel
@@ -2,6 +2,9 @@
Microsoft Visual Studio Solution File, Format Version 11.00
# Visual Studio 2010
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "core", "..\..\core\project\vs\core.vcxproj", "{E2DEA693-F2EA-43C2-A813-053378F6E4DB}"
+ ProjectSection(ProjectDependencies) = postProject
+ {4F15669B-92EB-49F0-B774-8F19BAE0B960} = {4F15669B-92EB-49F0-B774-8F19BAE0B960}
+ EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "ignite", "..\..\ignite\project\vs\ignite.vcxproj", "{69688B4D-3EE0-43F5-A1C6-29B5D2DDE949}"
ProjectSection(ProjectDependencies) = postProject
[3/3] ignite git commit: IGNITE-1443: Implemented ContinuousQuery for
C++
Posted by pt...@apache.org.
IGNITE-1443: Implemented ContinuousQuery for C++
This closes #1343
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/598b464f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/598b464f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/598b464f
Branch: refs/heads/master
Commit: 598b464f81d2e74dc0df62011e08f3a76a674db6
Parents: 700529a
Author: Igor Sapego <is...@gridgain.com>
Authored: Tue Dec 20 19:38:41 2016 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Tue Dec 20 19:38:41 2016 +0300
----------------------------------------------------------------------
.gitignore | 1 +
.../include/ignite/binary/binary_raw_reader.h | 21 +
.../ignite/impl/binary/binary_reader_impl.h | 9 +-
.../src/impl/binary/binary_reader_impl.cpp | 17 +
.../platforms/cpp/common/include/Makefile.am | 5 +-
.../common/include/ignite/common/concurrent.h | 90 ++-
.../include/ignite/common/reference_impl.h | 286 +++++++++
.../cpp/common/include/ignite/reference.h | 564 +++++++++++++++++
.../cpp/common/project/vs/common.vcxproj | 2 +
.../common/project/vs/common.vcxproj.filters | 6 +
modules/platforms/cpp/core-test/Makefile.am | 7 +-
.../core-test/config/cache-query-continuous.xml | 87 +++
.../cpp/core-test/project/vs/core-test.vcxproj | 10 +-
.../project/vs/core-test.vcxproj.filters | 6 +
.../cpp/core-test/src/cache_query_test.cpp | 76 +--
.../cpp/core-test/src/continuous_query_test.cpp | 611 +++++++++++++++++++
.../cpp/core-test/src/handle_registry_test.cpp | 18 +-
.../cpp/core-test/src/reference_test.cpp | 412 +++++++++++++
modules/platforms/cpp/core/Makefile.am | 1 +
.../cpp/core/include/ignite/cache/cache.h | 104 +++-
.../cpp/core/include/ignite/cache/cache_entry.h | 40 +-
.../ignite/cache/event/cache_entry_event.h | 139 +++++
.../cache/event/cache_entry_event_listener.h | 71 +++
.../cache/query/continuous/continuous_query.h | 239 ++++++++
.../query/continuous/continuous_query_handle.h | 133 ++++
.../core/include/ignite/impl/cache/cache_impl.h | 116 +++-
.../continuous/continuous_query_handle_impl.h | 101 +++
.../query/continuous/continuous_query_impl.h | 351 +++++++++++
.../core/include/ignite/impl/handle_registry.h | 62 +-
.../include/ignite/impl/ignite_environment.h | 34 +-
modules/platforms/cpp/core/namespaces.dox | 74 ++-
.../platforms/cpp/core/project/vs/core.vcxproj | 7 +
.../cpp/core/project/vs/core.vcxproj.filters | 30 +
.../cpp/core/src/impl/cache/cache_impl.cpp | 31 +
.../continuous/continuous_query_handle_impl.cpp | 96 +++
.../cpp/core/src/impl/handle_registry.cpp | 102 ++--
.../cpp/core/src/impl/ignite_environment.cpp | 98 ++-
modules/platforms/cpp/examples/Makefile.am | 1 +
modules/platforms/cpp/examples/configure.ac | 1 +
.../continuous-query-example/Makefile.am | 58 ++
.../config/continuous-query-example.xml | 52 ++
.../project/vs/continuous-query-example.vcxproj | 110 ++++
.../vs/continuous-query-example.vcxproj.filters | 35 ++
.../src/continuous_query_example.cpp | 142 +++++
.../examples/include/ignite/examples/person.h | 2 +-
.../cpp/examples/project/vs/ignite-examples.sln | 6 +
.../platforms/cpp/jni/include/ignite/jni/java.h | 2 +-
modules/platforms/cpp/jni/src/java.cpp | 4 +-
.../cpp/odbc-test/project/vs/odbc-test.vcxproj | 4 +-
modules/platforms/cpp/project/vs/ignite.slnrel | 3 +
.../platforms/cpp/project/vs/ignite_x86.slnrel | 3 +
51 files changed, 4216 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b534eb7..db6128e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -51,6 +51,7 @@ git-patch-prop-local.sh
*.opensdf
*.db
*.opendb
+.vs
ipch/
[Oo]bj/
[Bb]in
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/binary/include/ignite/binary/binary_raw_reader.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/binary/include/ignite/binary/binary_raw_reader.h b/modules/platforms/cpp/binary/include/ignite/binary/binary_raw_reader.h
index 72aab55..d15848b 100644
--- a/modules/platforms/cpp/binary/include/ignite/binary/binary_raw_reader.h
+++ b/modules/platforms/cpp/binary/include/ignite/binary/binary_raw_reader.h
@@ -398,6 +398,27 @@ namespace ignite
{
return impl->ReadObject<T>();
}
+
+ /**
+ * Try read object.
+ * Reads value, stores it to res and returns true if the value is
+ * not null. Otherwise just returns false.
+ *
+ * @param res Read value is placed here if non-null.
+ * @return True if the non-null value has been read and false
+ * otherwise.
+ */
+ template<typename T>
+ bool TryReadObject(T& res)
+ {
+ if (impl->SkipIfNull())
+ return false;
+
+ res = impl->ReadObject<T>();
+
+ return true;
+ }
+
private:
/** Implementation delegate. */
ignite::impl::binary::BinaryReaderImpl* impl;
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_reader_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_reader_impl.h b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_reader_impl.h
index 8c4b464..cd32203 100644
--- a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_reader_impl.h
+++ b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_reader_impl.h
@@ -723,6 +723,13 @@ namespace ignite
bool HasNextElement(int32_t id) const;
/**
+ * Skip next value if it is the null.
+ *
+ * @return True if the null value has been detected and skipped.
+ */
+ bool SkipIfNull();
+
+ /**
* Read element.
*
* @param id Session ID.
@@ -763,7 +770,7 @@ namespace ignite
*key = ReadTopObject<K>();
*val = ReadTopObject<V>();
}
-
+
/**
* Read object.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp
index fb75ba5..c128df6 100644
--- a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp
+++ b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp
@@ -663,6 +663,23 @@ namespace ignite
return elemId == id && elemRead < elemCnt;
}
+ bool BinaryReaderImpl::SkipIfNull()
+ {
+ CheckRawMode(true);
+ CheckSingleMode(true);
+
+ InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream);
+
+ int8_t hdr = stream->ReadInt8();
+
+ if (hdr != IGNITE_HDR_NULL)
+ return false;
+
+ positionGuard.Release();
+
+ return true;
+ }
+
void BinaryReaderImpl::SetRawMode()
{
CheckRawMode(false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/common/include/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/Makefile.am b/modules/platforms/cpp/common/include/Makefile.am
index 2e53608..a5073f6 100644
--- a/modules/platforms/cpp/common/include/Makefile.am
+++ b/modules/platforms/cpp/common/include/Makefile.am
@@ -23,14 +23,15 @@ nobase_include_HEADERS = \
ignite/common/concurrent.h \
ignite/common/decimal.h \
ignite/common/default_allocator.h \
+ ignite/common/reference_impl.h \
ignite/common/dynamic_size_array.h \
ignite/common/fixed_size_array.h \
ignite/common/utils.h \
ignite/date.h \
ignite/guid.h \
ignite/ignite_error.h \
- ignite/timestamp.h
- ignite/timestamp.h
+ ignite/timestamp.h \
+ ignite/reference.h
uninstall-hook:
if [ -d ${includedir}/ignite ]; then find ${includedir}/ignite -type d -empty -delete; fi
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/common/include/ignite/common/concurrent.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/common/concurrent.h b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
index 35c2209..ff0e54a 100644
--- a/modules/platforms/cpp/common/include/ignite/common/concurrent.h
+++ b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
@@ -19,6 +19,7 @@
#define _IGNITE_COMMON_CONCURRENT
#include <cassert>
+#include <utility>
#include "ignite/common/concurrent_os.h"
@@ -121,10 +122,15 @@ namespace ignite
public:
friend class EnableSharedFromThis<T>;
+ template<typename T2>
+ friend class SharedPointer;
+
/**
* Constructor.
*/
- SharedPointer() : impl(0)
+ SharedPointer() :
+ ptr(0),
+ impl(0)
{
// No-op.
}
@@ -133,17 +139,17 @@ namespace ignite
* Constructor.
*
* @param ptr Raw pointer.
+ * @param deleter Delete function.
*/
- explicit SharedPointer(T* ptr)
+ SharedPointer(T* ptr, void(*deleter)(T*) = &SharedPointerDefaultDeleter<T>) :
+ ptr(ptr),
+ impl(0)
{
if (ptr)
{
- void(*deleter)(T*) = (void(*)(T*)) &SharedPointerDefaultDeleter<T>;
impl = new SharedPointerImpl(ptr, reinterpret_cast<SharedPointerImpl::DeleterType>(deleter));
ImplEnableShared(ptr, impl);
}
- else
- impl = 0;
}
/**
@@ -152,15 +158,16 @@ namespace ignite
* @param ptr Raw pointer.
* @param deleter Delete function.
*/
- SharedPointer(T* ptr, void(*deleter)(T*))
+ template<typename T2>
+ SharedPointer(T2* ptr, void(*deleter)(T2*) = &SharedPointerDefaultDeleter<T2>) :
+ ptr(ptr),
+ impl(0)
{
if (ptr)
{
impl = new SharedPointerImpl(ptr, reinterpret_cast<SharedPointerImpl::DeleterType>(deleter));
ImplEnableShared(ptr, impl);
}
- else
- impl = 0;
}
/**
@@ -169,6 +176,21 @@ namespace ignite
* @param other Instance to copy.
*/
SharedPointer(const SharedPointer& other) :
+ ptr(other.ptr),
+ impl(other.impl)
+ {
+ if (impl)
+ impl->Increment();
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param other Instance to copy.
+ */
+ template<typename T2>
+ SharedPointer(const SharedPointer<T2>& other) :
+ ptr(other.ptr),
impl(other.impl)
{
if (impl)
@@ -186,26 +208,43 @@ namespace ignite
{
SharedPointer tmp(other);
- std::swap(impl, tmp.impl);
+ Swap(tmp);
}
return *this;
}
/**
+ * Assignment operator.
+ *
+ * @param other Other instance.
+ */
+ template<typename T2>
+ SharedPointer& operator=(const SharedPointer<T2>& other)
+ {
+ SharedPointer<T> tmp(other);
+
+ Swap(tmp);
+
+ return *this;
+ }
+
+ /**
* Destructor.
*/
~SharedPointer()
{
if (impl && impl->Decrement())
{
- T* ptr = Get();
+ void* ptr0 = impl->Pointer();
- void(*deleter)(T*) = reinterpret_cast<void(*)(T*)>(impl->Deleter());
+ void(*deleter)(void*) = impl->Deleter();
- deleter(ptr);
+ deleter(ptr0);
delete impl;
+
+ ptr = 0;
}
}
@@ -216,7 +255,7 @@ namespace ignite
*/
T* Get()
{
- return impl ? static_cast<T*>(impl->Pointer()) : 0;
+ return ptr;
}
/**
@@ -226,7 +265,7 @@ namespace ignite
*/
const T* Get() const
{
- return impl ? static_cast<T*>(impl->Pointer()) : 0;
+ return ptr;
}
/**
@@ -245,7 +284,30 @@ namespace ignite
return impl != 0;
}
+ /**
+ * Swap pointer content with another instance.
+ *
+ * @param other Other instance.
+ */
+ void Swap(SharedPointer& other)
+ {
+ if (this != &other)
+ {
+ T* ptrTmp = ptr;
+ SharedPointerImpl* implTmp = impl;
+
+ ptr = other.ptr;
+ impl = other.impl;
+
+ other.ptr = ptrTmp;
+ other.impl = implTmp;
+ }
+ }
+
private:
+ /* Pointer. */
+ T* ptr;
+
/** Implementation. */
SharedPointerImpl* impl;
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/common/include/ignite/common/reference_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/common/reference_impl.h b/modules/platforms/cpp/common/include/ignite/common/reference_impl.h
new file mode 100644
index 0000000..e38da8c
--- /dev/null
+++ b/modules/platforms/cpp/common/include/ignite/common/reference_impl.h
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::ReferenceImplBase class and its implementations.
+ */
+
+#ifndef _IGNITE_COMMON_REFERENCE_IMPL
+#define _IGNITE_COMMON_REFERENCE_IMPL
+
+#include <utility>
+
+#include <ignite/common/common.h>
+
+namespace ignite
+{
+ namespace common
+ {
+ // Any number is good as long as it is not null.
+ enum { POINTER_CAST_MAGIC_NUMBER = 80000 };
+
+ /**
+ * Interface for constant Reference implementation class template.
+ */
+ class ConstReferenceImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ConstReferenceImplBase()
+ {
+ // No-op.
+ }
+
+ /**
+ * Get the constant pointer.
+ *
+ * @return Constant pointer to underlying value.
+ */
+ virtual const void* Get() const = 0;
+ };
+
+ /**
+ * Interface for Reference implementation class template.
+ */
+ class ReferenceImplBase : public ConstReferenceImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ReferenceImplBase()
+ {
+ // No-op.
+ }
+
+ virtual const void* Get() const = 0;
+
+ /**
+ * Get the pointer.
+ *
+ * @return Pointer to underlying value.
+ */
+ virtual void* Get() = 0;
+ };
+
+ /**
+ * Reference class implementation for smart pointers.
+ *
+ * Note, this class does not implement any smart pointer functionality
+ * itself, instead it wraps one of the existing wide-spread smart
+ * pointer implementations and provides unified interface for them.
+ */
+ template<typename P>
+ class ReferenceSmartPointer : public ReferenceImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ReferenceSmartPointer()
+ {
+ // No-op.
+ }
+
+ /**
+ * Default constructor.
+ */
+ ReferenceSmartPointer() :
+ ptr()
+ {
+ // No-op.
+ }
+
+ virtual const void* Get() const
+ {
+ return reinterpret_cast<const void*>(&(*ptr));
+ }
+
+ virtual void* Get()
+ {
+ return reinterpret_cast<void*>(&(*ptr));
+ }
+
+ /**
+ * Swap underlying smart pointer.
+ *
+ * @param other Another instance.
+ */
+ void Swap(P& other)
+ {
+ using std::swap;
+
+ swap(ptr, other);
+ }
+
+ private:
+ /** Underlying pointer. */
+ P ptr;
+ };
+
+ /**
+ * Reference implementation for the owning raw pointer.
+ */
+ template<typename T>
+ class ReferenceOwningRawPointer : public ReferenceImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ReferenceOwningRawPointer()
+ {
+ delete ptr;
+ }
+
+ /**
+ * Default constructor.
+ */
+ ReferenceOwningRawPointer() :
+ ptr(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Pointer constructor.
+ *
+ * @param ptr Pointer to take ownership over.
+ */
+ ReferenceOwningRawPointer(T* ptr) :
+ ptr(ptr)
+ {
+ // No-op.
+ }
+
+ virtual const void* Get() const
+ {
+ return reinterpret_cast<const void*>(ptr);
+ }
+
+ virtual void* Get()
+ {
+ return reinterpret_cast<void*>(ptr);
+ }
+
+ private:
+ /** Underlying pointer. */
+ T* ptr;
+ };
+
+ /**
+ * Reference implementation for the raw pointer.
+ */
+ template<typename T>
+ class ReferenceNonOwningRawPointer : public ReferenceImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ReferenceNonOwningRawPointer()
+ {
+ // No-op.
+ }
+
+ /**
+ * Default constructor.
+ */
+ ReferenceNonOwningRawPointer() :
+ ptr(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Pointer constructor.
+ *
+ * @param ptr Pointer.
+ */
+ ReferenceNonOwningRawPointer(T* ptr) :
+ ptr(ptr)
+ {
+ // No-op.
+ }
+
+ virtual const void* Get() const
+ {
+ return reinterpret_cast<const void*>(ptr);
+ }
+
+ virtual void* Get()
+ {
+ return reinterpret_cast<void*>(ptr);
+ }
+
+ private:
+ /** Underlying pointer. */
+ T* ptr;
+ };
+
+ /**
+ * Constant reference implementation for the raw pointer.
+ */
+ template<typename T>
+ class ConstReferenceNonOwningRawPointer : public ConstReferenceImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ConstReferenceNonOwningRawPointer()
+ {
+ // No-op.
+ }
+
+ /**
+ * Default constructor.
+ */
+ ConstReferenceNonOwningRawPointer() :
+ ptr(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Pointer constructor.
+ *
+ * @param ptr Pointer.
+ */
+ ConstReferenceNonOwningRawPointer(const T* ptr) :
+ ptr(ptr)
+ {
+ // No-op.
+ }
+
+ virtual const void* Get() const
+ {
+ return reinterpret_cast<const void*>(ptr);
+ }
+
+ private:
+ /** Underlying pointer. */
+ const T* ptr;
+ };
+
+ }
+}
+
+#endif //_IGNITE_COMMON_REFERENCE_IMPL
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/common/include/ignite/reference.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/reference.h b/modules/platforms/cpp/common/include/ignite/reference.h
new file mode 100644
index 0000000..b026ad7
--- /dev/null
+++ b/modules/platforms/cpp/common/include/ignite/reference.h
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::Reference class.
+ */
+
+#ifndef _IGNITE_COMMON_REFERENCE
+#define _IGNITE_COMMON_REFERENCE
+
+#include <cstddef>
+
+#include <ignite/common/common.h>
+#include <ignite/common/concurrent.h>
+#include <ignite/common/reference_impl.h>
+
+namespace ignite
+{
+ template<typename T>
+ class Reference;
+
+ /**
+ * Constant Reference class.
+ *
+ * Abstraction on any reference-type object, from simple raw pointers and
+ * references to standard library smart pointers. Provides only constant
+ * access to the underlying data.
+ *
+ * There are no requirements for the template type T.
+ */
+ template<typename T>
+ class ConstReference
+ {
+ template<typename>
+ friend class ConstReference;
+
+ template<typename>
+ friend class Reference;
+
+ public:
+ /**
+ * Default constructor.
+ */
+ ConstReference() :
+ ptr(),
+ offset(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ptr ConstReference class implementation.
+ * @param offset Pointer offset.
+ */
+ explicit ConstReference(common::ConstReferenceImplBase* ptr, ptrdiff_t offset = 0) :
+ ptr(ptr),
+ offset(offset)
+ {
+ // No-op.
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param other Another instance.
+ */
+ ConstReference(const ConstReference& other) :
+ ptr(other.ptr),
+ offset(other.offset)
+ {
+ // No-op.
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * Constant reference of type T2 should be static-castable to constant
+ * reference of type T.
+ *
+ * @param other Another instance.
+ */
+ template<typename T2>
+ ConstReference(const ConstReference<T2>& other) :
+ ptr(other.ptr),
+ offset(other.offset)
+ {
+ T2* p0 = reinterpret_cast<T2*>(common::POINTER_CAST_MAGIC_NUMBER);
+ T* p1 = static_cast<T*>(p0);
+
+ ptrdiff_t diff = reinterpret_cast<ptrdiff_t>(p1) - reinterpret_cast<ptrdiff_t>(p0);
+ offset += diff;
+ }
+
+ /**
+ * Assignment operator.
+ *
+ * @param other Another instance.
+ */
+ ConstReference& operator=(const ConstReference& other)
+ {
+ ptr = other.ptr;
+ offset = other.offset;
+
+ return *this;
+ }
+
+ /**
+ * Assignment operator.
+ *
+ * Constant reference of type T2 should be static-castable to constant
+ * reference of type T.
+ *
+ * @param other Another instance.
+ */
+ template<typename T2>
+ ConstReference& operator=(const ConstReference<T2>& other)
+ {
+ ptr = other.ptr;
+ offset = other.offset;
+
+ T2* p0 = reinterpret_cast<T2*>(common::POINTER_CAST_MAGIC_NUMBER);
+ T* p1 = static_cast<T*>(p0);
+
+ ptrdiff_t diff = reinterpret_cast<ptrdiff_t>(p1) - reinterpret_cast<ptrdiff_t>(p0);
+ offset += diff;
+
+ return *this;
+ }
+
+ /**
+ * Destructor.
+ */
+ ~ConstReference()
+ {
+ // No-op.
+ }
+
+ /**
+ * Dereference the pointer.
+ *
+ * If the pointer is null then this operation causes undefined
+ * behaviour.
+ *
+ * @return Constant reference to underlying value.
+ */
+ const T& Get() const
+ {
+ return *reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+ }
+
+ /**
+ * Check if the pointer is null.
+ *
+ * @return True if the value is null.
+ */
+ bool IsNull() const
+ {
+ const common::ConstReferenceImplBase* raw = ptr.Get();
+
+ return !raw || !raw->Get();
+ }
+
+ private:
+ /** Implementation. */
+ common::concurrent::SharedPointer<common::ConstReferenceImplBase> ptr;
+
+ /** Address offset. */
+ ptrdiff_t offset;
+ };
+
+ /**
+ * Reference class.
+ *
+ * Abstraction on any reference-type object, from simple raw pointers and
+ * references to standard library smart pointers.
+ *
+ * There are no requirements for the template type T.
+ */
+ template<typename T>
+ class Reference
+ {
+ template<typename>
+ friend class Reference;
+ public:
+ /**
+ * Default constructor.
+ */
+ Reference() :
+ ptr(),
+ offset(0)
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Reference class implementation.
+ * @param offset Pointer offset.
+ */
+ explicit Reference(common::ReferenceImplBase* ptr, ptrdiff_t offset = 0) :
+ ptr(ptr),
+ offset(offset)
+ {
+ // No-op.
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param other Another instance.
+ */
+ Reference(const Reference& other) :
+ ptr(other.ptr),
+ offset(other.offset)
+ {
+ // No-op.
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * Reference of type T2 should be static-castable to reference of type T.
+ *
+ * @param other Another instance.
+ */
+ template<typename T2>
+ Reference(const Reference<T2>& other) :
+ ptr(other.ptr),
+ offset(other.offset)
+ {
+ T2* p0 = reinterpret_cast<T2*>(common::POINTER_CAST_MAGIC_NUMBER);
+ T* p1 = static_cast<T*>(p0);
+
+ ptrdiff_t diff = reinterpret_cast<ptrdiff_t>(p1) - reinterpret_cast<ptrdiff_t>(p0);
+ offset += diff;
+ }
+
+ /**
+ * Assignment operator.
+ *
+ * @param other Another instance.
+ */
+ Reference& operator=(const Reference& other)
+ {
+ ptr = other.ptr;
+ offset = other.offset;
+
+ return *this;
+ }
+
+ /**
+ * Assignment operator.
+ *
+ * Reference of type T2 should be static-castable to reference of type T.
+ *
+ * @param other Another instance.
+ */
+ template<typename T2>
+ Reference& operator=(const Reference<T2>& other)
+ {
+ ptr = other.ptr;
+ offset = other.offset;
+
+ T2* p0 = reinterpret_cast<T2*>(common::POINTER_CAST_MAGIC_NUMBER);
+ T* p1 = static_cast<T*>(p0);
+
+ ptrdiff_t diff = reinterpret_cast<ptrdiff_t>(p1) - reinterpret_cast<ptrdiff_t>(p0);
+ offset += diff;
+
+ return *this;
+ }
+
+ /**
+ * Destructor.
+ */
+ ~Reference()
+ {
+ // No-op.
+ }
+
+ /**
+ * Const cast operator.
+ *
+ * Reference of type T2 should be static-castable to reference of type T.
+ *
+ * Casts this instance to constant reference.
+ */
+ template<typename T2>
+ operator ConstReference<T2>()
+ {
+ ConstReference<T2> cr;
+
+ cr.ptr = ptr;
+ cr.offset = offset;
+
+ T2* p0 = reinterpret_cast<T2*>(common::POINTER_CAST_MAGIC_NUMBER);
+ const T* p1 = static_cast<T*>(p0);
+
+ ptrdiff_t diff = reinterpret_cast<ptrdiff_t>(p1) - reinterpret_cast<ptrdiff_t>(p0);
+ cr.offset -= diff;
+
+ return cr;
+ }
+
+ /**
+ * Dereference the pointer.
+ *
+ * If the pointer is null then this operation causes undefined
+ * behaviour.
+ *
+ * @return Constant reference to underlying value.
+ */
+ const T& Get() const
+ {
+ return *reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+ }
+
+ /**
+ * Dereference the pointer.
+ *
+ * If the pointer is null then this operation causes undefined
+ * behaviour.
+ *
+ * @return Reference to underlying value.
+ */
+ T& Get()
+ {
+ return *reinterpret_cast<T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+ }
+
+ /**
+ * Check if the pointer is null.
+ *
+ * @return True if the value is null.
+ */
+ bool IsNull() const
+ {
+ const common::ReferenceImplBase* raw = ptr.Get();
+
+ return !raw || !raw->Get();
+ }
+
+ private:
+ /** Implementation. */
+ common::concurrent::SharedPointer<common::ReferenceImplBase> ptr;
+
+ /** Address offset. */
+ ptrdiff_t offset;
+ };
+
+ /**
+ * Make ignite::Reference instance out of smart pointer.
+ *
+ * Template type 'T' should be a smart pointer and provide pointer semantics:
+ * - There should be defined type 'T::element_type', showing underlying type.
+ * - Type 'T' should be dereferencible (should have operators
+ * T::element_type& operator*() and const T::element_type& operator*() const).
+ * - Operation std::swap should result in valid result if applied to two
+ * instances of that type.
+ *
+ * @param ptr Pointer.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ Reference<typename T::element_type> MakeReferenceFromSmartPointer(T ptr)
+ {
+ common::ReferenceSmartPointer<T>* impl = new common::ReferenceSmartPointer<T>();
+
+ Reference<typename T::element_type> res(impl);
+
+ impl->Swap(ptr);
+
+ return res;
+ }
+
+ /**
+ * Make ignite::ConstReference instance out of smart pointer.
+ *
+ * Template type 'T' should be a smart pointer and provide pointer semantics:
+ * - There should be defined type 'T::element_type', showing underlying type.
+ * - Type 'T' should be dereferencible (should have operators
+ * T::element_type& operator*() and const T::element_type& operator*() const).
+ * - Operation std::swap should result in valid result if applied to two
+ * instances of that type.
+ *
+ * @param ptr Pointer.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ ConstReference<typename T::element_type> MakeConstReferenceFromSmartPointer(T ptr)
+ {
+ common::ReferenceSmartPointer<T>* impl = new common::ReferenceSmartPointer<T>();
+
+ ConstReference<typename T::element_type> res(impl);
+
+ impl->Swap(ptr);
+
+ return res;
+ }
+
+ /**
+ * Copy object and wrap it to make ignite::Reference instance.
+ *
+ * Template type 'T' should be copy-constructible.
+ *
+ * @param val Instance.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ Reference<T> MakeReferenceFromCopy(const T& val)
+ {
+ common::ReferenceOwningRawPointer<T>* impl = new common::ReferenceOwningRawPointer<T>(new T(val));
+
+ return Reference<T>(impl);
+ }
+
+ /**
+ * Copy object and wrap it to make ignite::ConstReference instance.
+ *
+ * Template type 'T' should be copy-constructible.
+ *
+ * @param val Instance.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ ConstReference<T> MakeConstReferenceFromCopy(const T& val)
+ {
+ common::ReferenceOwningRawPointer<T>* impl = new common::ReferenceOwningRawPointer<T>(new T(val));
+
+ return ConstReference<T>(impl);
+ }
+
+ /**
+ * Make ignite::Reference instance out of pointer and pass its ownership.
+ * Passed object deleted by Ignite when no longer needed.
+ *
+ * There are no requirements for the template type T.
+ *
+ * @param val Instance.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ Reference<T> MakeReferenceFromOwningPointer(T* val)
+ {
+ common::ReferenceOwningRawPointer<T>* impl = new common::ReferenceOwningRawPointer<T>(val);
+
+ return Reference<T>(impl);
+ }
+
+ /**
+ * Make ignite::ConstReference instance out of pointer and pass its ownership.
+ * Passed object deleted by Ignite when no longer needed.
+ *
+ * There are no requirements for the template type T.
+ *
+ * @param val Instance.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ ConstReference<T> MakeConstReferenceFromOwningPointer(T* val)
+ {
+ common::ReferenceOwningRawPointer<T>* impl = new common::ReferenceOwningRawPointer<T>(val);
+
+ return ConstReference<T>(impl);
+ }
+
+ /**
+ * Make ignite::Reference instance out of reference.
+ * Ignite do not manage passed object and does not affect its lifetime.
+ *
+ * There are no requirements for the template type T.
+ *
+ * @param val Reference.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ Reference<T> MakeReference(T& val)
+ {
+ common::ReferenceNonOwningRawPointer<T>* impl = new common::ReferenceNonOwningRawPointer<T>(&val);
+
+ return Reference<T>(impl);
+ }
+
+ /**
+ * Make ignite::Reference instance out of pointer.
+ * Ignite do not manage passed object and does not affect its lifetime.
+ *
+ * There are no requirements for the template type T.
+ *
+ * @param val Reference.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ Reference<T> MakeReference(T* val)
+ {
+ common::ReferenceNonOwningRawPointer<T>* impl = new common::ReferenceNonOwningRawPointer<T>(val);
+
+ return Reference<T>(impl);
+ }
+
+ /**
+ * Make ignite::ConstReference instance out of constant reference.
+ * Ignite do not manage passed object and does not affect its lifetime.
+ *
+ * There are no requirements for the template type T.
+ *
+ * @param val Reference.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ ConstReference<T> MakeConstReference(const T& val)
+ {
+ common::ConstReferenceNonOwningRawPointer<T>* impl = new common::ConstReferenceNonOwningRawPointer<T>(&val);
+
+ return ConstReference<T>(impl);
+ }
+
+ /**
+ * Make ignite::ConstReference instance out of constant pointer.
+ * Ignite do not manage passed object and does not affect its lifetime.
+ *
+ * There are no requirements for the template type T.
+ *
+ * @param val Reference.
+ * @return Implementation defined value. User should not explicitly use the
+ * returned value.
+ */
+ template<typename T>
+ ConstReference<T> MakeConstReference(const T* val)
+ {
+ common::ConstReferenceNonOwningRawPointer<T>* impl = new common::ConstReferenceNonOwningRawPointer<T>(val);
+
+ return ConstReference<T>(impl);
+ }
+}
+
+#endif //_IGNITE_COMMON_REFERENCE
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/common/project/vs/common.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/project/vs/common.vcxproj b/modules/platforms/cpp/common/project/vs/common.vcxproj
index 149fa48..99fd551 100644
--- a/modules/platforms/cpp/common/project/vs/common.vcxproj
+++ b/modules/platforms/cpp/common/project/vs/common.vcxproj
@@ -171,10 +171,12 @@
<ClInclude Include="..\..\include\ignite\common\dynamic_size_array.h" />
<ClInclude Include="..\..\include\ignite\common\fixed_size_array.h" />
<ClInclude Include="..\..\include\ignite\common\bits.h" />
+ <ClInclude Include="..\..\include\ignite\common\reference_impl.h" />
<ClInclude Include="..\..\include\ignite\common\utils.h" />
<ClInclude Include="..\..\include\ignite\date.h" />
<ClInclude Include="..\..\include\ignite\guid.h" />
<ClInclude Include="..\..\include\ignite\ignite_error.h" />
+ <ClInclude Include="..\..\include\ignite\reference.h" />
<ClInclude Include="..\..\include\ignite\timestamp.h" />
<ClInclude Include="..\..\os\win\include\ignite\common\common.h" />
<ClInclude Include="..\..\os\win\include\ignite\common\concurrent_os.h" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/common/project/vs/common.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/project/vs/common.vcxproj.filters b/modules/platforms/cpp/common/project/vs/common.vcxproj.filters
index ae17daf..d99722b 100644
--- a/modules/platforms/cpp/common/project/vs/common.vcxproj.filters
+++ b/modules/platforms/cpp/common/project/vs/common.vcxproj.filters
@@ -58,6 +58,12 @@
<ClInclude Include="..\..\include\ignite\common\decimal.h">
<Filter>Code\common</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\common\reference_impl.h">
+ <Filter>Code\common</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\reference.h">
+ <Filter>Code</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\..\src\date.cpp">
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core-test/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/Makefile.am b/modules/platforms/cpp/core-test/Makefile.am
index 42680cd..3cf1d0e 100644
--- a/modules/platforms/cpp/core-test/Makefile.am
+++ b/modules/platforms/cpp/core-test/Makefile.am
@@ -43,15 +43,20 @@ AM_CXXFLAGS = \
ignite_tests_LDADD = \
@top_srcdir@/core/libignite.la \
- -lpthread
+ -lpthread \
+ -lboost_thread \
+ -lboost_system \
+ -lboost_chrono
ignite_tests_LDFLAGS = \
-static-libtool-libs
ignite_tests_SOURCES = \
+ src/reference_test.cpp \
src/bits_test.cpp \
src/cache_test.cpp \
src/cache_query_test.cpp \
+ src/continuous_query_test.cpp \
src/concurrent_test.cpp \
src/ignition_test.cpp \
src/interop_memory_test.cpp \
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core-test/config/cache-query-continuous.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/config/cache-query-continuous.xml b/modules/platforms/cpp/core-test/config/cache-query-continuous.xml
new file mode 100644
index 0000000..1b940fd
--- /dev/null
+++ b/modules/platforms/cpp/core-test/config/cache-query-continuous.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+ <property name="connectorConfiguration"><null/></property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="transactional_no_backup"/>
+ <property name="cacheMode" value="PARTITIONED"/>
+ <property name="atomicityMode" value="TRANSACTIONAL"/>
+ <property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ <property name="backups" value="0"/>
+ <property name="startSize" value="10"/>
+ <property name="queryEntities">
+ <list>
+ <bean class="org.apache.ignite.cache.QueryEntity">
+ <property name="keyType" value="java.lang.Integer"/>
+ <property name="valueType" value="TestEntry"/>
+
+ <property name="fields">
+ <map>
+ <entry key="value" value="java.lang.Integer"/>
+ </map>
+ </property>
+
+ <property name="indexes">
+ <list>
+ <bean class="org.apache.ignite.cache.QueryIndex">
+ <property name="fields">
+ <map>
+ <entry key="value" value="true"/>
+ </map>
+ </property>
+ <property name="indexType" value="FULLTEXT"/>
+ </bean>
+ </list>
+ </property>
+ </bean>
+ </list>
+ </property>
+ </bean>
+ </list>
+ </property>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ <property name="socketTimeout" value="300" />
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
index 6f13719..a41d8f8 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
@@ -43,6 +43,7 @@
<ClCompile Include="..\..\src\decimal_test.cpp" />
<ClCompile Include="..\..\src\dynamic_size_array_test.cpp" />
<ClCompile Include="..\..\src\fixed_size_array_test.cpp" />
+ <ClCompile Include="..\..\src\continuous_query_test.cpp" />
<ClCompile Include="..\..\src\ignite_error_test.cpp" />
<ClCompile Include="..\..\src\ignition_test.cpp" />
<ClCompile Include="..\..\src\handle_registry_test.cpp" />
@@ -54,6 +55,7 @@
<ClCompile Include="..\..\src\interop_memory_test.cpp" />
<ClCompile Include="..\..\src\interop_test.cpp" />
<ClCompile Include="..\..\src\bits_test.cpp" />
+ <ClCompile Include="..\..\src\reference_test.cpp" />
<ClCompile Include="..\..\src\teamcity_boost.cpp" />
<ClCompile Include="..\..\src\teamcity_messages.cpp" />
<ClCompile Include="..\..\src\transactions_test.cpp" />
@@ -129,7 +131,7 @@
</ClCompile>
<Link>
<GenerateDebugInformation>true</GenerateDebugInformation>
- <AdditionalDependencies>$(BOOST_HOME)\lib64-msvc-10.0\libboost_unit_test_framework-vc100-mt-gd-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalDependencies>$(BOOST_HOME)\lib64-msvc-10.0\libboost_unit_test_framework-vc100-mt-gd-1_58.lib;$(BOOST_HOME)\lib64-msvc-10.0\libboost_thread-vc100-mt-gd-1_58.lib;$(BOOST_HOME)\lib64-msvc-10.0\libboost_system-vc100-mt-gd-1_58.lib;$(BOOST_HOME)\lib64-msvc-10.0\libboost_chrono-vc100-mt-gd-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
<SubSystem>Console</SubSystem>
</Link>
</ItemDefinitionGroup>
@@ -144,7 +146,7 @@
</ClCompile>
<Link>
<GenerateDebugInformation>true</GenerateDebugInformation>
- <AdditionalDependencies>$(BOOST_HOME)\lib64-msvc-10.0\libboost_unit_test_framework-vc100-mt-gd-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalDependencies>$(BOOST_HOME)\lib32-msvc-10.0\libboost_unit_test_framework-vc100-mt-gd-1_58.lib;$(BOOST_HOME)\lib32-msvc-10.0\libboost_thread-vc100-mt-gd-1_58.lib;$(BOOST_HOME)\lib32-msvc-10.0\libboost_system-vc100-mt-gd-1_58.lib;$(BOOST_HOME)\lib32-msvc-10.0\libboost_chrono-vc100-mt-gd-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
@@ -162,7 +164,7 @@
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
- <AdditionalDependencies>$(BOOST_HOME)\lib64-msvc-10.0\libboost_unit_test_framework-vc100-mt-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalDependencies>$(BOOST_HOME)\lib64-msvc-10.0\libboost_unit_test_framework-vc100-mt-1_58.lib;$(BOOST_HOME)\lib64-msvc-10.0\libboost_thread-vc100-mt-1_58.lib;$(BOOST_HOME)\lib64-msvc-10.0\libboost_system-vc100-mt-1_58.lib;$(BOOST_HOME)\lib64-msvc-10.0\libboost_chrono-vc100-mt-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
@@ -180,7 +182,7 @@
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
- <AdditionalDependencies>$(BOOST_HOME)\lib64-msvc-10.0\libboost_unit_test_framework-vc100-mt-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalDependencies>$(BOOST_HOME)\lib32-msvc-10.0\libboost_unit_test_framework-vc100-mt-1_58.lib;$(BOOST_HOME)\lib32-msvc-10.0\libboost_thread-vc100-mt-1_58.lib;$(BOOST_HOME)\lib32-msvc-10.0\libboost_system-vc100-mt-1_58.lib;$(BOOST_HOME)\lib32-msvc-10.0\libboost_chrono-vc100-mt-1_58.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
index cf1aaca..a95e3a4 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
@@ -58,6 +58,12 @@
<ClCompile Include="..\..\src\interop_test.cpp">
<Filter>Code</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\continuous_query_test.cpp">
+ <Filter>Code</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\src\reference_test.cpp">
+ <Filter>Code</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\teamcity_messages.h">
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core-test/src/cache_query_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/cache_query_test.cpp b/modules/platforms/cpp/core-test/src/cache_query_test.cpp
index c1c26ac..928d29e 100644
--- a/modules/platforms/cpp/core-test/src/cache_query_test.cpp
+++ b/modules/platforms/cpp/core-test/src/cache_query_test.cpp
@@ -232,8 +232,6 @@ private:
int32_t someVal;
};
-
-
namespace ignite
{
namespace binary
@@ -296,74 +294,6 @@ namespace ignite
}
}
-///**
-// * Test setup fixture.
-// */
-//struct CacheQueryTestSuiteFixture
-//{
-// Ignite StartNode(const char* name)
-// {
-// IgniteConfiguration cfg;
-//
-// cfg.jvmOpts.push_back("-Xdebug");
-// cfg.jvmOpts.push_back("-Xnoagent");
-// cfg.jvmOpts.push_back("-Djava.compiler=NONE");
-// cfg.jvmOpts.push_back("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005");
-// cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError");
-//
-//#ifdef IGNITE_TESTS_32
-// cfg.jvmInitMem = 256;
-// cfg.jvmMaxMem = 768;
-//#else
-// cfg.jvmInitMem = 1024;
-// cfg.jvmMaxMem = 4096;
-//#endif
-//
-// cfg.springCfgPath.assign(getenv("IGNITE_NATIVE_TEST_CPP_CONFIG_PATH")).append("/cache-query.xml");
-//
-// IgniteError err;
-//
-// Ignite grid0 = Ignition::Start(cfg, name, &err);
-//
-// if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
-// BOOST_ERROR(err.GetText());
-//
-// return grid0;
-// }
-//
-//
-// /**
-// * Constructor.
-// */
-// CacheQueryTestSuiteFixture() :
-// grid(StartNode("Node1"))
-// {
-// // No-op.
-// }
-//
-// /**
-// * Destructor.
-// */
-// ~CacheQueryTestSuiteFixture()
-// {
-// Ignition::StopAll(true);
-// }
-//
-// /** Person cache accessor. */
-// Cache<int, QueryPerson> GetPersonCache()
-// {
-// return grid.GetCache<int, QueryPerson>("QueryPerson");
-// }
-//
- ///** Relation cache accessor. */
- //Cache<int, QueryRelation> GetRelationCache()
- //{
- // return grid.GetCache<int, QueryRelation>("QueryRelation");
- //}
-//
-// /** Node started during the test. */
-// Ignite grid;
-//};
/**
* Count number of records returned by cursor.
@@ -677,7 +607,11 @@ struct CacheQueryTestSuiteFixture
cfg.jvmMaxMem = 4096;
#endif
- cfg.springCfgPath.assign(getenv("IGNITE_NATIVE_TEST_CPP_CONFIG_PATH")).append("/cache-query.xml");
+ const char* cfgPath = getenv("IGNITE_NATIVE_TEST_CPP_CONFIG_PATH");
+
+ BOOST_CHECK(cfgPath != 0);
+
+ cfg.springCfgPath.assign(cfgPath).append("/cache-query.xml");
IgniteError err;
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/continuous_query_test.cpp b/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
new file mode 100644
index 0000000..e9d7e8a
--- /dev/null
+++ b/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _MSC_VER
+ #define BOOST_TEST_DYN_LINK
+#endif
+
+#include <deque>
+
+#include <boost/test/unit_test.hpp>
+#include <boost/optional.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+
+#include "ignite/ignition.h"
+#include "ignite/cache/cache.h"
+
+using namespace ignite;
+using namespace ignite::cache;
+using namespace ignite::cache::event;
+using namespace ignite::cache::query;
+using namespace ignite::cache::query::continuous;
+using namespace boost::unit_test;
+
+/**
+ * Very simple concurrent queue implementation.
+ */
+template<typename T>
+class ConcurrentQueue
+{
+public:
+ /*
+ * Constructor.
+ */
+ ConcurrentQueue()
+ {
+ // No-op.
+ }
+
+ /*
+ * Push next element to queue.
+ *
+ * @param val Value to push.
+ */
+ void Push(const T& val)
+ {
+ boost::unique_lock<boost::mutex> guard(mutex);
+
+ queue.push_back(val);
+
+ cv.notify_one();
+ }
+
+ /*
+ * Pull element from the queue with the specified timeout.
+ *
+ * @param val Value is placed there on success.
+ * @param timeout Timeout.
+ * @return True on success and false on timeout.
+ */
+ template <typename Rep, typename Period>
+ bool Pull(T& val, const boost::chrono::duration<Rep, Period>& timeout)
+ {
+ boost::unique_lock<boost::mutex> guard(mutex);
+
+ if (queue.empty())
+ {
+ boost::cv_status res = cv.wait_for(guard, timeout);
+
+ if (res == boost::cv_status::timeout)
+ return false;
+ }
+
+ assert(!queue.empty());
+
+ val = queue.front();
+
+ queue.pop_front();
+
+ return true;
+ }
+
+private:
+ boost::mutex mutex;
+
+ boost::condition_variable cv;
+
+ std::deque<T> queue;
+};
+
+/*
+ * Test listener class. Stores events it has been notified about in concurrent
+ * queue so they can be checked later.
+ */
+template<typename K, typename V>
+class Listener : public CacheEntryEventListener<K, V>
+{
+public:
+ /*
+ * Default constructor.
+ */
+ Listener()
+ {
+ // No-op.
+ }
+
+ /**
+ * Event callback.
+ *
+ * @param evts Events.
+ * @param num Events number.
+ */
+ virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num)
+ {
+ for (uint32_t i = 0; i < num; ++i)
+ eventQueue.Push(evts[i]);
+ }
+
+ /*
+ * Check that next received event contains specific values.
+ *
+ * @param key Key.
+ * @param oldVal Old value.
+ * @param val Current value.
+ */
+ void CheckNextEvent(const K& key, boost::optional<V> oldVal, boost::optional<V> val)
+ {
+ CacheEntryEvent<K, V> event;
+ bool success = eventQueue.Pull(event, boost::chrono::seconds(1));
+
+ BOOST_REQUIRE(success);
+
+ BOOST_CHECK_EQUAL(event.GetKey(), key);
+ BOOST_CHECK_EQUAL(event.HasOldValue(), oldVal.is_initialized());
+ BOOST_CHECK_EQUAL(event.HasValue(), val.is_initialized());
+
+ if (oldVal && event.HasOldValue())
+ BOOST_CHECK_EQUAL(event.GetOldValue().value, oldVal->value);
+
+ if (val && event.HasValue())
+ BOOST_CHECK_EQUAL(event.GetValue().value, val->value);
+ }
+
+ /*
+ * Check that there is no event for the specified ammount of time.
+ *
+ * @param timeout Timeout.
+ */
+ template <typename Rep, typename Period>
+ void CheckNoEvent(const boost::chrono::duration<Rep, Period>& timeout)
+ {
+ CacheEntryEvent<K, V> event;
+ bool success = eventQueue.Pull(event, timeout);
+
+ BOOST_REQUIRE(!success);
+ }
+
+private:
+ // Events queue.
+ ConcurrentQueue< CacheEntryEvent<K, V> > eventQueue;
+};
+
+/*
+ * Test entry.
+ */
+struct TestEntry
+{
+ /*
+ * Default constructor.
+ */
+ TestEntry() : value(0)
+ {
+ // No-op.
+ }
+
+ /*
+ * Constructor.
+ */
+ TestEntry(int32_t val) : value(val)
+ {
+ // No-op.
+ }
+
+ /* Value */
+ int32_t value;
+};
+
+namespace ignite
+{
+ namespace binary
+ {
+ /**
+ * Binary type definition.
+ */
+ IGNITE_BINARY_TYPE_START(TestEntry)
+ IGNITE_BINARY_GET_TYPE_ID_AS_HASH(TestEntry)
+ IGNITE_BINARY_GET_TYPE_NAME_AS_IS(TestEntry)
+ IGNITE_BINARY_GET_FIELD_ID_AS_HASH
+ IGNITE_BINARY_GET_HASH_CODE_ZERO(TestEntry)
+ IGNITE_BINARY_IS_NULL_FALSE(TestEntry)
+ IGNITE_BINARY_GET_NULL_DEFAULT_CTOR(TestEntry)
+
+ void Write(BinaryWriter& writer, const TestEntry& obj)
+ {
+ writer.WriteInt32("value", obj.value);
+ }
+
+ TestEntry Read(BinaryReader& reader)
+ {
+ TestEntry res;
+ res.value = reader.ReadInt32("value");
+
+ return res;
+ }
+
+ IGNITE_BINARY_TYPE_END
+ }
+}
+
+/*
+ * Test setup fixture.
+ */
+struct ContinuousQueryTestSuiteFixture
+{
+ Ignite grid;
+
+ Cache<int, TestEntry> cache;
+
+ /*
+ * Get configuration for nodes.
+ */
+ IgniteConfiguration GetConfiguration()
+ {
+ IgniteConfiguration cfg;
+
+ cfg.jvmOpts.push_back("-Xdebug");
+ cfg.jvmOpts.push_back("-Xnoagent");
+ cfg.jvmOpts.push_back("-Djava.compiler=NONE");
+ cfg.jvmOpts.push_back("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005");
+ cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError");
+
+#ifdef IGNITE_TESTS_32
+ cfg.jvmInitMem = 256;
+ cfg.jvmMaxMem = 768;
+#else
+ cfg.jvmInitMem = 1024;
+ cfg.jvmMaxMem = 4096;
+#endif
+
+ char* cfgPath = getenv("IGNITE_NATIVE_TEST_CPP_CONFIG_PATH");
+
+ cfg.springCfgPath = std::string(cfgPath).append("/").append("cache-query-continuous.xml");
+
+ return cfg;
+ }
+
+ /*
+ * Constructor.
+ */
+ ContinuousQueryTestSuiteFixture() :
+ grid(Ignition::Start(GetConfiguration(), "node-01")),
+ cache(grid.GetCache<int, TestEntry>("transactional_no_backup"))
+ {
+ // No-op.
+ }
+
+ /*
+ * Destructor.
+ */
+ ~ContinuousQueryTestSuiteFixture()
+ {
+ Ignition::StopAll(false);
+
+ grid = Ignite();
+ }
+};
+
+void CheckEvents(Cache<int, TestEntry>& cache, Listener<int, TestEntry>& lsnr)
+{
+ cache.Put(1, TestEntry(10));
+ lsnr.CheckNextEvent(1, boost::none, TestEntry(10));
+
+ cache.Put(1, TestEntry(20));
+ lsnr.CheckNextEvent(1, TestEntry(10), TestEntry(20));
+
+ cache.Put(2, TestEntry(20));
+ lsnr.CheckNextEvent(2, boost::none, TestEntry(20));
+
+ cache.Remove(1);
+ lsnr.CheckNextEvent(1, TestEntry(20), boost::none);
+}
+
+BOOST_FIXTURE_TEST_SUITE(ContinuousQueryTestSuite, ContinuousQueryTestSuiteFixture)
+
+BOOST_AUTO_TEST_CASE(TestBasic)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestInitialQueryScan)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ cache.Put(11, TestEntry(111));
+ cache.Put(22, TestEntry(222));
+ cache.Put(33, TestEntry(333));
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry, ScanQuery());
+
+ std::vector< CacheEntry<int, TestEntry> > vals;
+
+ handle.GetInitialQueryCursor().GetAll(vals);
+
+ BOOST_CHECK_THROW(handle.GetInitialQueryCursor(), IgniteError);
+
+ BOOST_REQUIRE_EQUAL(vals.size(), 3);
+
+ BOOST_CHECK_EQUAL(vals[0].GetKey(), 11);
+ BOOST_CHECK_EQUAL(vals[1].GetKey(), 22);
+ BOOST_CHECK_EQUAL(vals[2].GetKey(), 33);
+
+ BOOST_CHECK_EQUAL(vals[0].GetValue().value, 111);
+ BOOST_CHECK_EQUAL(vals[1].GetValue().value, 222);
+ BOOST_CHECK_EQUAL(vals[2].GetValue().value, 333);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestInitialQuerySql)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ cache.Put(11, TestEntry(111));
+ cache.Put(22, TestEntry(222));
+ cache.Put(33, TestEntry(333));
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry, SqlQuery("TestEntry", "value > 200"));
+
+ std::vector< CacheEntry<int, TestEntry> > vals;
+
+ handle.GetInitialQueryCursor().GetAll(vals);
+
+ BOOST_CHECK_THROW(handle.GetInitialQueryCursor(), IgniteError);
+
+ BOOST_REQUIRE_EQUAL(vals.size(), 2);
+
+ BOOST_CHECK_EQUAL(vals[0].GetKey(), 22);
+ BOOST_CHECK_EQUAL(vals[1].GetKey(), 33);
+
+ BOOST_CHECK_EQUAL(vals[0].GetValue().value, 222);
+ BOOST_CHECK_EQUAL(vals[1].GetValue().value, 333);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestInitialQueryText)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ cache.Put(11, TestEntry(111));
+ cache.Put(22, TestEntry(222));
+ cache.Put(33, TestEntry(333));
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry, TextQuery("TestEntry", "222"));
+
+ std::vector< CacheEntry<int, TestEntry> > vals;
+
+ handle.GetInitialQueryCursor().GetAll(vals);
+
+ BOOST_CHECK_THROW(handle.GetInitialQueryCursor(), IgniteError);
+
+ BOOST_REQUIRE_EQUAL(vals.size(), 1);
+
+ BOOST_CHECK_EQUAL(vals[0].GetKey(), 22);
+
+ BOOST_CHECK_EQUAL(vals[0].GetValue().value, 222);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestBasicNoExcept)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ IgniteError err;
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry, err);
+
+ BOOST_REQUIRE(err.GetCode() == IgniteError::IGNITE_SUCCESS);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestInitialQueryScanNoExcept)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ cache.Put(11, TestEntry(111));
+ cache.Put(22, TestEntry(222));
+ cache.Put(33, TestEntry(333));
+
+ IgniteError err;
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry, ScanQuery(), err);
+
+ BOOST_REQUIRE(err.GetCode() == IgniteError::IGNITE_SUCCESS);
+
+ std::vector< CacheEntry<int, TestEntry> > vals;
+
+ handle.GetInitialQueryCursor().GetAll(vals);
+
+ BOOST_CHECK_THROW(handle.GetInitialQueryCursor(), IgniteError);
+
+ BOOST_REQUIRE_EQUAL(vals.size(), 3);
+
+ BOOST_CHECK_EQUAL(vals[0].GetKey(), 11);
+ BOOST_CHECK_EQUAL(vals[1].GetKey(), 22);
+ BOOST_CHECK_EQUAL(vals[2].GetKey(), 33);
+
+ BOOST_CHECK_EQUAL(vals[0].GetValue().value, 111);
+ BOOST_CHECK_EQUAL(vals[1].GetValue().value, 222);
+ BOOST_CHECK_EQUAL(vals[2].GetValue().value, 333);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestInitialQuerySqlNoExcept)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ cache.Put(11, TestEntry(111));
+ cache.Put(22, TestEntry(222));
+ cache.Put(33, TestEntry(333));
+
+ IgniteError err;
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry, SqlQuery("TestEntry", "value > 200"), err);
+
+ BOOST_REQUIRE(err.GetCode() == IgniteError::IGNITE_SUCCESS);
+
+ std::vector< CacheEntry<int, TestEntry> > vals;
+
+ handle.GetInitialQueryCursor().GetAll(vals);
+
+ BOOST_CHECK_THROW(handle.GetInitialQueryCursor(), IgniteError);
+
+ BOOST_REQUIRE_EQUAL(vals.size(), 2);
+
+ BOOST_CHECK_EQUAL(vals[0].GetKey(), 22);
+ BOOST_CHECK_EQUAL(vals[1].GetKey(), 33);
+
+ BOOST_CHECK_EQUAL(vals[0].GetValue().value, 222);
+ BOOST_CHECK_EQUAL(vals[1].GetValue().value, 333);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestInitialQueryTextNoExcept)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ cache.Put(11, TestEntry(111));
+ cache.Put(22, TestEntry(222));
+ cache.Put(33, TestEntry(333));
+
+ IgniteError err;
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry, TextQuery("TestEntry", "222"), err);
+
+ BOOST_REQUIRE(err.GetCode() == IgniteError::IGNITE_SUCCESS);
+
+ std::vector< CacheEntry<int, TestEntry> > vals;
+
+ handle.GetInitialQueryCursor().GetAll(vals);
+
+ BOOST_CHECK_THROW(handle.GetInitialQueryCursor(), IgniteError);
+
+ BOOST_REQUIRE_EQUAL(vals.size(), 1);
+
+ BOOST_CHECK_EQUAL(vals[0].GetKey(), 22);
+
+ BOOST_CHECK_EQUAL(vals[0].GetValue().value, 222);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestExpiredQuery)
+{
+ Listener<int, TestEntry> lsnr;
+ ContinuousQueryHandle<int, TestEntry> handle;
+
+ {
+ // Query scope.
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ handle = cache.QueryContinuous(qry);
+ }
+
+ // Query is destroyed here.
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestSetGetLocal)
+{
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ BOOST_CHECK(!qry.GetLocal());
+
+ qry.SetLocal(true);
+
+ BOOST_CHECK(qry.GetLocal());
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry);
+
+ BOOST_CHECK(qry.GetLocal());
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestGetSetBufferSize)
+{
+ typedef ContinuousQuery<int, TestEntry> QueryType;
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ BOOST_CHECK_EQUAL(qry.GetBufferSize(), QueryType::DEFAULT_BUFFER_SIZE);
+
+ qry.SetBufferSize(2 * QueryType::DEFAULT_BUFFER_SIZE);
+
+ BOOST_CHECK_EQUAL(qry.GetBufferSize(), 2 * QueryType::DEFAULT_BUFFER_SIZE);
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry);
+
+ BOOST_CHECK_EQUAL(qry.GetBufferSize(), 2 * QueryType::DEFAULT_BUFFER_SIZE);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestGetSetTimeInterval)
+{
+ typedef ContinuousQuery<int, TestEntry> QueryType;
+ Listener<int, TestEntry> lsnr;
+
+ ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr));
+
+ qry.SetBufferSize(10);
+
+ BOOST_CHECK_EQUAL(qry.GetTimeInterval(), static_cast<int>(QueryType::DEFAULT_TIME_INTERVAL));
+
+ qry.SetTimeInterval(500);
+
+ BOOST_CHECK_EQUAL(qry.GetTimeInterval(), 500);
+
+ ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry);
+
+ BOOST_CHECK_EQUAL(qry.GetTimeInterval(), 500);
+
+ CheckEvents(cache, lsnr);
+}
+
+BOOST_AUTO_TEST_CASE(TestPublicPrivateConstantsConsistence)
+{
+ typedef ContinuousQuery<int, TestEntry> QueryType;
+ typedef impl::cache::query::continuous::ContinuousQueryImpl<int, TestEntry> QueryImplType;
+
+ BOOST_CHECK_EQUAL(static_cast<int>(QueryImplType::DEFAULT_TIME_INTERVAL),
+ static_cast<int>(QueryType::DEFAULT_TIME_INTERVAL));
+
+ BOOST_CHECK_EQUAL(static_cast<int>(QueryImplType::DEFAULT_BUFFER_SIZE),
+ static_cast<int>(QueryType::DEFAULT_BUFFER_SIZE));
+}
+
+BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core-test/src/handle_registry_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/handle_registry_test.cpp b/modules/platforms/cpp/core-test/src/handle_registry_test.cpp
index bc4a654..0956d9b 100644
--- a/modules/platforms/cpp/core-test/src/handle_registry_test.cpp
+++ b/modules/platforms/cpp/core-test/src/handle_registry_test.cpp
@@ -36,7 +36,7 @@ struct HandleRegistryTestProbe
}
};
-class HandleRegistryTestEntry : public HandleRegistryEntry
+class HandleRegistryTestEntry
{
public:
HandleRegistryTestEntry(HandleRegistryTestProbe* probe) : probe(probe)
@@ -67,9 +67,9 @@ BOOST_AUTO_TEST_CASE(TestCritical)
HandleRegistryTestEntry* entry1 = new HandleRegistryTestEntry(&probe1);
HandleRegistryTestEntry* entry2 = new HandleRegistryTestEntry(&probe2);
- int64_t hnd0 = reg.AllocateCritical(SharedPointer<HandleRegistryEntry>(entry0));
- int64_t hnd1 = reg.AllocateCritical(SharedPointer<HandleRegistryEntry>(entry1));
- int64_t hnd2 = reg.AllocateCritical(SharedPointer<HandleRegistryEntry>(entry2));
+ int64_t hnd0 = reg.AllocateCritical(SharedPointer<HandleRegistryTestEntry>(entry0));
+ int64_t hnd1 = reg.AllocateCritical(SharedPointer<HandleRegistryTestEntry>(entry1));
+ int64_t hnd2 = reg.AllocateCritical(SharedPointer<HandleRegistryTestEntry>(entry2));
BOOST_REQUIRE(reg.Get(hnd0).Get() == entry0);
BOOST_REQUIRE(!probe0.deleted);
@@ -109,7 +109,7 @@ BOOST_AUTO_TEST_CASE(TestCritical)
HandleRegistryTestProbe closedProbe;
HandleRegistryTestEntry* closedEntry = new HandleRegistryTestEntry(&closedProbe);
- int64_t closedHnd = closedReg.AllocateCritical(SharedPointer<HandleRegistryEntry>(closedEntry));
+ int64_t closedHnd = closedReg.AllocateCritical(SharedPointer<HandleRegistryTestEntry>(closedEntry));
BOOST_REQUIRE(closedHnd == -1);
BOOST_REQUIRE(closedProbe.deleted);
}
@@ -126,9 +126,9 @@ BOOST_AUTO_TEST_CASE(TestNonCritical)
HandleRegistryTestEntry* entry1 = new HandleRegistryTestEntry(&probe1);
HandleRegistryTestEntry* entry2 = new HandleRegistryTestEntry(&probe2);
- int64_t hnd0 = reg.AllocateCritical(SharedPointer<HandleRegistryEntry>(entry0));
- int64_t hnd1 = reg.Allocate(SharedPointer<HandleRegistryEntry>(entry1));
- int64_t hnd2 = reg.Allocate(SharedPointer<HandleRegistryEntry>(entry2));
+ int64_t hnd0 = reg.AllocateCritical(SharedPointer<HandleRegistryTestEntry>(entry0));
+ int64_t hnd1 = reg.Allocate(SharedPointer<HandleRegistryTestEntry>(entry1));
+ int64_t hnd2 = reg.Allocate(SharedPointer<HandleRegistryTestEntry>(entry2));
BOOST_REQUIRE(reg.Get(hnd0).Get() == entry0);
BOOST_REQUIRE(!probe0.deleted);
@@ -168,7 +168,7 @@ BOOST_AUTO_TEST_CASE(TestNonCritical)
HandleRegistryTestProbe closedProbe;
HandleRegistryTestEntry* closedEntry = new HandleRegistryTestEntry(&closedProbe);
- int64_t closedHnd = closedReg.Allocate(SharedPointer<HandleRegistryEntry>(closedEntry));
+ int64_t closedHnd = closedReg.Allocate(SharedPointer<HandleRegistryTestEntry>(closedEntry));
BOOST_REQUIRE(closedHnd == -1);
BOOST_REQUIRE(closedProbe.deleted);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core-test/src/reference_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/reference_test.cpp b/modules/platforms/cpp/core-test/src/reference_test.cpp
new file mode 100644
index 0000000..f5c3e8d
--- /dev/null
+++ b/modules/platforms/cpp/core-test/src/reference_test.cpp
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _MSC_VER
+ #define BOOST_TEST_DYN_LINK
+#endif
+
+#include <memory>
+
+#include <boost/test/unit_test.hpp>
+#include <boost/smart_ptr.hpp>
+#include <boost/interprocess/smart_ptr/unique_ptr.hpp>
+
+#include <ignite/reference.h>
+
+using namespace ignite;
+using namespace boost::unit_test;
+
+class LivenessMarker
+{
+public:
+ LivenessMarker(bool& flag) :
+ flag(flag)
+ {
+ flag = true;
+ }
+
+ LivenessMarker(const LivenessMarker& other) :
+ flag(other.flag)
+ {
+ // No-op.
+ }
+
+ LivenessMarker& operator=(const LivenessMarker& other)
+ {
+ flag = other.flag;
+
+ return *this;
+ }
+
+ ~LivenessMarker()
+ {
+ flag = false;
+ }
+
+private:
+ bool& flag;
+};
+
+class InstanceCounter
+{
+public:
+ InstanceCounter(int& counter) :
+ counter(&counter)
+ {
+ ++(*this->counter);
+ }
+
+ InstanceCounter(const InstanceCounter& other) :
+ counter(other.counter)
+ {
+ ++(*counter);
+ }
+
+ InstanceCounter& operator=(const InstanceCounter& other)
+ {
+ counter = other.counter;
+
+ ++(*counter);
+
+ return *this;
+ }
+
+ ~InstanceCounter()
+ {
+ --(*counter);
+ }
+
+private:
+ int* counter;
+};
+
+
+void TestFunction(Reference<LivenessMarker> ptr)
+{
+ Reference<LivenessMarker> copy(ptr);
+ Reference<LivenessMarker> copy2(ptr);
+}
+
+struct C1
+{
+ int c1;
+};
+
+struct C2
+{
+ int c2;
+};
+
+struct C3 : C1, C2
+{
+ int c3;
+};
+
+void TestFunction1(Reference<C1> c1, int expected)
+{
+ BOOST_CHECK_EQUAL(c1.Get().c1, expected);
+}
+
+void TestFunction2(Reference<C2> c2, int expected)
+{
+ BOOST_CHECK_EQUAL(c2.Get().c2, expected);
+}
+
+void TestFunction3(Reference<C3> c3, int expected)
+{
+ BOOST_CHECK_EQUAL(c3.Get().c3, expected);
+}
+
+void TestFunctionConst1(ConstReference<C1> c1, int expected)
+{
+ BOOST_CHECK_EQUAL(c1.Get().c1, expected);
+}
+
+void TestFunctionConst2(ConstReference<C2> c2, int expected)
+{
+ BOOST_CHECK_EQUAL(c2.Get().c2, expected);
+}
+
+void TestFunctionConst3(ConstReference<C3> c3, int expected)
+{
+ BOOST_CHECK_EQUAL(c3.Get().c3, expected);
+}
+
+BOOST_AUTO_TEST_SUITE(ReferenceTestSuite)
+
+BOOST_AUTO_TEST_CASE(StdSharedPointerTestBefore)
+{
+ bool objAlive = false;
+
+ std::shared_ptr<LivenessMarker> shared = std::make_shared<LivenessMarker>(objAlive);
+
+ BOOST_CHECK(objAlive);
+
+ {
+ Reference<LivenessMarker> smart = MakeReferenceFromSmartPointer(shared);
+
+ BOOST_CHECK(objAlive);
+
+ shared.reset();
+
+ BOOST_CHECK(objAlive);
+ }
+
+ BOOST_CHECK(!objAlive);
+}
+
+BOOST_AUTO_TEST_CASE(StdSharedPointerTestAfter)
+{
+ bool objAlive = false;
+
+ std::shared_ptr<LivenessMarker> shared = std::make_shared<LivenessMarker>(objAlive);
+
+ BOOST_CHECK(objAlive);
+
+ {
+ Reference<LivenessMarker> smart = MakeReferenceFromSmartPointer(shared);
+
+ BOOST_CHECK(objAlive);
+ }
+
+ BOOST_CHECK(objAlive);
+
+ shared.reset();
+
+ BOOST_CHECK(!objAlive);
+}
+
+BOOST_AUTO_TEST_CASE(StdAutoPointerTest)
+{
+ bool objAlive = false;
+
+ std::auto_ptr<LivenessMarker> autop(new LivenessMarker(objAlive));
+
+ BOOST_CHECK(objAlive);
+
+ {
+ Reference<LivenessMarker> smart = MakeReferenceFromSmartPointer(autop);
+
+ BOOST_CHECK(objAlive);
+ }
+
+ BOOST_CHECK(!objAlive);
+}
+
+BOOST_AUTO_TEST_CASE(StdUniquePointerTest)
+{
+ bool objAlive = false;
+
+ std::unique_ptr<LivenessMarker> unique(new LivenessMarker(objAlive));
+
+ BOOST_CHECK(objAlive);
+
+ {
+ Reference<LivenessMarker> smart = MakeReferenceFromSmartPointer(std::move(unique));
+
+ BOOST_CHECK(objAlive);
+ }
+
+ BOOST_CHECK(!objAlive);
+}
+
+BOOST_AUTO_TEST_CASE(BoostSharedPointerTestBefore)
+{
+ bool objAlive = false;
+
+ boost::shared_ptr<LivenessMarker> shared = boost::make_shared<LivenessMarker>(objAlive);
+
+ BOOST_CHECK(objAlive);
+
+ {
+ Reference<LivenessMarker> smart = MakeReferenceFromSmartPointer(shared);
+
+ BOOST_CHECK(objAlive);
+
+ shared.reset();
+
+ BOOST_CHECK(objAlive);
+ }
+
+ BOOST_CHECK(!objAlive);
+}
+
+BOOST_AUTO_TEST_CASE(BoostSharedPointerTestAfter)
+{
+ bool objAlive = false;
+
+ boost::shared_ptr<LivenessMarker> shared = boost::make_shared<LivenessMarker>(objAlive);
+
+ BOOST_CHECK(objAlive);
+
+ {
+ Reference<LivenessMarker> smart = MakeReferenceFromSmartPointer(shared);
+
+ BOOST_CHECK(objAlive);
+ }
+
+ BOOST_CHECK(objAlive);
+
+ shared.reset();
+
+ BOOST_CHECK(!objAlive);
+}
+
+BOOST_AUTO_TEST_CASE(PassingToFunction)
+{
+ bool objAlive = false;
+
+ std::shared_ptr<LivenessMarker> stdShared = std::make_shared<LivenessMarker>(objAlive);
+ std::unique_ptr<LivenessMarker> stdUnique(new LivenessMarker(objAlive));
+ std::auto_ptr<LivenessMarker> stdAuto(new LivenessMarker(objAlive));
+
+ boost::shared_ptr<LivenessMarker> boostShared = boost::make_shared<LivenessMarker>(objAlive);
+
+ TestFunction(MakeReferenceFromSmartPointer(stdShared));
+ TestFunction(MakeReferenceFromSmartPointer(std::move(stdUnique)));
+ TestFunction(MakeReferenceFromSmartPointer(stdAuto));
+
+ TestFunction(MakeReferenceFromSmartPointer(boostShared));
+}
+
+BOOST_AUTO_TEST_CASE(CopyTest)
+{
+ int instances = 0;
+
+ {
+ InstanceCounter counter(instances);
+
+ BOOST_CHECK_EQUAL(instances, 1);
+
+ {
+ Reference<InstanceCounter> copy = MakeReferenceFromCopy(counter);
+
+ BOOST_CHECK_EQUAL(instances, 2);
+ }
+
+ BOOST_CHECK_EQUAL(instances, 1);
+ }
+
+ BOOST_CHECK_EQUAL(instances, 0);
+}
+
+BOOST_AUTO_TEST_CASE(OwningPointerTest)
+{
+ int instances = 0;
+
+ {
+ InstanceCounter *counter = new InstanceCounter(instances);
+
+ BOOST_CHECK_EQUAL(instances, 1);
+
+ {
+ Reference<InstanceCounter> owned = MakeReferenceFromOwningPointer(counter);
+
+ BOOST_CHECK_EQUAL(instances, 1);
+ }
+
+ BOOST_CHECK_EQUAL(instances, 0);
+ }
+
+ BOOST_CHECK_EQUAL(instances, 0);
+}
+
+BOOST_AUTO_TEST_CASE(NonOwningPointerTest1)
+{
+ int instances = 0;
+
+ {
+ InstanceCounter counter(instances);
+
+ BOOST_CHECK_EQUAL(instances, 1);
+
+ {
+ Reference<InstanceCounter> copy = MakeReference(counter);
+
+ BOOST_CHECK_EQUAL(instances, 1);
+ }
+
+ BOOST_CHECK_EQUAL(instances, 1);
+ }
+
+ BOOST_CHECK_EQUAL(instances, 0);
+}
+
+BOOST_AUTO_TEST_CASE(NonOwningPointerTest2)
+{
+ int instances = 0;
+
+ InstanceCounter* counter = new InstanceCounter(instances);
+
+ BOOST_CHECK_EQUAL(instances, 1);
+
+ {
+ Reference<InstanceCounter> copy = MakeReference(*counter);
+
+ BOOST_CHECK_EQUAL(instances, 1);
+
+ delete counter;
+
+ BOOST_CHECK_EQUAL(instances, 0);
+ }
+
+ BOOST_CHECK_EQUAL(instances, 0);
+}
+
+BOOST_AUTO_TEST_CASE(CastTest)
+{
+ C3 testVal;
+
+ testVal.c1 = 1;
+ testVal.c2 = 2;
+ testVal.c3 = 3;
+
+ TestFunction1(MakeReference(testVal), 1);
+ TestFunction2(MakeReference(testVal), 2);
+ TestFunction3(MakeReference(testVal), 3);
+
+ TestFunction1(MakeReferenceFromCopy(testVal), 1);
+ TestFunction2(MakeReferenceFromCopy(testVal), 2);
+ TestFunction3(MakeReferenceFromCopy(testVal), 3);
+}
+
+BOOST_AUTO_TEST_CASE(ConstTest)
+{
+ C3 testVal;
+
+ testVal.c1 = 1;
+ testVal.c2 = 2;
+ testVal.c3 = 3;
+
+ TestFunctionConst1(MakeConstReference(testVal), 1);
+ TestFunctionConst2(MakeConstReference(testVal), 2);
+ TestFunctionConst3(MakeConstReference(testVal), 3);
+
+ TestFunctionConst1(MakeConstReferenceFromCopy(testVal), 1);
+ TestFunctionConst2(MakeConstReferenceFromCopy(testVal), 2);
+ TestFunctionConst3(MakeConstReferenceFromCopy(testVal), 3);
+
+ TestFunctionConst1(MakeReference(testVal), 1);
+ TestFunctionConst2(MakeReference(testVal), 2);
+ TestFunctionConst3(MakeReference(testVal), 3);
+
+ TestFunctionConst1(MakeReferenceFromCopy(testVal), 1);
+ TestFunctionConst2(MakeReferenceFromCopy(testVal), 2);
+ TestFunctionConst3(MakeReferenceFromCopy(testVal), 3);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/Makefile.am b/modules/platforms/cpp/core/Makefile.am
index 97523cf..758e68d 100644
--- a/modules/platforms/cpp/core/Makefile.am
+++ b/modules/platforms/cpp/core/Makefile.am
@@ -59,6 +59,7 @@ libignite_la_SOURCES = \
src/impl/ignite_environment.cpp \
src/impl/binary/binary_type_updater_impl.cpp \
src/impl/handle_registry.cpp \
+ src/impl/cache/query/continuous/continuous_query_handle_impl.cpp \
src/impl/cache/query/query_impl.cpp \
src/impl/cache/cache_impl.cpp \
src/impl/cache/query/query_batch.cpp \
[2/3] ignite git commit: IGNITE-1443: Implemented ContinuousQuery for
C++
Posted by pt...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/cache.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache.h b/modules/platforms/cpp/core/include/ignite/cache/cache.h
index a975be3..54c0f96 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache.h
@@ -37,6 +37,8 @@
#include "ignite/cache/query/query_sql.h"
#include "ignite/cache/query/query_text.h"
#include "ignite/cache/query/query_sql_fields.h"
+#include "ignite/cache/query/continuous/continuous_query_handle.h"
+#include "ignite/cache/query/continuous/continuous_query.h"
#include "ignite/impl/cache/cache_impl.h"
#include "ignite/impl/operations.h"
@@ -1339,6 +1341,106 @@ namespace ignite
}
/**
+ * Start continuous query execution.
+ *
+ * @param qry Continuous query.
+ * @return Continuous query handle.
+ */
+ query::continuous::ContinuousQueryHandle<K, V> QueryContinuous(
+ const query::continuous::ContinuousQuery<K, V>& qry)
+ {
+ IgniteError err;
+
+ query::continuous::ContinuousQueryHandle<K, V> res = QueryContinuous(qry, err);
+
+ IgniteError::ThrowIfNeeded(err);
+
+ return res;
+ }
+
+ /**
+ * Start continuous query execution.
+ *
+ * @param qry Continuous query.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ query::continuous::ContinuousQueryHandle<K, V> QueryContinuous(
+ const query::continuous::ContinuousQuery<K, V>& qry, IgniteError& err)
+ {
+ using namespace impl::cache::query::continuous;
+
+ if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener())
+ {
+ err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "Event listener is not set for ContinuousQuery instance");
+
+ return query::continuous::ContinuousQueryHandle<K, V>();
+ }
+
+ ContinuousQueryHandleImpl* cqImpl;
+ cqImpl = impl.Get()->QueryContinuous(qry.impl, err);
+
+ if (cqImpl)
+ cqImpl->SetQuery(qry.impl);
+
+ return query::continuous::ContinuousQueryHandle<K, V>(cqImpl);
+ }
+
+ /**
+ * Start continuous query execution with the initial query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial query to be executed.
+ * @return Continuous query handle.
+ */
+ template<typename Q>
+ query::continuous::ContinuousQueryHandle<K, V> QueryContinuous(
+ const query::continuous::ContinuousQuery<K, V>& qry,
+ const Q& initialQry)
+ {
+ IgniteError err;
+
+ query::continuous::ContinuousQueryHandle<K, V> res = QueryContinuous(qry, initialQry, err);
+
+ IgniteError::ThrowIfNeeded(err);
+
+ return res;
+ }
+
+ /**
+ * Start continuous query execution with the initial query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial query to be executed.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ template<typename Q>
+ query::continuous::ContinuousQueryHandle<K, V> QueryContinuous(
+ const query::continuous::ContinuousQuery<K, V>& qry,
+ const Q& initialQry, IgniteError& err)
+ {
+ using namespace impl::cache::query::continuous;
+
+ if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener())
+ {
+ err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "Event listener is not set for ContinuousQuery instance");
+
+ return query::continuous::ContinuousQueryHandle<K, V>();
+ }
+
+ ContinuousQueryHandleImpl* cqImpl;
+ cqImpl = impl.Get()->QueryContinuous(qry.impl, initialQry, err);
+
+ if (cqImpl)
+ cqImpl->SetQuery(qry.impl);
+
+ return query::continuous::ContinuousQueryHandle<K, V>(cqImpl);
+ }
+
+ /**
* Check if the instance is valid.
*
* Invalid instance can be returned if some of the previous
@@ -1356,7 +1458,7 @@ namespace ignite
private:
/** Implementation delegate. */
- ignite::common::concurrent::SharedPointer<impl::cache::CacheImpl> impl;
+ common::concurrent::SharedPointer<impl::cache::CacheImpl> impl;
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h b/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h
index c737940..aea5182 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h
@@ -45,7 +45,9 @@ namespace ignite
* Creates instance with both key and value default-constructed.
*/
CacheEntry() :
- key(), val()
+ key(),
+ val(),
+ hasValue(false)
{
// No-op.
}
@@ -57,7 +59,9 @@ namespace ignite
* @param val Value.
*/
CacheEntry(const K& key, const V& val) :
- key(key), val(val)
+ key(key),
+ val(val),
+ hasValue(true)
{
// No-op.
}
@@ -68,7 +72,17 @@ namespace ignite
* @param other Other instance.
*/
CacheEntry(const CacheEntry& other) :
- key(other.key), val(other.val)
+ key(other.key),
+ val(other.val),
+ hasValue(other.hasValue)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntry()
{
// No-op.
}
@@ -84,6 +98,7 @@ namespace ignite
{
key = other.key;
val = other.val;
+ hasValue = other.hasValue;
}
return *this;
@@ -94,7 +109,7 @@ namespace ignite
*
* @return Key.
*/
- K GetKey() const
+ const K& GetKey() const
{
return key;
}
@@ -104,17 +119,30 @@ namespace ignite
*
* @return Value.
*/
- V GetValue() const
+ const V& GetValue() const
{
return val;
}
- private:
+ /**
+ * Check if the value exists.
+ *
+ * @return True, if the value exists.
+ */
+ bool HasValue() const
+ {
+ return hasValue;
+ }
+
+ protected:
/** Key. */
K key;
/** Value. */
V val;
+
+ /** Indicates whether value exists */
+ bool hasValue;
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h
new file mode 100644
index 0000000..14fa185
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::event::CacheEntryEvent class.
+ */
+
+#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT
+#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT
+
+#include <ignite/binary/binary_raw_reader.h>
+#include <ignite/cache/cache_entry.h>
+
+namespace ignite
+{
+ namespace cache
+ {
+ /**
+ * Cache entry event class template.
+ *
+ * Both key and value types should be default-constructable,
+ * copy-constructable and assignable.
+ */
+ template<typename K, typename V>
+ class CacheEntryEvent : public CacheEntry<K, V>
+ {
+ public:
+ /**
+ * Default constructor.
+ *
+ * Creates instance with all fields default-constructed.
+ */
+ CacheEntryEvent() :
+ CacheEntry<K, V>(),
+ oldVal(),
+ hasOldValue(false)
+ {
+ // No-op.
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param other Other instance.
+ */
+ CacheEntryEvent(const CacheEntryEvent<K, V>& other) :
+ CacheEntry<K, V>(other),
+ oldVal(other.oldVal),
+ hasOldValue(other.hasOldValue)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntryEvent()
+ {
+ // No-op.
+ }
+
+ /**
+ * Assignment operator.
+ *
+ * @param other Other instance.
+ * @return *this.
+ */
+ CacheEntryEvent& operator=(const CacheEntryEvent<K, V>& other)
+ {
+ if (this != &other)
+ {
+ CacheEntry<K, V>::operator=(other);
+
+ oldVal = other.oldVal;
+ hasOldValue = other.hasOldValue;
+ }
+
+ return *this;
+ }
+
+ /**
+ * Get old value.
+ *
+ * @return Old value.
+ */
+ const V& GetOldValue() const
+ {
+ return oldVal;
+ }
+
+ /**
+ * Check if the old value exists.
+ *
+ * @return True, if the old value exists.
+ */
+ bool HasOldValue() const
+ {
+ return hasOldValue;
+ }
+
+ /**
+ * Reads cache event using provided raw reader.
+ *
+ * @param reader Reader to use.
+ */
+ void Read(binary::BinaryRawReader& reader)
+ {
+ this->key = reader.ReadObject<K>();
+
+ this->hasOldValue = reader.TryReadObject(this->oldVal);
+ this->hasValue = reader.TryReadObject(this->val);
+ }
+
+ private:
+ /** Old value. */
+ V oldVal;
+
+ /** Indicates whether old value exists */
+ bool hasOldValue;
+ };
+ }
+}
+
+#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h
new file mode 100644
index 0000000..dd8f4a2
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::event::CacheEntryEventListener class.
+ */
+
+#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
+#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
+
+#include <stdint.h>
+
+#include <ignite/cache/event/cache_entry_event.h>
+
+namespace ignite
+{
+ namespace cache
+ {
+ namespace event
+ {
+ /**
+ * Cache entry event listener.
+ */
+ template<typename K, typename V>
+ class CacheEntryEventListener
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ CacheEntryEventListener()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntryEventListener()
+ {
+ // No-op.
+ }
+
+ /**
+ * Event callback.
+ *
+ * @param evts Events.
+ * @param num Events number.
+ */
+ virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num) = 0;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
new file mode 100644
index 0000000..563b11a
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::query::continuous::ContinuousQuery class.
+ */
+
+#ifndef _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY
+#define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY
+
+#include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
+#include <ignite/cache/event/cache_entry_event_listener.h>
+
+namespace ignite
+{
+ namespace cache
+ {
+ // Forward-declaration.
+ template<typename K, typename V>
+ class IGNITE_IMPORT_EXPORT Cache;
+
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query.
+ *
+ * Continuous queries allow to register a remote and a listener
+ * for cache update events. On any update to the related cache
+ * an event is sent to the node that has executed the query and
+ * listener is notified on that node.
+ *
+ * Continuous query can either be executed on the whole topology
+ * or only on local node.
+ *
+ * To execute the query over the cache use method
+ * ignite::cache::Cache::QueryContinuous().
+ */
+ template<typename K, typename V>
+ class ContinuousQuery
+ {
+ friend class Cache<K, V>;
+ public:
+
+ /**
+ * Default value for the buffer size.
+ */
+ enum { DEFAULT_BUFFER_SIZE = 1 };
+
+ /**
+ * Default value for the time interval.
+ */
+ enum { DEFAULT_TIME_INTERVAL = 0 };
+
+ /**
+ * Destructor.
+ */
+ ~ContinuousQuery()
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param lsnr Event listener. Invoked on the node where
+ * continuous query execution has been started.
+ */
+ ContinuousQuery(Reference<event::CacheEntryEventListener<K, V>> lsnr) :
+ impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr))
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param lsnr Event listener Invoked on the node where
+ * continuous query execution has been started.
+ * @param loc Whether query should be executed locally.
+ */
+ ContinuousQuery(Reference<event::CacheEntryEventListener<K, V>> lsnr, bool loc) :
+ impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, loc))
+ {
+ // No-op.
+ }
+
+ /**
+ * Set local flag.
+ *
+ * @param val Value of the flag. If true, query will be
+ * executed only on local node, so only local entries
+ * will be returned as query result.
+ */
+ void SetLocal(bool val)
+ {
+ impl.Get()->SetLocal(val);
+ }
+
+ /**
+ * Get local flag.
+ *
+ * @return Value of the flag. If true, query will be
+ * executed only on local node, so only local entries
+ * will be returned as query result.
+ */
+ bool GetLocal() const
+ {
+ return impl.Get()->GetLocal();
+ }
+
+ /**
+ * Set buffer size.
+ *
+ * When a cache update happens, entry is first
+ * put into a buffer. Entries from buffer will be sent to
+ * the master node only if the buffer is full or time
+ * provided via timeInterval is exceeded.
+ *
+ * @param val Buffer size.
+ */
+ void SetBufferSize(int32_t val)
+ {
+ impl.Get()->SetBufferSize(val);
+ }
+
+ /**
+ * Get buffer size.
+ *
+ * When a cache update happens, entry is first
+ * put into a buffer. Entries from buffer will be sent to
+ * the master node only if the buffer is full or time
+ * provided via timeInterval is exceeded.
+ *
+ * @return Buffer size.
+ */
+ int32_t GetBufferSize() const
+ {
+ return impl.Get()->GetBufferSize();
+ }
+
+ /**
+ * Set time interval.
+ *
+ * When a cache update happens, entry is first put into
+ * a buffer. Entries from buffer are sent to the master node
+ * only if the buffer is full (its size can be changed via
+ * SetBufferSize) or time provided via this method is
+ * exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+ * means that time check is disabled and entries will be
+ * sent only when buffer is full.
+ *
+ * @param val Time interval in miliseconds.
+ */
+ void SetTimeInterval(int64_t val)
+ {
+ impl.Get()->SetTimeInterval(val);
+ }
+
+ /**
+ * Get time interval.
+ *
+ * When a cache update happens, entry is first put into
+ * a buffer. Entries from buffer are sent to the master node
+ * only if the buffer is full (its size can be changed via
+ * SetBufferSize) or time provided via SetTimeInterval
+ * method is exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+ * means that time check is disabled and entries will be
+ * sent only when buffer is full.
+ *
+ * @return Time interval.
+ */
+ int64_t GetTimeInterval() const
+ {
+ return impl.Get()->GetTimeInterval();
+ }
+
+ /**
+ * Set cache entry event listener.
+ *
+ * @param val Cache entry event listener. Invoked on the
+ * node where continuous query execution has been
+ * started.
+ */
+ void SetListener(Reference<event::CacheEntryEventListener<K, V>> lsnr)
+ {
+ impl.Get()->SetListener(val);
+ }
+
+ /**
+ * Get cache entry event listener.
+ *
+ * @return Cache entry event listener.
+ */
+ const event::CacheEntryEventListener<K, V>& GetListener() const
+ {
+ return impl.Get()->GetListener();
+ }
+
+ /**
+ * Get cache entry event listener.
+ *
+ * @return Cache entry event listener.
+ */
+ event::CacheEntryEventListener<K, V>& GetListener()
+ {
+ return impl.Get()->GetListener();
+ }
+
+ private:
+ /** Implementation. */
+ common::concurrent::SharedPointer<impl::cache::query::continuous::ContinuousQueryImpl<K, V>> impl;
+ };
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h
new file mode 100644
index 0000000..bbefbcc
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::query::continuous::ContinuousQueryHandle class.
+ */
+
+#ifndef _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE
+#define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE
+
+#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
+
+namespace ignite
+{
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query handle.
+ */
+ template<typename K, typename V>
+ class ContinuousQueryHandle
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ ContinuousQueryHandle() :
+ impl()
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * Internal method. Should not be used by user.
+ *
+ * @param impl Implementation.
+ */
+ ContinuousQueryHandle(impl::cache::query::continuous::ContinuousQueryHandleImpl* impl) :
+ impl(impl)
+ {
+ // No-op.
+ }
+
+ /**
+ * Gets the cursor for initial query.
+ * Can be called only once, throws IgniteError on consequent
+ * calls.
+ *
+ * @return Initial query cursor.
+ */
+ QueryCursor<K, V> GetInitialQueryCursor()
+ {
+ IgniteError err;
+
+ QueryCursor<K, V> res = GetInitialQueryCursor(err);
+
+ IgniteError::ThrowIfNeeded(err);
+
+ return res;
+ }
+
+ /**
+ * Gets the cursor for initial query.
+ * Can be called only once, results in error on consequent
+ * calls.
+ *
+ * @param err Error.
+ * @return Initial query cursor.
+ */
+ QueryCursor<K, V> GetInitialQueryCursor(IgniteError& err)
+ {
+ impl::cache::query::continuous::ContinuousQueryHandleImpl* impl0 = impl.Get();
+
+ if (impl0)
+ return QueryCursor<K, V>(impl0->GetInitialQueryCursor(err));
+ else
+ {
+ err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "Instance is not usable (did you check for error?).");
+
+ return QueryCursor<K, V>();
+ }
+ }
+
+ /**
+ * Check if the instance is valid.
+ *
+ * Invalid instance can be returned if some of the previous
+ * operations have resulted in a failure. For example invalid
+ * instance can be returned by not-throwing version of method
+ * in case of error. Invalid instances also often can be
+ * created using default constructor.
+ *
+ * @return True if the instance is valid and can be used.
+ */
+ bool IsValid() const
+ {
+ return impl.IsValid();
+ }
+
+ private:
+ typedef impl::cache::query::continuous::ContinuousQueryHandleImpl ContinuousQueryHandleImpl;
+
+ /** Implementation delegate. */
+ common::concurrent::SharedPointer<ContinuousQueryHandleImpl> impl;
+ };
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
index 3e0f177..535e3ec 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
@@ -22,7 +22,10 @@
#include <ignite/cache/query/query_sql.h>
#include <ignite/cache/query/query_text.h>
#include <ignite/cache/query/query_sql_fields.h>
+#include <ignite/cache/query/continuous/continuous_query_handle.h>
#include <ignite/impl/cache/query/query_impl.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
#include <ignite/impl/interop/interop_target.h>
@@ -309,12 +312,59 @@ namespace ignite
* @return Query cursor.
*/
query::QueryCursorImpl* QuerySqlFields(const ignite::cache::query::SqlFieldsQuery& qry, IgniteError* err);
-
+
+ /**
+ * Start continuous query execution.
+ *
+ * @param qry Continuous query.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ IgniteError& err);
+
+ /**
+ * Start continuous query execution with initial query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial query.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ const ignite::cache::query::SqlQuery& initialQry, IgniteError& err);
+
+ /**
+ * Start continuous query execution with initial query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial query.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ const ignite::cache::query::TextQuery& initialQry, IgniteError& err);
+
+ /**
+ * Start continuous query execution with initial query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial query.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ const ignite::cache::query::ScanQuery& initialQry, IgniteError& err);
+
private:
+ IGNITE_NO_COPY_ASSIGNMENT(CacheImpl)
+
/** Name. */
char* name;
-
- IGNITE_NO_COPY_ASSIGNMENT(CacheImpl)
/**
* Internal query execution routine.
@@ -346,11 +396,67 @@ namespace ignite
if (jniErr.code == ignite::java::IGNITE_JNI_ERR_SUCCESS)
return new query::QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef);
else
- return NULL;
+ return 0;
+ }
+
+ /**
+ * Start continuous query execution with the initial query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial query to be executed.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ template<typename T>
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err)
+ {
+ jni::java::JniErrorInfo jniErr;
+
+ common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
+ interop::InteropMemory* mem0 = mem.Get();
+ interop::InteropOutputStream out(mem0);
+ binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+ ignite::binary::BinaryRawWriter rawWriter(&writer);
+
+ const query::continuous::ContinuousQueryImplBase& qry0 = *qry.Get();
+
+ int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry);
+
+ rawWriter.WriteInt64(handle);
+ rawWriter.WriteBool(qry0.GetLocal());
+
+ // Filters are not supported for now.
+ rawWriter.WriteBool(false);
+ rawWriter.WriteNull();
+
+ rawWriter.WriteInt32(qry0.GetBufferSize());
+ rawWriter.WriteInt64(qry0.GetTimeInterval());
+
+ // Autounsubscribe is a filter feature.
+ rawWriter.WriteBool(false);
+
+ // Writing initial query. When there is not initial query writing -1.
+ rawWriter.WriteInt32(typ);
+ if (typ != -1)
+ initialQry.Write(rawWriter);
+
+ out.Synchronize();
+
+ jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(),
+ cmd, mem.Get()->PointerLong(), &jniErr);
+
+ IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err);
+
+ if (jniErr.code == java::IGNITE_JNI_ERR_SUCCESS)
+ return new query::continuous::ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef);
+
+ return 0;
}
};
}
}
}
-#endif
\ No newline at end of file
+#endif
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
new file mode 100644
index 0000000..75504b1
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::cache::query::continuous::ContinuousQueryHandleImpl class.
+ */
+
+#ifndef _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL
+#define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL
+
+#include "ignite/cache/query/query_cursor.h"
+#include "ignite/impl/cache/query/continuous/continuous_query_impl.h"
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query handle implementation.
+ */
+ class IGNITE_IMPORT_EXPORT ContinuousQueryHandleImpl
+ {
+ typedef common::concurrent::SharedPointer<IgniteEnvironment> SP_IgniteEnvironment;
+ typedef common::concurrent::SharedPointer<ContinuousQueryImplBase> SP_ContinuousQueryImplBase;
+ public:
+ /**
+ * Default constructor.
+ *
+ * @param env Environment.
+ * @param javaRef Java reference.
+ */
+ ContinuousQueryHandleImpl(SP_IgniteEnvironment env, int64_t handle, jobject javaRef);
+
+ /**
+ * Destructor.
+ */
+ ~ContinuousQueryHandleImpl();
+
+ /**
+ * Gets the cursor for initial query.
+ * Can be called only once, throws exception on consequent calls.
+ *
+ * @param err Error.
+ * @return Initial query cursor.
+ */
+ QueryCursorImpl* GetInitialQueryCursor(IgniteError& err);
+
+ /**
+ * Set query to keep pointer to.
+ *
+ * @param query Query.
+ */
+ void SetQuery(SP_ContinuousQueryImplBase query);
+
+ private:
+ /** Environment. */
+ SP_IgniteEnvironment env;
+
+ /** Local handle for handle registry. */
+ int64_t handle;
+
+ /** Handle to Java object. */
+ jobject javaRef;
+
+ /** Shared pointer to query. Kept for query to live long enough. */
+ SP_ContinuousQueryImplBase qry;
+
+ /** Mutex. */
+ common::concurrent::CriticalSection mutex;
+
+ /** Cursor extracted. */
+ bool extracted;
+ };
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
new file mode 100644
index 0000000..50ced12
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::cache::query::continuous::ContinuousQueryImpl class.
+ */
+
+#ifndef _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL
+#define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL
+
+#include <stdint.h>
+
+#include <ignite/reference.h>
+
+#include <ignite/cache/event/cache_entry_event_listener.h>
+#include <ignite/binary/binary_raw_reader.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query base implementation class.
+ *
+ * Continuous queries allow to register a remote and a listener
+ * for cache update events. On any update to the related cache
+ * an event is sent to the node that has executed the query and
+ * listener is notified on that node.
+ *
+ * Continuous query can either be executed on the whole topology
+ * or only on local node.
+ *
+ * To execute the query over the cache use method
+ * ignite::cache::Cache::QueryContinuous().
+ */
+ class ContinuousQueryImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ContinuousQueryImplBase()
+ {
+ // No-op.
+ }
+
+ /**
+ * Default value for the buffer size.
+ */
+ enum { DEFAULT_BUFFER_SIZE = 1 };
+
+ /**
+ * Default value for the time interval.
+ */
+ enum { DEFAULT_TIME_INTERVAL = 0 };
+
+ /**
+ * Constructor.
+ *
+ * @param loc Whether query should be executed locally.
+ */
+ explicit ContinuousQueryImplBase(bool loc) :
+ local(loc),
+ bufferSize(DEFAULT_BUFFER_SIZE),
+ timeInterval(DEFAULT_TIME_INTERVAL)
+ {
+ // No-op.
+ }
+
+ /**
+ * Set local flag.
+ *
+ * @param val Value of the flag. If true, query will be
+ * executed only on local node, so only local entries
+ * will be returned as query result.
+ */
+ void SetLocal(bool val)
+ {
+ local = val;
+ }
+
+ /**
+ * Get local flag.
+ *
+ * @return Value of the flag. If true, query will be
+ * executed only on local node, so only local entries
+ * will be returned as query result.
+ */
+ bool GetLocal() const
+ {
+ return local;
+ }
+
+ /**
+ * Set buffer size.
+ *
+ * When a cache update happens, entry is first
+ * put into a buffer. Entries from buffer will be sent to
+ * the master node only if the buffer is full or time
+ * provided via timeInterval is exceeded.
+ *
+ * @param val Buffer size.
+ */
+ void SetBufferSize(int32_t val)
+ {
+ bufferSize = val;
+ }
+
+ /**
+ * Get buffer size.
+ *
+ * When a cache update happens, entry is first
+ * put into a buffer. Entries from buffer will be sent to
+ * the master node only if the buffer is full or time
+ * provided via timeInterval is exceeded.
+ *
+ * @return Buffer size.
+ */
+ int32_t GetBufferSize() const
+ {
+ return bufferSize;
+ }
+
+ /**
+ * Set time interval.
+ *
+ * When a cache update happens, entry is first put into
+ * a buffer. Entries from buffer are sent to the master node
+ * only if the buffer is full (its size can be changed via
+ * SetBufferSize) or time provided via this method is
+ * exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+ * means that time check is disabled and entries will be
+ * sent only when buffer is full.
+ *
+ * @param val Time interval in miliseconds.
+ */
+ void SetTimeInterval(int64_t val)
+ {
+ timeInterval = val;
+ }
+
+ /**
+ * Get time interval.
+ *
+ * When a cache update happens, entry is first put into
+ * a buffer. Entries from buffer are sent to the master node
+ * only if the buffer is full (its size can be changed via
+ * SetBufferSize) or time provided via SetTimeInterval
+ * method is exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+ * means that time check is disabled and entries will be
+ * sent only when buffer is full.
+ *
+ * @return Time interval.
+ */
+ int64_t GetTimeInterval() const
+ {
+ return timeInterval;
+ }
+
+ /**
+ * Callback that reads and processes cache events.
+ *
+ * @param reader Reader to use.
+ */
+ virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader) = 0;
+
+ private:
+ /**
+ * Local flag. When set query will be executed only on local
+ * node, so only local entries will be returned as query
+ * result.
+ *
+ * Default value is false.
+ */
+ bool local;
+
+ /**
+ * Buffer size. When a cache update happens, entry is first
+ * put into a buffer. Entries from buffer will be sent to
+ * the master node only if the buffer is full or time
+ * provided via timeInterval is exceeded.
+ *
+ * Default value is DEFAULT_BUFFER_SIZE.
+ */
+ int32_t bufferSize;
+
+ /**
+ * Time interval in miliseconds. When a cache update
+ * happens, entry is first put into a buffer. Entries from
+ * buffer will be sent to the master node only if the buffer
+ * is full (its size can be changed via SetBufferSize) or
+ * time provided via SetTimeInterval method is exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+ * means that time check is disabled and entries will be
+ * sent only when buffer is full.
+ */
+ int64_t timeInterval;
+ };
+
+ /**
+ * Continuous query implementation.
+ *
+ * Continuous queries allow to register a remote and a listener
+ * for cache update events. On any update to the related cache
+ * an event is sent to the node that has executed the query and
+ * listener is notified on that node.
+ *
+ * Continuous query can either be executed on the whole topology
+ * or only on local node.
+ *
+ * To execute the query over the cache use method
+ * ignite::cache::Cache::QueryContinuous().
+ */
+ template<typename K, typename V>
+ class ContinuousQueryImpl : public ContinuousQueryImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ContinuousQueryImpl()
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param lsnr Event listener. Invoked on the node where
+ * continuous query execution has been started.
+ */
+ ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& lsnr) :
+ ContinuousQueryImplBase(false),
+ lsnr(lsnr)
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param lsnr Event listener Invoked on the node where
+ * continuous query execution has been started.
+ * @param loc Whether query should be executed locally.
+ */
+ ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& lsnr, bool loc) :
+ ContinuousQueryImplBase(loc),
+ lsnr(lsnr)
+ {
+ // No-op.
+ }
+
+ /**
+ * Set cache entry event listener.
+ *
+ * @param val Cache entry event listener. Invoked on the
+ * node where continuous query execution has been
+ * started.
+ */
+ void SetListener(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& val)
+ {
+ lsnr = val;
+ }
+
+ /**
+ * Check if the query has listener.
+ *
+ * @return True if the query has listener.
+ */
+ bool HasListener() const
+ {
+ return !lsnr.IsNull();
+ }
+
+ /**
+ * Get cache entry event listener.
+ *
+ * @return Cache entry event listener.
+ */
+ const ignite::cache::event::CacheEntryEventListener<K, V>& GetListener() const
+ {
+ return lsnr.Get();
+ }
+
+ /**
+ * Get cache entry event listener.
+ *
+ * @return Cache entry event listener.
+ */
+ ignite::cache::event::CacheEntryEventListener<K, V>& GetListener()
+ {
+ return lsnr.Get();
+ }
+
+ /**
+ * Callback that reads and processes cache events.
+ *
+ * @param reader Reader to use.
+ */
+ virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader)
+ {
+ // Number of events.
+ int32_t cnt = reader.ReadInt32();
+
+ // Storing events here.
+ std::vector< ignite::cache::CacheEntryEvent<K, V> > events;
+ events.resize(cnt);
+
+ for (int32_t i = 0; i < cnt; ++i)
+ events[i].Read(reader);
+
+ lsnr.Get().OnEvent(events.data(), cnt);
+ }
+
+ private:
+ /** Cache entry event listener. */
+ Reference<ignite::cache::event::CacheEntryEventListener<K, V>> lsnr;
+ };
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h b/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h
index 107042a..3c4d123 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h
@@ -28,21 +28,9 @@ namespace ignite
namespace impl
{
/**
- * Something what can be registered inside handle registry.
- */
- class IGNITE_IMPORT_EXPORT HandleRegistryEntry
- {
- public:
- /**
- * Destructor.
- */
- virtual ~HandleRegistryEntry();
- };
-
- /**
* Handle registry segment containing thread-specific data for slow-path access.
*/
- class IGNITE_IMPORT_EXPORT HandleRegistrySegment
+ class HandleRegistrySegment
{
public:
/**
@@ -61,7 +49,7 @@ namespace ignite
* @param hnd Handle.
* @return Associated entry or NULL if it doesn't exists.
*/
- ignite::common::concurrent::SharedPointer<HandleRegistryEntry> Get(int64_t hnd);
+ common::concurrent::SharedPointer<void> Get(int64_t hnd);
/**
* Put entry into segment.
@@ -69,14 +57,14 @@ namespace ignite
* @param hnd Handle.
* @param entry Associated entry (cannot be NULL).
*/
- void Put(int64_t hnd, const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& entry);
+ void Put(int64_t hnd, const common::concurrent::SharedPointer<void>& entry);
/**
* Remove entry from the segment.
*
* @param hnd Handle.
*/
- void Remove(int64_t hnd);
+ void Remove(int64_t hnd);
/**
* Clear all entries from the segment.
@@ -84,10 +72,10 @@ namespace ignite
void Clear();
private:
/** Map with data. */
- std::map<int64_t, ignite::common::concurrent::SharedPointer<HandleRegistryEntry>>* map;
+ std::map<int64_t, common::concurrent::SharedPointer<void>> map;
/** Mutex. */
- ignite::common::concurrent::CriticalSection* mux;
+ common::concurrent::CriticalSection mux;
IGNITE_NO_COPY_ASSIGNMENT(HandleRegistrySegment);
};
@@ -102,7 +90,7 @@ namespace ignite
* Constructor.
*
* @param fastCap Fast-path capacity.
- * @param segmentCnt Slow-path segments count.
+ * @param slowSegmentCnt Slow-path segments count.
*/
HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt);
@@ -117,7 +105,7 @@ namespace ignite
* @param target Target.
* @return Handle.
*/
- int64_t Allocate(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target);
+ int64_t Allocate(const common::concurrent::SharedPointer<void>& target);
/**
* Allocate handle in critical mode.
@@ -125,7 +113,7 @@ namespace ignite
* @param target Target.
* @return Handle.
*/
- int64_t AllocateCritical(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target);
+ int64_t AllocateCritical(const common::concurrent::SharedPointer<void>& target);
/**
* Allocate handle in safe mode.
@@ -133,7 +121,7 @@ namespace ignite
* @param target Target.
* @return Handle.
*/
- int64_t AllocateSafe(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target);
+ int64_t AllocateSafe(const common::concurrent::SharedPointer<void>& target);
/**
* Allocate handle in critical and safe modes.
@@ -141,7 +129,7 @@ namespace ignite
* @param target Target.
* @return Handle.
*/
- int64_t AllocateCriticalSafe(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target);
+ int64_t AllocateCriticalSafe(const common::concurrent::SharedPointer<void>& target);
/**
* Release handle.
@@ -154,35 +142,36 @@ namespace ignite
* Get target.
*
* @param hnd Handle.
- * @param Target.
+ * @return Target.
*/
- ignite::common::concurrent::SharedPointer<HandleRegistryEntry> Get(int64_t hnd);
+ common::concurrent::SharedPointer<void> Get(int64_t hnd);
/**
* Close the registry.
*/
void Close();
+
private:
/** Fast-path container capacity. */
- int32_t fastCap;
+ int32_t fastCap;
/** Fast-path counter. */
- int32_t fastCtr;
+ int32_t fastCtr;
/** Fast-path container. */
- ignite::common::concurrent::SharedPointer<HandleRegistryEntry>* fast;
+ common::concurrent::SharedPointer<void>* fast;
/** Amount of slow-path segments. */
- int32_t slowSegmentCnt;
+ int32_t slowSegmentCnt;
/** Slow-path counter. */
- int64_t slowCtr;
-
+ int64_t slowCtr;
+
/** Slow-path segments. */
- HandleRegistrySegment** slow;
+ HandleRegistrySegment** slow;
/** Close flag. */
- int32_t closed;
+ int32_t closed;
IGNITE_NO_COPY_ASSIGNMENT(HandleRegistry);
@@ -190,11 +179,10 @@ namespace ignite
* Internal allocation routine.
*
* @param target Target.
- * @param Critical mode flag.
- * @param Safe mode flag.
+ * @param critical mode flag.
+ * @param safe mode flag.
*/
- int64_t Allocate0(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target,
- bool critical, bool safe);
+ int64_t Allocate0(const common::concurrent::SharedPointer<void>& target, bool critical, bool safe);
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
index fb6f657..2b2a117 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
@@ -20,14 +20,15 @@
#include <ignite/common/concurrent.h>
#include <ignite/jni/java.h>
+#include <ignite/jni/utils.h>
#include "ignite/impl/interop/interop_memory.h"
#include "ignite/impl/binary/binary_type_manager.h"
-#include "ignite/jni/utils.h"
+#include "ignite/impl/handle_registry.h"
-namespace ignite
+namespace ignite
{
- namespace impl
+ namespace impl
{
/**
* Defines environment in which Ignite operates.
@@ -41,6 +42,16 @@ namespace ignite
enum { DEFAULT_ALLOCATION_SIZE = 1024 };
/**
+ * Default fast path handle registry containers capasity.
+ */
+ enum { DEFAULT_FAST_PATH_CONTAINERS_CAP = 1024 };
+
+ /**
+ * Default slow path handle registry containers capasity.
+ */
+ enum { DEFAULT_SLOW_PATH_CONTAINERS_CAP = 16 };
+
+ /**
* Default constructor.
*/
IgniteEnvironment();
@@ -78,6 +89,13 @@ namespace ignite
void OnStartCallback(long long memPtr, jobject proc);
/**
+ * Continuous query listener apply callback.
+ *
+ * @param mem Memory with data.
+ */
+ void OnContinuousQueryListenerApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+ /**
* Get name of Ignite instance.
*
* @return Name.
@@ -133,6 +151,13 @@ namespace ignite
*/
void ProcessorReleaseStart();
+ /**
+ * Get handle registry.
+ *
+ * @return Handle registry.
+ */
+ HandleRegistry& GetHandleRegistry();
+
private:
/** Context to access Java. */
common::concurrent::SharedPointer<jni::java::JniContext> ctx;
@@ -152,6 +177,9 @@ namespace ignite
/** Type updater. */
binary::BinaryTypeUpdater* metaUpdater;
+ /** Handle registry. */
+ HandleRegistry registry;
+
IGNITE_NO_COPY_ASSIGNMENT(IgniteEnvironment);
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/namespaces.dox
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/namespaces.dox b/modules/platforms/cpp/core/namespaces.dox
index 0f5f11f..49379e6 100644
--- a/modules/platforms/cpp/core/namespaces.dox
+++ b/modules/platforms/cpp/core/namespaces.dox
@@ -22,40 +22,54 @@
* computing and transacting on large-scale data sets in real-time, orders of magnitude faster than possible with
* traditional disk-based or flash-based technologies.
*/
-
+
/**
* Apache %Ignite API.
*/
namespace ignite
{
- /**
- * %Ignite Binary Objects API.
- */
- namespace binary
- {
- // Empty.
- }
+ /**
+ * %Ignite Binary Objects API.
+ */
+ namespace binary
+ {
+ // Empty.
+ }
+
+ /**
+ * %Ignite %Transaction API.
+ */
+ namespace transactions
+ {
+ // Empty.
+ }
- /**
- * %Ignite %Transaction API.
- */
- namespace transactions
- {
- // Empty.
- }
-
- /**
- * %Ignite %Cache API.
- */
- namespace cache
- {
- /**
- * Contains APIs for creating and executing cache queries.
- */
- namespace query
- {
- // Empty.
- }
- }
+ /**
+ * %Ignite %Cache API.
+ */
+ namespace cache
+ {
+ /**
+ * Contains APIs for cache events.
+ */
+ namespace event
+ {
+ // Empty.
+ }
+
+ /**
+ * Contains APIs for creating and executing cache queries.
+ */
+ namespace query
+ {
+ /**
+ * Contains APIs for continuous queries.
+ */
+ namespace continuous
+ {
+ // Empty.
+ }
+ }
+ }
}
-
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj
index 6320323..89a2dff 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -193,6 +193,10 @@
<ClInclude Include="..\..\include\ignite\cache\cache.h" />
<ClInclude Include="..\..\include\ignite\cache\cache_entry.h" />
<ClInclude Include="..\..\include\ignite\cache\cache_peek_mode.h" />
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h" />
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h" />
+ <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h" />
+ <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h" />
<ClInclude Include="..\..\include\ignite\cache\query\query.h" />
<ClInclude Include="..\..\include\ignite\cache\query\query_argument.h" />
<ClInclude Include="..\..\include\ignite\cache\query\query_cursor.h" />
@@ -208,6 +212,8 @@
<ClInclude Include="..\..\include\ignite\impl\binary\binary_type_updater_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h" />
+ <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h" />
+ <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_fields_row_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\ignite_environment.h" />
@@ -229,6 +235,7 @@
<ClCompile Include="..\..\src\impl\binary\binary_type_updater_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\cache_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\query\query_batch.cpp" />
+ <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\query\query_impl.cpp" />
<ClCompile Include="..\..\src\impl\ignite_environment.cpp" />
<ClCompile Include="..\..\src\impl\ignite_impl.cpp" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index c5fb532..9cb5f78 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -46,6 +46,9 @@
<ClCompile Include="..\..\src\impl\cache\query\query_batch.cpp">
<Filter>Code\impl\cache\query</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp">
+ <Filter>Code\impl\cache\query\continuous</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h">
@@ -144,6 +147,24 @@
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h">
<Filter>Code\impl\cache\query</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h">
+ <Filter>Code\cache\query\continuous</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h">
+ <Filter>Code\cache\query\continuous</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h">
+ <Filter>Code\impl\cache\query\continuous</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h">
+ <Filter>Code\cache\event</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h">
+ <Filter>Code\cache\event</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h">
+ <Filter>Code\impl\cache\query\continuous</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Code">
@@ -176,5 +197,14 @@
<Filter Include="Code\transactions">
<UniqueIdentifier>{146fe661-0ad3-4d51-83a3-fce8a897e84d}</UniqueIdentifier>
</Filter>
+ <Filter Include="Code\cache\query\continuous">
+ <UniqueIdentifier>{2056dfc8-4ced-4658-b2b7-a8c81a7ef797}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="Code\impl\cache\query\continuous">
+ <UniqueIdentifier>{d633f819-7b30-4e26-81ec-f708d1c1ff8e}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="Code\cache\event">
+ <UniqueIdentifier>{e03c3690-ff22-4c78-83a0-b77cebb7f980}</UniqueIdentifier>
+ </Filter>
</ItemGroup>
</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
index 0630921..ad69d45 100644
--- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
@@ -26,9 +26,11 @@ using namespace ignite::java;
using namespace ignite::common;
using namespace ignite::cache;
using namespace ignite::cache::query;
+using namespace ignite::cache::query::continuous;
using namespace ignite::impl;
using namespace ignite::impl::binary;
using namespace ignite::impl::cache::query;
+using namespace ignite::impl::cache::query::continuous;
using namespace ignite::impl::interop;
using namespace ignite::binary;
@@ -89,6 +91,9 @@ namespace ignite
/** Operation: PutIfAbsent. */
const int32_t OP_PUT_IF_ABSENT = 28;
+ /** Operation: CONTINUOUS query. */
+ const int32_t OP_QRY_CONTINUOUS = 29;
+
/** Operation: SCAN query. */
const int32_t OP_QRY_SCAN = 30;
@@ -301,6 +306,32 @@ namespace ignite
{
return QueryInternal(qry, OP_QRY_SQL_FIELDS, err);
}
+
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ const SqlQuery& initialQry, IgniteError& err)
+ {
+ return QueryContinuous(qry, initialQry, OP_QRY_SQL, OP_QRY_CONTINUOUS, err);
+ }
+
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ const TextQuery& initialQry, IgniteError& err)
+ {
+ return QueryContinuous(qry, initialQry, OP_QRY_TEXT, OP_QRY_CONTINUOUS, err);
+ }
+
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ const ScanQuery& initialQry, IgniteError& err)
+ {
+ return QueryContinuous(qry, initialQry, OP_QRY_SCAN, OP_QRY_CONTINUOUS, err);
+ }
+
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ IgniteError& err)
+ {
+ struct { void Write(BinaryRawWriter&) const { }} dummy;
+
+ return QueryContinuous(qry, dummy, -1, OP_QRY_CONTINUOUS, err);
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
new file mode 100644
index 0000000..04e64c9
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/cache/query/continuous/continuous_query_handle_impl.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::jni::java;
+using namespace ignite::java;
+using namespace ignite::impl::interop;
+using namespace ignite::impl::binary;
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ enum Command
+ {
+ GET_INITIAL_QUERY = 0,
+
+ CLOSE = 1
+ };
+
+ ContinuousQueryHandleImpl::ContinuousQueryHandleImpl(SP_IgniteEnvironment env, int64_t handle, jobject javaRef) :
+ env(env),
+ handle(handle),
+ javaRef(javaRef),
+ mutex(),
+ extracted(false)
+ {
+ // No-op.
+ }
+
+ ContinuousQueryHandleImpl::~ContinuousQueryHandleImpl()
+ {
+ env.Get()->Context()->TargetInLongOutLong(javaRef, CLOSE, 0);
+
+ JniContext::Release(javaRef);
+
+ env.Get()->GetHandleRegistry().Release(handle);
+ }
+
+ QueryCursorImpl* ContinuousQueryHandleImpl::GetInitialQueryCursor(IgniteError& err)
+ {
+ CsLockGuard guard(mutex);
+
+ if (extracted)
+ {
+ err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "GetInitialQueryCursor() can be called only once.");
+
+ return 0;
+ }
+
+ JniErrorInfo jniErr;
+
+ jobject res = env.Get()->Context()->TargetOutObject(javaRef, GET_INITIAL_QUERY, &jniErr);
+
+ IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err);
+
+ if (jniErr.code != IGNITE_JNI_ERR_SUCCESS)
+ return 0;
+
+ extracted = true;
+
+ return new QueryCursorImpl(env, res);
+ }
+
+ void ContinuousQueryHandleImpl::SetQuery(SP_ContinuousQueryImplBase query)
+ {
+ qry = query;
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/handle_registry.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/handle_registry.cpp b/modules/platforms/cpp/core/src/impl/handle_registry.cpp
index c447faa..069e996 100644
--- a/modules/platforms/cpp/core/src/impl/handle_registry.cpp
+++ b/modules/platforms/cpp/core/src/impl/handle_registry.cpp
@@ -23,83 +23,67 @@ namespace ignite
{
namespace impl
{
- HandleRegistryEntry::~HandleRegistryEntry()
- {
- // No-op.
- }
-
HandleRegistrySegment::HandleRegistrySegment() :
- map(new std::map<int64_t, SharedPointer<HandleRegistryEntry>>()), mux(new CriticalSection())
+ map(),
+ mux()
{
// No-op.
}
HandleRegistrySegment::~HandleRegistrySegment()
{
- delete map;
- delete mux;
+ // No-op.
}
- SharedPointer<HandleRegistryEntry> HandleRegistrySegment::Get(int64_t hnd)
+ SharedPointer<void> HandleRegistrySegment::Get(int64_t hnd)
{
- mux->Enter();
+ typedef std::map<int64_t, SharedPointer<void>> Map;
- SharedPointer<HandleRegistryEntry> res = (*map)[hnd];
+ CsLockGuard guard(mux);
- mux->Leave();
+ Map::const_iterator it = map.find(hnd);
+ if (it == map.end())
+ return SharedPointer<void>();
- return res;
+ return it->second;
}
- void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<HandleRegistryEntry>& entry)
+ void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<void>& entry)
{
- mux->Enter();
-
- (*map)[hnd] = entry;
+ CsLockGuard guard(mux);
- mux->Leave();
+ map[hnd] = entry;
}
void HandleRegistrySegment::Remove(int64_t hnd)
{
- mux->Enter();
+ CsLockGuard guard(mux);
- map->erase(hnd);
-
- mux->Leave();
+ map.erase(hnd);
}
void HandleRegistrySegment::Clear()
{
- mux->Enter();
-
- map->erase(map->begin(), map->end());
+ CsLockGuard guard(mux);
- mux->Leave();
+ map.clear();
}
- HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt)
+ HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt) :
+ fastCap(fastCap),
+ fastCtr(0),
+ fast(new SharedPointer<void>[fastCap]),
+ slowSegmentCnt(slowSegmentCnt),
+ slowCtr(fastCap),
+ slow(new HandleRegistrySegment*[slowSegmentCnt]),
+ closed(0)
{
- this->fastCap = fastCap;
+ for (int32_t i = 0; i < fastCap; i++)
+ fast[i] = SharedPointer<void>();
- fastCtr = 0;
-
- fast = new SharedPointer<HandleRegistryEntry>[fastCap];
-
- for (int i = 0; i < fastCap; i++)
- fast[i] = SharedPointer<HandleRegistryEntry>();
-
- this->slowSegmentCnt = slowSegmentCnt;
-
- slowCtr = fastCap;
-
- slow = new HandleRegistrySegment*[slowSegmentCnt];
-
- for (int i = 0; i < slowSegmentCnt; i++)
+ for (int32_t i = 0; i < slowSegmentCnt; i++)
slow[i] = new HandleRegistrySegment();
- closed = 0;
-
Memory::Fence();
}
@@ -115,22 +99,22 @@ namespace ignite
delete[] slow;
}
- int64_t HandleRegistry::Allocate(const SharedPointer<HandleRegistryEntry>& target)
+ int64_t HandleRegistry::Allocate(const SharedPointer<void>& target)
{
return Allocate0(target, false, false);
}
- int64_t HandleRegistry::AllocateCritical(const SharedPointer<HandleRegistryEntry>& target)
+ int64_t HandleRegistry::AllocateCritical(const SharedPointer<void>& target)
{
return Allocate0(target, true, false);
}
- int64_t HandleRegistry::AllocateSafe(const SharedPointer<HandleRegistryEntry>& target)
+ int64_t HandleRegistry::AllocateSafe(const SharedPointer<void>& target)
{
return Allocate0(target, false, true);
}
- int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<HandleRegistryEntry>& target)
+ int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<void>& target)
{
return Allocate0(target, true, true);
}
@@ -138,10 +122,10 @@ namespace ignite
void HandleRegistry::Release(int64_t hnd)
{
if (hnd < fastCap)
- fast[static_cast<int32_t>(hnd)] = SharedPointer<HandleRegistryEntry>();
+ fast[static_cast<int32_t>(hnd)] = SharedPointer<void>();
else
{
- HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt);
+ HandleRegistrySegment* segment = slow[hnd % slowSegmentCnt];
segment->Remove(hnd);
}
@@ -149,7 +133,7 @@ namespace ignite
Memory::Fence();
}
- SharedPointer<HandleRegistryEntry> HandleRegistry::Get(int64_t hnd)
+ SharedPointer<void> HandleRegistry::Get(int64_t hnd)
{
Memory::Fence();
@@ -157,7 +141,7 @@ namespace ignite
return fast[static_cast<int32_t>(hnd)];
else
{
- HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt);
+ HandleRegistrySegment* segment = slow[hnd % slowSegmentCnt];
return segment->Get(hnd);
}
@@ -168,16 +152,16 @@ namespace ignite
if (Atomics::CompareAndSet32(&closed, 0, 1))
{
// Cleanup fast-path handles.
- for (int i = 0; i < fastCap; i++)
- fast[i] = SharedPointer<HandleRegistryEntry>();
+ for (int32_t i = 0; i < fastCap; i++)
+ fast[i] = SharedPointer<void>();
// Cleanup slow-path handles.
- for (int i = 0; i < slowSegmentCnt; i++)
- (*(slow + i))->Clear();
+ for (int32_t i = 0; i < slowSegmentCnt; i++)
+ slow[i]->Clear();
}
}
- int64_t HandleRegistry::Allocate0(const SharedPointer<HandleRegistryEntry>& target, bool critical, bool safe)
+ int64_t HandleRegistry::Allocate0(const SharedPointer<void>& target, bool critical, bool safe)
{
// Check closed state.
Memory::Fence();
@@ -201,7 +185,7 @@ namespace ignite
if (safe && closed == 1)
{
- fast[fastIdx] = SharedPointer<HandleRegistryEntry>();
+ fast[fastIdx] = SharedPointer<void>();
return -1;
}
@@ -214,7 +198,7 @@ namespace ignite
// Either allocating on slow-path, or fast-path can no longer accomodate more entries.
int64_t slowIdx = Atomics::IncrementAndGet64(&slowCtr) - 1;
- HandleRegistrySegment* segment = *(slow + slowIdx % slowSegmentCnt);
+ HandleRegistrySegment* segment = slow[slowIdx % slowSegmentCnt];
segment->Put(slowIdx, target);