You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/12/20 16:38:59 UTC
[2/3] ignite git commit: IGNITE-1443: Implemented ContinuousQuery for
C++
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/cache.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache.h b/modules/platforms/cpp/core/include/ignite/cache/cache.h
index a975be3..54c0f96 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache.h
@@ -37,6 +37,8 @@
#include "ignite/cache/query/query_sql.h"
#include "ignite/cache/query/query_text.h"
#include "ignite/cache/query/query_sql_fields.h"
+#include "ignite/cache/query/continuous/continuous_query_handle.h"
+#include "ignite/cache/query/continuous/continuous_query.h"
#include "ignite/impl/cache/cache_impl.h"
#include "ignite/impl/operations.h"
@@ -1339,6 +1341,106 @@ namespace ignite
}
/**
+ * Start continuous query execution.
+ *
+ * @param qry Continuous query.
+ * @return Continuous query handle.
+ */
+ query::continuous::ContinuousQueryHandle<K, V> QueryContinuous(
+     const query::continuous::ContinuousQuery<K, V>& qry)
+ {
+     // Run the error-reporting overload and capture its status locally.
+     IgniteError status;
+     query::continuous::ContinuousQueryHandle<K, V> handle = QueryContinuous(qry, status);
+     // ThrowIfNeeded() raises the captured status when it signals a failure.
+     IgniteError::ThrowIfNeeded(status);
+
+     return handle;
+ }
+
+ /**
+ * Start continuous query execution.
+ *
+ * @param qry Continuous query.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ query::continuous::ContinuousQueryHandle<K, V> QueryContinuous(
+     const query::continuous::ContinuousQuery<K, V>& qry, IgniteError& err)
+ {
+     using namespace impl::cache::query::continuous;
+
+     // The query must carry a valid implementation with a listener attached.
+     if (!(qry.impl.IsValid() && qry.impl.Get()->HasListener()))
+     {
+         err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+             "Event listener is not set for ContinuousQuery instance");
+
+         return query::continuous::ContinuousQueryHandle<K, V>();
+     }
+
+     ContinuousQueryHandleImpl* hnd = impl.Get()->QueryContinuous(qry.impl, err);
+     // A null result is wrapped as is; otherwise the query is handed to the handle.
+     if (hnd)
+         hnd->SetQuery(qry.impl);
+
+     return query::continuous::ContinuousQueryHandle<K, V>(hnd);
+ }
+
+ /**
+ * Start continuous query execution with the initial query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial query to be executed.
+ * @return Continuous query handle.
+ */
+ template<typename Q>
+ query::continuous::ContinuousQueryHandle<K, V> QueryContinuous(
+     const query::continuous::ContinuousQuery<K, V>& qry,
+     const Q& initialQry)
+ {
+     // Run the error-reporting overload and capture its status locally.
+     IgniteError status;
+     query::continuous::ContinuousQueryHandle<K, V> handle = QueryContinuous(qry, initialQry, status);
+     // ThrowIfNeeded() raises the captured status when it signals a failure.
+     IgniteError::ThrowIfNeeded(status);
+
+     return handle;
+ }
+
+ /**
+ * Start continuous query execution with the initial query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial query to be executed.
+ * @param err Error.
+ * @return Continuous query handle.
+ */
+ template<typename Q>
+ query::continuous::ContinuousQueryHandle<K, V> QueryContinuous(
+     const query::continuous::ContinuousQuery<K, V>& qry,
+     const Q& initialQry, IgniteError& err)
+ {
+     using namespace impl::cache::query::continuous;
+
+     // The query must carry a valid implementation with a listener attached.
+     if (!(qry.impl.IsValid() && qry.impl.Get()->HasListener()))
+     {
+         err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+             "Event listener is not set for ContinuousQuery instance");
+
+         return query::continuous::ContinuousQueryHandle<K, V>();
+     }
+
+     ContinuousQueryHandleImpl* hnd = impl.Get()->QueryContinuous(qry.impl, initialQry, err);
+     // A null result is wrapped as is; otherwise the query is handed to the handle.
+     if (hnd)
+         hnd->SetQuery(qry.impl);
+
+     return query::continuous::ContinuousQueryHandle<K, V>(hnd);
+ }
+
+ /**
* Check if the instance is valid.
*
* Invalid instance can be returned if some of the previous
@@ -1356,7 +1458,7 @@ namespace ignite
private:
/** Implementation delegate. */
- ignite::common::concurrent::SharedPointer<impl::cache::CacheImpl> impl;
+ common::concurrent::SharedPointer<impl::cache::CacheImpl> impl;
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h b/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h
index c737940..aea5182 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h
@@ -45,7 +45,9 @@ namespace ignite
* Creates instance with both key and value default-constructed.
*/
CacheEntry() :
- key(), val()
+ key(),
+ val(),
+ hasValue(false)
{
// No-op.
}
@@ -57,7 +59,9 @@ namespace ignite
* @param val Value.
*/
CacheEntry(const K& key, const V& val) :
- key(key), val(val)
+ key(key),
+ val(val),
+ hasValue(true)
{
// No-op.
}
@@ -68,7 +72,17 @@ namespace ignite
* @param other Other instance.
*/
CacheEntry(const CacheEntry& other) :
- key(other.key), val(other.val)
+ key(other.key),
+ val(other.val),
+ hasValue(other.hasValue)
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ virtual ~CacheEntry()
{
// No-op.
}
@@ -84,6 +98,7 @@ namespace ignite
{
key = other.key;
val = other.val;
+ hasValue = other.hasValue;
}
return *this;
@@ -94,7 +109,7 @@ namespace ignite
*
* @return Key.
*/
- K GetKey() const
+ const K& GetKey() const
{
return key;
}
@@ -104,17 +119,30 @@ namespace ignite
*
* @return Value.
*/
- V GetValue() const
+ const V& GetValue() const
{
return val;
}
- private:
+ /**
+ * Check if the value exists.
+ *
+ * @return True, if the value exists.
+ */
+ bool HasValue() const
+ {
+ return hasValue;
+ }
+
+ protected:
/** Key. */
K key;
/** Value. */
V val;
+
+ /** Indicates whether value exists */
+ bool hasValue;
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h
new file mode 100644
index 0000000..14fa185
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::CacheEntryEvent class.
+ */
+
+#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT
+#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT
+
+#include <ignite/binary/binary_raw_reader.h>
+#include <ignite/cache/cache_entry.h>
+
+namespace ignite
+{
+ namespace cache
+ {
+ /**
+  * Cache entry event class template.
+  *
+  * Key and value types are expected to be default-constructible,
+  * copy-constructible and assignable.
+  */
+ template<typename K, typename V>
+ class CacheEntryEvent : public CacheEntry<K, V>
+ {
+ public:
+     /**
+      * Default constructor.
+      *
+      * Default-constructs the old value and marks it as absent.
+      */
+     CacheEntryEvent() :
+         CacheEntry<K, V>(),
+         oldVal(),
+         hasOldValue(false)
+     {
+         // Nothing to do.
+     }
+
+     /**
+      * Copy constructor.
+      *
+      * @param other Instance to copy from.
+      */
+     CacheEntryEvent(const CacheEntryEvent& other) :
+         CacheEntry<K, V>(other),
+         oldVal(other.oldVal),
+         hasOldValue(other.hasOldValue)
+     {
+         // Nothing to do.
+     }
+
+     /**
+      * Destructor.
+      */
+     virtual ~CacheEntryEvent()
+     {
+         // Nothing to do.
+     }
+
+     /**
+      * Assignment operator.
+      *
+      * @param other Instance to copy from.
+      * @return Reference to this instance.
+      */
+     CacheEntryEvent& operator=(const CacheEntryEvent& other)
+     {
+         // Self-assignment leaves the instance untouched.
+         if (this == &other)
+             return *this;
+
+         CacheEntry<K, V>::operator=(other);
+         oldVal = other.oldVal;
+         hasOldValue = other.hasOldValue;
+
+         return *this;
+     }
+
+     /**
+      * Get old value.
+      *
+      * @return Old value.
+      */
+     const V& GetOldValue() const
+     {
+         return oldVal;
+     }
+
+     /**
+      * Check if the old value exists.
+      *
+      * @return True, if the old value exists.
+      */
+     bool HasOldValue() const
+     {
+         return hasOldValue;
+     }
+
+     /**
+      * Reads the key, the old value and the value, in that order.
+      *
+      * @param reader Raw reader to read the event from.
+      */
+     void Read(binary::BinaryRawReader& reader)
+     {
+         // Presence flags are taken from the results of TryReadObject().
+         CacheEntry<K, V>::key = reader.ReadObject<K>();
+         hasOldValue = reader.TryReadObject(oldVal);
+         CacheEntry<K, V>::hasValue = reader.TryReadObject(CacheEntry<K, V>::val);
+     }
+
+ private:
+     /** Old value. */
+     V oldVal;
+
+     /** Indicates whether old value exists */
+     bool hasOldValue;
+ };
+ }
+}
+
+#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h
new file mode 100644
index 0000000..dd8f4a2
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::event::CacheEntryEventListener class.
+ */
+
+#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
+#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
+
+#include <stdint.h>
+
+#include <ignite/cache/event/cache_entry_event.h>
+
+namespace ignite
+{
+ namespace cache
+ {
+ namespace event
+ {
+ /**
+ * Abstract listener interface for cache entry events.
+ */
+ template<typename K, typename V>
+ class CacheEntryEventListener
+ {
+ public:
+ /**
+ * Default constructor.
+ */
+ CacheEntryEventListener()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor. Virtual, so that the class can serve as a base.
+ */
+ virtual ~CacheEntryEventListener()
+ {
+ // No-op.
+ }
+
+ /**
+ * Event callback to be implemented by derived classes.
+ *
+ * @param evts Pointer to the events.
+ * @param num Number of events.
+ */
+ virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num) = 0;
+ };
+ }
+ }
+}
+
+#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
new file mode 100644
index 0000000..563b11a
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::query::continuous::ContinuousQuery class.
+ */
+
+#ifndef _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY
+#define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY
+
+#include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
+#include <ignite/cache/event/cache_entry_event_listener.h>
+
+namespace ignite
+{
+ namespace cache
+ {
+ // Forward-declaration.
+ template<typename K, typename V>
+ class IGNITE_IMPORT_EXPORT Cache;
+
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+  * Continuous query.
+  *
+  * Continuous queries allow registering a listener for cache
+  * update events. On any update to the related cache an event
+  * is sent to the node that has executed the query and the
+  * listener is notified on that node.
+  *
+  * Continuous query can either be executed on the whole topology
+  * or only on local node.
+  *
+  * To execute the query over the cache use method
+  * ignite::cache::Cache::QueryContinuous().
+  */
+ template<typename K, typename V>
+ class ContinuousQuery
+ {
+     friend class Cache<K, V>;
+ public:
+
+     /**
+      * Default value for the buffer size.
+      */
+     enum { DEFAULT_BUFFER_SIZE = 1 };
+
+     /**
+      * Default value for the time interval.
+      */
+     enum { DEFAULT_TIME_INTERVAL = 0 };
+
+     /**
+      * Destructor.
+      */
+     ~ContinuousQuery()
+     {
+         // No-op.
+     }
+
+     /**
+      * Constructor.
+      *
+      * @param lsnr Event listener. Invoked on the node where
+      *     continuous query execution has been started.
+      */
+     ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr) :
+         impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr))
+     {
+         // No-op.
+     }
+
+     /**
+      * Constructor.
+      *
+      * @param lsnr Event listener. Invoked on the node where
+      *     continuous query execution has been started.
+      * @param loc Whether query should be executed locally.
+      */
+     ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr, bool loc) :
+         impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, loc))
+     {
+         // No-op.
+     }
+
+     /**
+      * Set local flag.
+      *
+      * @param val Value of the flag. If true, query will be
+      *     executed only on local node, so only local entries
+      *     will be returned as query result.
+      */
+     void SetLocal(bool val)
+     {
+         impl.Get()->SetLocal(val);
+     }
+
+     /**
+      * Get local flag.
+      *
+      * @return Value of the flag. If true, query will be
+      *     executed only on local node, so only local entries
+      *     will be returned as query result.
+      */
+     bool GetLocal() const
+     {
+         return impl.Get()->GetLocal();
+     }
+
+     /**
+      * Set buffer size.
+      *
+      * When a cache update happens, entry is first
+      * put into a buffer. Entries from buffer will be sent to
+      * the master node only if the buffer is full or time
+      * provided via timeInterval is exceeded.
+      *
+      * @param val Buffer size.
+      */
+     void SetBufferSize(int32_t val)
+     {
+         impl.Get()->SetBufferSize(val);
+     }
+
+     /**
+      * Get buffer size.
+      *
+      * When a cache update happens, entry is first
+      * put into a buffer. Entries from buffer will be sent to
+      * the master node only if the buffer is full or time
+      * provided via timeInterval is exceeded.
+      *
+      * @return Buffer size.
+      */
+     int32_t GetBufferSize() const
+     {
+         return impl.Get()->GetBufferSize();
+     }
+
+     /**
+      * Set time interval.
+      *
+      * When a cache update happens, entry is first put into
+      * a buffer. Entries from buffer are sent to the master node
+      * only if the buffer is full (its size can be changed via
+      * SetBufferSize) or time provided via this method is
+      * exceeded.
+      *
+      * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+      * means that time check is disabled and entries will be
+      * sent only when buffer is full.
+      *
+      * @param val Time interval in milliseconds.
+      */
+     void SetTimeInterval(int64_t val)
+     {
+         impl.Get()->SetTimeInterval(val);
+     }
+
+     /**
+      * Get time interval.
+      *
+      * When a cache update happens, entry is first put into
+      * a buffer. Entries from buffer are sent to the master node
+      * only if the buffer is full (its size can be changed via
+      * SetBufferSize) or time provided via SetTimeInterval
+      * method is exceeded.
+      *
+      * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+      * means that time check is disabled and entries will be
+      * sent only when buffer is full.
+      *
+      * @return Time interval.
+      */
+     int64_t GetTimeInterval() const
+     {
+         return impl.Get()->GetTimeInterval();
+     }
+
+     /**
+      * Set cache entry event listener.
+      *
+      * @param lsnr Cache entry event listener. Invoked on the
+      *     node where continuous query execution has been
+      *     started.
+      */
+     void SetListener(Reference<event::CacheEntryEventListener<K, V> > lsnr)
+     {
+         impl.Get()->SetListener(lsnr);
+     }
+
+     /**
+      * Get cache entry event listener.
+      *
+      * @return Cache entry event listener.
+      */
+     const event::CacheEntryEventListener<K, V>& GetListener() const
+     {
+         return impl.Get()->GetListener();
+     }
+
+     /**
+      * Get cache entry event listener.
+      *
+      * @return Cache entry event listener.
+      */
+     event::CacheEntryEventListener<K, V>& GetListener()
+     {
+         return impl.Get()->GetListener();
+     }
+
+ private:
+     /** Implementation. */
+     common::concurrent::SharedPointer<impl::cache::query::continuous::ContinuousQueryImpl<K, V> > impl;
+ };
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h
new file mode 100644
index 0000000..bbefbcc
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::query::continuous::ContinuousQueryHandle class.
+ */
+
+#ifndef _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE
+#define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE
+
+#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
+
+namespace ignite
+{
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+  * Continuous query handle.
+  */
+ template<typename K, typename V>
+ class ContinuousQueryHandle
+ {
+ public:
+     /**
+      * Default constructor.
+      */
+     ContinuousQueryHandle() :
+         impl()
+     {
+         // Nothing to do.
+     }
+
+     /**
+      * Constructor.
+      *
+      * Internal method. Should not be used by user.
+      *
+      * @param impl Implementation.
+      */
+     ContinuousQueryHandle(impl::cache::query::continuous::ContinuousQueryHandleImpl* impl) :
+         impl(impl)
+     {
+         // Nothing to do.
+     }
+
+     /**
+      * Gets the cursor for initial query.
+      * Can be called only once, throws IgniteError on subsequent
+      * calls.
+      *
+      * @return Initial query cursor.
+      */
+     QueryCursor<K, V> GetInitialQueryCursor()
+     {
+         // Run the error-reporting overload and capture its status locally.
+         IgniteError status;
+         QueryCursor<K, V> cursor = GetInitialQueryCursor(status);
+         // ThrowIfNeeded() raises the captured status when it signals a failure.
+         IgniteError::ThrowIfNeeded(status);
+
+         return cursor;
+     }
+
+     /**
+      * Gets the cursor for initial query.
+      * Can be called only once, results in error on subsequent
+      * calls.
+      *
+      * @param err Error.
+      * @return Initial query cursor.
+      */
+     QueryCursor<K, V> GetInitialQueryCursor(IgniteError& err)
+     {
+         ContinuousQueryHandleImpl* target = impl.Get();
+
+         if (!target)
+         {
+             err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                 "Instance is not usable (did you check for error?).");
+
+             return QueryCursor<K, V>();
+         }
+
+         return QueryCursor<K, V>(target->GetInitialQueryCursor(err));
+     }
+
+     /**
+      * Check if the instance is valid.
+      *
+      * An invalid instance can be returned if some of the previous
+      * operations have resulted in a failure. For example, an
+      * invalid instance can be returned by the non-throwing version
+      * of a method in case of error. Invalid instances can also
+      * often be created using the default constructor.
+      *
+      * @return True if the instance is valid and can be used.
+      */
+     bool IsValid() const
+     {
+         return impl.IsValid();
+     }
+
+ private:
+     typedef impl::cache::query::continuous::ContinuousQueryHandleImpl ContinuousQueryHandleImpl;
+
+     /** Implementation delegate. */
+     common::concurrent::SharedPointer<ContinuousQueryHandleImpl> impl;
+ };
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
index 3e0f177..535e3ec 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
@@ -22,7 +22,10 @@
#include <ignite/cache/query/query_sql.h>
#include <ignite/cache/query/query_text.h>
#include <ignite/cache/query/query_sql_fields.h>
+#include <ignite/cache/query/continuous/continuous_query_handle.h>
#include <ignite/impl/cache/query/query_impl.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
#include <ignite/impl/interop/interop_target.h>
@@ -309,12 +312,59 @@ namespace ignite
* @return Query cursor.
*/
query::QueryCursorImpl* QuerySqlFields(const ignite::cache::query::SqlFieldsQuery& qry, IgniteError* err);
-
+
+ /**
+ * Start continuous query execution without an initial query.
+ *
+ * @param qry Continuous query.
+ * @param err Error.
+ * @return Continuous query handle implementation.
+ */
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ IgniteError& err);
+
+ /**
+ * Start continuous query execution with initial SQL query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial SQL query.
+ * @param err Error.
+ * @return Continuous query handle implementation.
+ */
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ const ignite::cache::query::SqlQuery& initialQry, IgniteError& err);
+
+ /**
+ * Start continuous query execution with initial text query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial text query.
+ * @param err Error.
+ * @return Continuous query handle implementation.
+ */
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ const ignite::cache::query::TextQuery& initialQry, IgniteError& err);
+
+ /**
+ * Start continuous query execution with initial scan query.
+ *
+ * @param qry Continuous query.
+ * @param initialQry Initial scan query.
+ * @param err Error.
+ * @return Continuous query handle implementation.
+ */
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+ const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+ const ignite::cache::query::ScanQuery& initialQry, IgniteError& err);
+
private:
+ IGNITE_NO_COPY_ASSIGNMENT(CacheImpl)
+
/** Name. */
char* name;
-
- IGNITE_NO_COPY_ASSIGNMENT(CacheImpl)
/**
* Internal query execution routine.
@@ -346,11 +396,67 @@ namespace ignite
if (jniErr.code == ignite::java::IGNITE_JNI_ERR_SUCCESS)
return new query::QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef);
else
- return NULL;
+ return 0;
+ }
+
+ /**
+  * Start continuous query execution with the initial query.
+  *
+  * @param qry Continuous query.
+  * @param initialQry Initial query to be executed.
+  * @param typ Initial query type, -1 if there is no initial query.
+  * @param cmd Command passed to the Java continuous query operation.
+  * @param err Error.
+  * @return Continuous query handle, or 0 on failure.
+  */
+ template<typename T>
+ query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
+     const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
+     const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err)
+ {
+     jni::java::JniErrorInfo jniErr;
+
+     common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
+     interop::InteropOutputStream out(mem.Get());
+     binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
+     ignite::binary::BinaryRawWriter raw(&writer);
+
+     int64_t qryHandle = GetEnvironment().GetHandleRegistry().Allocate(qry);
+
+     raw.WriteInt64(qryHandle);
+     raw.WriteBool(qry.Get()->GetLocal());
+
+     // Filters are not supported for now.
+     raw.WriteBool(false);
+     raw.WriteNull();
+
+     raw.WriteInt32(qry.Get()->GetBufferSize());
+     raw.WriteInt64(qry.Get()->GetTimeInterval());
+
+     // Autounsubscribe is a filter feature.
+     raw.WriteBool(false);
+
+     // The initial query type is always written; -1 means there is no initial query.
+     raw.WriteInt32(typ);
+     if (typ != -1)
+         initialQry.Write(raw);
+
+     out.Synchronize();
+
+     jobject javaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(),
+         cmd, mem.Get()->PointerLong(), &jniErr);
+
+     IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err);
+
+     // NOTE(review): qryHandle is not released from the registry on failure - confirm.
+     if (jniErr.code != java::IGNITE_JNI_ERR_SUCCESS)
+         return 0;
+
+     return new query::continuous::ContinuousQueryHandleImpl(GetEnvironmentPointer(), qryHandle, javaRef);
+ }
};
}
}
}
-#endif
\ No newline at end of file
+#endif
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
new file mode 100644
index 0000000..75504b1
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::cache::query::continuous::ContinuousQueryHandleImpl class.
+ */
+
+#ifndef _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL
+#define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL
+
+#include "ignite/cache/query/query_cursor.h"
+#include "ignite/impl/cache/query/continuous/continuous_query_impl.h"
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query handle implementation.
+ */
+ class IGNITE_IMPORT_EXPORT ContinuousQueryHandleImpl
+ {
+ typedef common::concurrent::SharedPointer<IgniteEnvironment> SP_IgniteEnvironment;
+ typedef common::concurrent::SharedPointer<ContinuousQueryImplBase> SP_ContinuousQueryImplBase;
+ public:
+ /**
+ * Constructor.
+ * @param env Environment.
+ * @param handle Local handle for handle registry.
+ * @param javaRef Java reference.
+ */
+ ContinuousQueryHandleImpl(SP_IgniteEnvironment env, int64_t handle, jobject javaRef);
+
+ /**
+ * Destructor.
+ */
+ ~ContinuousQueryHandleImpl();
+
+ /**
+ * Gets the cursor for initial query.
+ * Can be called only once, results in error on subsequent calls.
+ *
+ * @param err Error.
+ * @return Initial query cursor.
+ */
+ QueryCursorImpl* GetInitialQueryCursor(IgniteError& err);
+
+ /**
+ * Set query to keep pointer to.
+ *
+ * @param query Query.
+ */
+ void SetQuery(SP_ContinuousQueryImplBase query);
+
+ private:
+ /** Environment. */
+ SP_IgniteEnvironment env;
+
+ /** Local handle for handle registry. */
+ int64_t handle;
+
+ /** Handle to Java object. */
+ jobject javaRef;
+
+ /** Shared pointer to query. Kept for query to live long enough. */
+ SP_ContinuousQueryImplBase qry;
+
+ /** Mutex. */
+ common::concurrent::CriticalSection mutex;
+
+ /** Cursor extracted flag; presumably set once the initial query cursor is taken (confirm). */
+ bool extracted;
+ };
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
new file mode 100644
index 0000000..50ced12
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::cache::query::continuous::ContinuousQueryImpl class.
+ */
+
+#ifndef _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL
+#define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL
+
+#include <stdint.h>
+
+#include <ignite/reference.h>
+
+#include <ignite/cache/event/cache_entry_event_listener.h>
+#include <ignite/binary/binary_raw_reader.h>
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ /**
+ * Continuous query base implementation class.
+ *
+ * Continuous queries allow to register a remote and a listener
+ * for cache update events. On any update to the related cache
+ * an event is sent to the node that has executed the query and
+ * listener is notified on that node.
+ *
+ * Continuous query can either be executed on the whole topology
+ * or only on local node.
+ *
+ * To execute the query over the cache use method
+ * ignite::cache::Cache::QueryContinuous().
+ */
+ class ContinuousQueryImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ContinuousQueryImplBase()
+ {
+ // No-op.
+ }
+
+ /**
+ * Default value for the buffer size.
+ */
+ enum { DEFAULT_BUFFER_SIZE = 1 };
+
+ /**
+ * Default value for the time interval.
+ */
+ enum { DEFAULT_TIME_INTERVAL = 0 };
+
+ /**
+ * Constructor.
+ *
+ * Buffer size and time interval are initialized to
+ * DEFAULT_BUFFER_SIZE and DEFAULT_TIME_INTERVAL.
+ *
+ * @param loc Whether query should be executed locally.
+ */
+ explicit ContinuousQueryImplBase(bool loc) :
+ local(loc),
+ bufferSize(DEFAULT_BUFFER_SIZE),
+ timeInterval(DEFAULT_TIME_INTERVAL)
+ {
+ // No-op.
+ }
+
+ /**
+ * Set local flag.
+ *
+ * @param val Value of the flag. If true, query will be
+ * executed only on local node, so only local entries
+ * will be returned as query result.
+ */
+ void SetLocal(bool val)
+ {
+ local = val;
+ }
+
+ /**
+ * Get local flag.
+ *
+ * @return Value of the flag. If true, query will be
+ * executed only on local node, so only local entries
+ * will be returned as query result.
+ */
+ bool GetLocal() const
+ {
+ return local;
+ }
+
+ /**
+ * Set buffer size.
+ *
+ * When a cache update happens, entry is first
+ * put into a buffer. Entries from buffer will be sent to
+ * the master node only if the buffer is full or time
+ * provided via timeInterval is exceeded.
+ *
+ * @param val Buffer size.
+ */
+ void SetBufferSize(int32_t val)
+ {
+ bufferSize = val;
+ }
+
+ /**
+ * Get buffer size.
+ *
+ * When a cache update happens, entry is first
+ * put into a buffer. Entries from buffer will be sent to
+ * the master node only if the buffer is full or time
+ * provided via timeInterval is exceeded.
+ *
+ * @return Buffer size.
+ */
+ int32_t GetBufferSize() const
+ {
+ return bufferSize;
+ }
+
+ /**
+ * Set time interval.
+ *
+ * When a cache update happens, entry is first put into
+ * a buffer. Entries from buffer are sent to the master node
+ * only if the buffer is full (its size can be changed via
+ * SetBufferSize) or time provided via this method is
+ * exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+ * means that time check is disabled and entries will be
+ * sent only when buffer is full.
+ *
+ * @param val Time interval in milliseconds.
+ */
+ void SetTimeInterval(int64_t val)
+ {
+ timeInterval = val;
+ }
+
+ /**
+ * Get time interval.
+ *
+ * When a cache update happens, entry is first put into
+ * a buffer. Entries from buffer are sent to the master node
+ * only if the buffer is full (its size can be changed via
+ * SetBufferSize) or time provided via SetTimeInterval
+ * method is exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+ * means that time check is disabled and entries will be
+ * sent only when buffer is full.
+ *
+ * @return Time interval in milliseconds.
+ */
+ int64_t GetTimeInterval() const
+ {
+ return timeInterval;
+ }
+
+ /**
+ * Callback that reads and processes cache events.
+ * Pure virtual: implemented by the typed ContinuousQueryImpl.
+ *
+ * @param reader Reader to use.
+ */
+ virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader) = 0;
+
+ private:
+ /**
+ * Local flag. When set query will be executed only on local
+ * node, so only local entries will be returned as query
+ * result.
+ *
+ * Default value is false.
+ */
+ bool local;
+
+ /**
+ * Buffer size. When a cache update happens, entry is first
+ * put into a buffer. Entries from buffer will be sent to
+ * the master node only if the buffer is full or time
+ * provided via timeInterval is exceeded.
+ *
+ * Default value is DEFAULT_BUFFER_SIZE.
+ */
+ int32_t bufferSize;
+
+ /**
+ * Time interval in milliseconds. When a cache update
+ * happens, entry is first put into a buffer. Entries from
+ * buffer will be sent to the master node only if the buffer
+ * is full (its size can be changed via SetBufferSize) or
+ * time provided via SetTimeInterval method is exceeded.
+ *
+ * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which
+ * means that time check is disabled and entries will be
+ * sent only when buffer is full.
+ */
+ int64_t timeInterval;
+ };
+
+ /**
+ * Continuous query implementation.
+ *
+ * Continuous queries allow to register a remote and a listener
+ * for cache update events. On any update to the related cache
+ * an event is sent to the node that has executed the query and
+ * listener is notified on that node.
+ *
+ * Continuous query can either be executed on the whole topology
+ * or only on local node.
+ *
+ * To execute the query over the cache use method
+ * ignite::cache::Cache::QueryContinuous().
+ */
+ template<typename K, typename V>
+ class ContinuousQueryImpl : public ContinuousQueryImplBase
+ {
+ public:
+ /**
+ * Destructor.
+ */
+ virtual ~ContinuousQueryImpl()
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor. Creates a query with the local flag set to false.
+ *
+ * @param lsnr Event listener. Invoked on the node where
+ * continuous query execution has been started.
+ */
+ ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& lsnr) :
+ ContinuousQueryImplBase(false),
+ lsnr(lsnr)
+ {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param lsnr Event listener. Invoked on the node where
+ * continuous query execution has been started.
+ * @param loc Whether query should be executed locally.
+ */
+ ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& lsnr, bool loc) :
+ ContinuousQueryImplBase(loc),
+ lsnr(lsnr)
+ {
+ // No-op.
+ }
+
+ /**
+ * Set cache entry event listener.
+ *
+ * @param val Cache entry event listener. Invoked on the
+ * node where continuous query execution has been
+ * started.
+ */
+ void SetListener(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& val)
+ {
+ lsnr = val;
+ }
+
+ /**
+ * Check if the query has listener.
+ *
+ * @return True if the listener reference is not null.
+ */
+ bool HasListener() const
+ {
+ return !lsnr.IsNull();
+ }
+
+ /**
+ * Get cache entry event listener (const overload).
+ *
+ * @return Cache entry event listener.
+ */
+ const ignite::cache::event::CacheEntryEventListener<K, V>& GetListener() const
+ {
+ return lsnr.Get();
+ }
+
+ /**
+ * Get cache entry event listener (non-const overload).
+ *
+ * @return Cache entry event listener.
+ */
+ ignite::cache::event::CacheEntryEventListener<K, V>& GetListener()
+ {
+ return lsnr.Get();
+ }
+
+ /**
+ * Callback that reads and processes cache events.
+ *
+ * Reads the number of events, then that many events from the
+ * reader, and passes them all to the listener in a single
+ * OnEvent() call.
+ *
+ * NOTE(review): the count is not validated; a negative value
+ * would turn into a huge size in resize() - confirm it can
+ * never be negative. Also, std::vector is used here while
+ * this header does not include <vector> directly.
+ *
+ * @param reader Reader to use.
+ */
+ virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader)
+ {
+ // Number of events that follow in the stream.
+ int32_t cnt = reader.ReadInt32();
+
+ // Storing events here; resized up front so each element can be read in place.
+ std::vector< ignite::cache::CacheEntryEvent<K, V> > events;
+ events.resize(cnt);
+
+ for (int32_t i = 0; i < cnt; ++i)
+ events[i].Read(reader);
+
+ lsnr.Get().OnEvent(events.data(), cnt);
+ }
+
+ private:
+ /** Cache entry event listener. */
+ Reference<ignite::cache::event::CacheEntryEventListener<K, V>> lsnr;
+ };
+ }
+ }
+ }
+ }
+}
+
+#endif //_IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h b/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h
index 107042a..3c4d123 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h
@@ -28,21 +28,9 @@ namespace ignite
namespace impl
{
/**
- * Something what can be registered inside handle registry.
- */
- class IGNITE_IMPORT_EXPORT HandleRegistryEntry
- {
- public:
- /**
- * Destructor.
- */
- virtual ~HandleRegistryEntry();
- };
-
- /**
* Handle registry segment containing thread-specific data for slow-path access.
*/
- class IGNITE_IMPORT_EXPORT HandleRegistrySegment
+ class HandleRegistrySegment
{
public:
/**
@@ -61,7 +49,7 @@ namespace ignite
* @param hnd Handle.
* @return Associated entry or NULL if it doesn't exists.
*/
- ignite::common::concurrent::SharedPointer<HandleRegistryEntry> Get(int64_t hnd);
+ common::concurrent::SharedPointer<void> Get(int64_t hnd);
/**
* Put entry into segment.
@@ -69,14 +57,14 @@ namespace ignite
* @param hnd Handle.
* @param entry Associated entry (cannot be NULL).
*/
- void Put(int64_t hnd, const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& entry);
+ void Put(int64_t hnd, const common::concurrent::SharedPointer<void>& entry);
/**
* Remove entry from the segment.
*
* @param hnd Handle.
*/
- void Remove(int64_t hnd);
+ void Remove(int64_t hnd);
/**
* Clear all entries from the segment.
@@ -84,10 +72,10 @@ namespace ignite
void Clear();
private:
/** Map with data. */
- std::map<int64_t, ignite::common::concurrent::SharedPointer<HandleRegistryEntry>>* map;
+ std::map<int64_t, common::concurrent::SharedPointer<void>> map;
/** Mutex. */
- ignite::common::concurrent::CriticalSection* mux;
+ common::concurrent::CriticalSection mux;
IGNITE_NO_COPY_ASSIGNMENT(HandleRegistrySegment);
};
@@ -102,7 +90,7 @@ namespace ignite
* Constructor.
*
* @param fastCap Fast-path capacity.
- * @param segmentCnt Slow-path segments count.
+ * @param slowSegmentCnt Slow-path segments count.
*/
HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt);
@@ -117,7 +105,7 @@ namespace ignite
* @param target Target.
* @return Handle.
*/
- int64_t Allocate(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target);
+ int64_t Allocate(const common::concurrent::SharedPointer<void>& target);
/**
* Allocate handle in critical mode.
@@ -125,7 +113,7 @@ namespace ignite
* @param target Target.
* @return Handle.
*/
- int64_t AllocateCritical(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target);
+ int64_t AllocateCritical(const common::concurrent::SharedPointer<void>& target);
/**
* Allocate handle in safe mode.
@@ -133,7 +121,7 @@ namespace ignite
* @param target Target.
* @return Handle.
*/
- int64_t AllocateSafe(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target);
+ int64_t AllocateSafe(const common::concurrent::SharedPointer<void>& target);
/**
* Allocate handle in critical and safe modes.
@@ -141,7 +129,7 @@ namespace ignite
* @param target Target.
* @return Handle.
*/
- int64_t AllocateCriticalSafe(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target);
+ int64_t AllocateCriticalSafe(const common::concurrent::SharedPointer<void>& target);
/**
* Release handle.
@@ -154,35 +142,36 @@ namespace ignite
* Get target.
*
* @param hnd Handle.
- * @param Target.
+ * @return Target.
*/
- ignite::common::concurrent::SharedPointer<HandleRegistryEntry> Get(int64_t hnd);
+ common::concurrent::SharedPointer<void> Get(int64_t hnd);
/**
* Close the registry.
*/
void Close();
+
private:
/** Fast-path container capacity. */
- int32_t fastCap;
+ int32_t fastCap;
/** Fast-path counter. */
- int32_t fastCtr;
+ int32_t fastCtr;
/** Fast-path container. */
- ignite::common::concurrent::SharedPointer<HandleRegistryEntry>* fast;
+ common::concurrent::SharedPointer<void>* fast;
/** Amount of slow-path segments. */
- int32_t slowSegmentCnt;
+ int32_t slowSegmentCnt;
/** Slow-path counter. */
- int64_t slowCtr;
-
+ int64_t slowCtr;
+
/** Slow-path segments. */
- HandleRegistrySegment** slow;
+ HandleRegistrySegment** slow;
/** Close flag. */
- int32_t closed;
+ int32_t closed;
IGNITE_NO_COPY_ASSIGNMENT(HandleRegistry);
@@ -190,11 +179,10 @@ namespace ignite
* Internal allocation routine.
*
* @param target Target.
- * @param Critical mode flag.
- * @param Safe mode flag.
+ * @param critical Critical mode flag.
+ * @param safe Safe mode flag.
*/
- int64_t Allocate0(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target,
- bool critical, bool safe);
+ int64_t Allocate0(const common::concurrent::SharedPointer<void>& target, bool critical, bool safe);
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
index fb6f657..2b2a117 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
@@ -20,14 +20,15 @@
#include <ignite/common/concurrent.h>
#include <ignite/jni/java.h>
+#include <ignite/jni/utils.h>
#include "ignite/impl/interop/interop_memory.h"
#include "ignite/impl/binary/binary_type_manager.h"
-#include "ignite/jni/utils.h"
+#include "ignite/impl/handle_registry.h"
-namespace ignite
+namespace ignite
{
- namespace impl
+ namespace impl
{
/**
* Defines environment in which Ignite operates.
@@ -41,6 +42,16 @@ namespace ignite
enum { DEFAULT_ALLOCATION_SIZE = 1024 };
/**
+ * Default fast path handle registry containers capacity.
+ */
+ enum { DEFAULT_FAST_PATH_CONTAINERS_CAP = 1024 };
+
+ /**
+ * Default slow path handle registry containers capacity.
+ */
+ enum { DEFAULT_SLOW_PATH_CONTAINERS_CAP = 16 };
+
+ /**
* Default constructor.
*/
IgniteEnvironment();
@@ -78,6 +89,13 @@ namespace ignite
void OnStartCallback(long long memPtr, jobject proc);
/**
+ * Continuous query listener apply callback.
+ *
+ * @param mem Memory with data.
+ */
+ void OnContinuousQueryListenerApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+ /**
* Get name of Ignite instance.
*
* @return Name.
@@ -133,6 +151,13 @@ namespace ignite
*/
void ProcessorReleaseStart();
+ /**
+ * Get handle registry.
+ *
+ * @return Handle registry.
+ */
+ HandleRegistry& GetHandleRegistry();
+
private:
/** Context to access Java. */
common::concurrent::SharedPointer<jni::java::JniContext> ctx;
@@ -152,6 +177,9 @@ namespace ignite
/** Type updater. */
binary::BinaryTypeUpdater* metaUpdater;
+ /** Handle registry. */
+ HandleRegistry registry;
+
IGNITE_NO_COPY_ASSIGNMENT(IgniteEnvironment);
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/namespaces.dox
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/namespaces.dox b/modules/platforms/cpp/core/namespaces.dox
index 0f5f11f..49379e6 100644
--- a/modules/platforms/cpp/core/namespaces.dox
+++ b/modules/platforms/cpp/core/namespaces.dox
@@ -22,40 +22,54 @@
* computing and transacting on large-scale data sets in real-time, orders of magnitude faster than possible with
* traditional disk-based or flash-based technologies.
*/
-
+
/**
* Apache %Ignite API.
*/
namespace ignite
{
- /**
- * %Ignite Binary Objects API.
- */
- namespace binary
- {
- // Empty.
- }
+ /**
+ * %Ignite Binary Objects API.
+ */
+ namespace binary
+ {
+ // Empty.
+ }
+
+ /**
+ * %Ignite %Transaction API.
+ */
+ namespace transactions
+ {
+ // Empty.
+ }
- /**
- * %Ignite %Transaction API.
- */
- namespace transactions
- {
- // Empty.
- }
-
- /**
- * %Ignite %Cache API.
- */
- namespace cache
- {
- /**
- * Contains APIs for creating and executing cache queries.
- */
- namespace query
- {
- // Empty.
- }
- }
+ /**
+ * %Ignite %Cache API.
+ */
+ namespace cache
+ {
+ /**
+ * Contains APIs for cache events.
+ */
+ namespace event
+ {
+ // Empty.
+ }
+
+ /**
+ * Contains APIs for creating and executing cache queries.
+ */
+ namespace query
+ {
+ /**
+ * Contains APIs for continuous queries.
+ */
+ namespace continuous
+ {
+ // Empty.
+ }
+ }
+ }
}
-
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj
index 6320323..89a2dff 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -193,6 +193,10 @@
<ClInclude Include="..\..\include\ignite\cache\cache.h" />
<ClInclude Include="..\..\include\ignite\cache\cache_entry.h" />
<ClInclude Include="..\..\include\ignite\cache\cache_peek_mode.h" />
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h" />
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h" />
+ <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h" />
+ <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h" />
<ClInclude Include="..\..\include\ignite\cache\query\query.h" />
<ClInclude Include="..\..\include\ignite\cache\query\query_argument.h" />
<ClInclude Include="..\..\include\ignite\cache\query\query_cursor.h" />
@@ -208,6 +212,8 @@
<ClInclude Include="..\..\include\ignite\impl\binary\binary_type_updater_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h" />
+ <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h" />
+ <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_fields_row_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_impl.h" />
<ClInclude Include="..\..\include\ignite\impl\ignite_environment.h" />
@@ -229,6 +235,7 @@
<ClCompile Include="..\..\src\impl\binary\binary_type_updater_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\cache_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\query\query_batch.cpp" />
+ <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp" />
<ClCompile Include="..\..\src\impl\cache\query\query_impl.cpp" />
<ClCompile Include="..\..\src\impl\ignite_environment.cpp" />
<ClCompile Include="..\..\src\impl\ignite_impl.cpp" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index c5fb532..9cb5f78 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -46,6 +46,9 @@
<ClCompile Include="..\..\src\impl\cache\query\query_batch.cpp">
<Filter>Code\impl\cache\query</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp">
+ <Filter>Code\impl\cache\query\continuous</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h">
@@ -144,6 +147,24 @@
<ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h">
<Filter>Code\impl\cache\query</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h">
+ <Filter>Code\cache\query\continuous</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h">
+ <Filter>Code\cache\query\continuous</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h">
+ <Filter>Code\impl\cache\query\continuous</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h">
+ <Filter>Code\cache\event</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h">
+ <Filter>Code\cache\event</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h">
+ <Filter>Code\impl\cache\query\continuous</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Code">
@@ -176,5 +197,14 @@
<Filter Include="Code\transactions">
<UniqueIdentifier>{146fe661-0ad3-4d51-83a3-fce8a897e84d}</UniqueIdentifier>
</Filter>
+ <Filter Include="Code\cache\query\continuous">
+ <UniqueIdentifier>{2056dfc8-4ced-4658-b2b7-a8c81a7ef797}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="Code\impl\cache\query\continuous">
+ <UniqueIdentifier>{d633f819-7b30-4e26-81ec-f708d1c1ff8e}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="Code\cache\event">
+ <UniqueIdentifier>{e03c3690-ff22-4c78-83a0-b77cebb7f980}</UniqueIdentifier>
+ </Filter>
</ItemGroup>
</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
index 0630921..ad69d45 100644
--- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
@@ -26,9 +26,11 @@ using namespace ignite::java;
using namespace ignite::common;
using namespace ignite::cache;
using namespace ignite::cache::query;
+using namespace ignite::cache::query::continuous;
using namespace ignite::impl;
using namespace ignite::impl::binary;
using namespace ignite::impl::cache::query;
+using namespace ignite::impl::cache::query::continuous;
using namespace ignite::impl::interop;
using namespace ignite::binary;
@@ -89,6 +91,9 @@ namespace ignite
/** Operation: PutIfAbsent. */
const int32_t OP_PUT_IF_ABSENT = 28;
+ /** Operation: CONTINUOUS query. */
+ const int32_t OP_QRY_CONTINUOUS = 29;
+
/** Operation: SCAN query. */
const int32_t OP_QRY_SCAN = 30;
@@ -301,6 +306,32 @@ namespace ignite
{
return QueryInternal(qry, OP_QRY_SQL_FIELDS, err);
}
+
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ const SqlQuery& initialQry, IgniteError& err)
+ {
+ return QueryContinuous(qry, initialQry, OP_QRY_SQL, OP_QRY_CONTINUOUS, err); // Overload for an SQL initial query.
+ }
+
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ const TextQuery& initialQry, IgniteError& err)
+ {
+ return QueryContinuous(qry, initialQry, OP_QRY_TEXT, OP_QRY_CONTINUOUS, err); // Overload for a text initial query.
+ }
+
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ const ScanQuery& initialQry, IgniteError& err)
+ {
+ return QueryContinuous(qry, initialQry, OP_QRY_SCAN, OP_QRY_CONTINUOUS, err); // Overload for a scan initial query.
+ }
+
+ ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry,
+ IgniteError& err)
+ {
+ struct { void Write(BinaryRawWriter&) const { }} dummy; // Stand-in for the initial query: its Write() emits nothing.
+
+ return QueryContinuous(qry, dummy, -1, OP_QRY_CONTINUOUS, err); // -1 is passed in place of an OP_QRY_* initial query type.
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
new file mode 100644
index 0000000..04e64c9
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/cache/query/continuous/continuous_query_handle_impl.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::jni::java;
+using namespace ignite::java;
+using namespace ignite::impl::interop;
+using namespace ignite::impl::binary;
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ namespace continuous
+ {
+ enum Command
+ {
+ GET_INITIAL_QUERY = 0, // Used by GetInitialQueryCursor() to request the initial query cursor object.
+
+ CLOSE = 1 // Sent to the Java object from the handle destructor.
+ };
+
+ ContinuousQueryHandleImpl::ContinuousQueryHandleImpl(SP_IgniteEnvironment env, int64_t handle, jobject javaRef) :
+ env(env),
+ handle(handle),
+ javaRef(javaRef),
+ mutex(),
+ extracted(false) // Initial query cursor has not been extracted yet.
+ {
+ // No-op: arguments are only stored; qry is left default-constructed until SetQuery() is called.
+ }
+
+ ContinuousQueryHandleImpl::~ContinuousQueryHandleImpl()
+ {
+ env.Get()->Context()->TargetInLongOutLong(javaRef, CLOSE, 0); // Send CLOSE to the Java object. NOTE(review): no error info is collected here - confirm failures may be ignored.
+
+ JniContext::Release(javaRef); // Release the Java reference.
+
+ env.Get()->GetHandleRegistry().Release(handle); // Free the local handle in the handle registry.
+ }
+
+ QueryCursorImpl* ContinuousQueryHandleImpl::GetInitialQueryCursor(IgniteError& err)
+ {
+ CsLockGuard guard(mutex); // The extracted flag is checked and updated under this lock.
+
+ if (extracted)
+ {
+ err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+ "GetInitialQueryCursor() can be called only once.");
+
+ return 0; // Repeated call: error is reported through err, no cursor is returned.
+ }
+
+ JniErrorInfo jniErr;
+
+ jobject res = env.Get()->Context()->TargetOutObject(javaRef, GET_INITIAL_QUERY, &jniErr);
+
+ IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err); // Transfer the JNI result into err.
+
+ if (jniErr.code != IGNITE_JNI_ERR_SUCCESS)
+ return 0; // extracted stays false, so a later call is still allowed.
+
+ extracted = true;
+
+ return new QueryCursorImpl(env, res); // Allocated with new; presumably owned by the caller - confirm.
+ }
+
+ void ContinuousQueryHandleImpl::SetQuery(SP_ContinuousQueryImplBase query)
+ {
+ qry = query; // Store a shared pointer copy in the qry member.
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/handle_registry.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/handle_registry.cpp b/modules/platforms/cpp/core/src/impl/handle_registry.cpp
index c447faa..069e996 100644
--- a/modules/platforms/cpp/core/src/impl/handle_registry.cpp
+++ b/modules/platforms/cpp/core/src/impl/handle_registry.cpp
@@ -23,83 +23,67 @@ namespace ignite
{
namespace impl
{
- HandleRegistryEntry::~HandleRegistryEntry()
- {
- // No-op.
- }
-
HandleRegistrySegment::HandleRegistrySegment() :
- map(new std::map<int64_t, SharedPointer<HandleRegistryEntry>>()), mux(new CriticalSection())
+ map(),
+ mux()
{
// No-op.
}
HandleRegistrySegment::~HandleRegistrySegment()
{
- delete map;
- delete mux;
+ // No-op.
}
- SharedPointer<HandleRegistryEntry> HandleRegistrySegment::Get(int64_t hnd)
+ SharedPointer<void> HandleRegistrySegment::Get(int64_t hnd)
{
- mux->Enter();
+ typedef std::map<int64_t, SharedPointer<void>> Map;
- SharedPointer<HandleRegistryEntry> res = (*map)[hnd];
+ CsLockGuard guard(mux);
- mux->Leave();
+ Map::const_iterator it = map.find(hnd);
+ if (it == map.end())
+ return SharedPointer<void>();
- return res;
+ return it->second;
}
- void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<HandleRegistryEntry>& entry)
+ void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<void>& entry)
{
- mux->Enter();
-
- (*map)[hnd] = entry;
+ CsLockGuard guard(mux);
- mux->Leave();
+ map[hnd] = entry;
}
void HandleRegistrySegment::Remove(int64_t hnd)
{
- mux->Enter();
+ CsLockGuard guard(mux);
- map->erase(hnd);
-
- mux->Leave();
+ map.erase(hnd);
}
void HandleRegistrySegment::Clear()
{
- mux->Enter();
-
- map->erase(map->begin(), map->end());
+ CsLockGuard guard(mux);
- mux->Leave();
+ map.clear();
}
- HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt)
+ HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt) :
+ fastCap(fastCap),
+ fastCtr(0),
+ fast(new SharedPointer<void>[fastCap]),
+ slowSegmentCnt(slowSegmentCnt),
+ slowCtr(fastCap),
+ slow(new HandleRegistrySegment*[slowSegmentCnt]),
+ closed(0)
{
- this->fastCap = fastCap;
+ for (int32_t i = 0; i < fastCap; i++)
+ fast[i] = SharedPointer<void>();
- fastCtr = 0;
-
- fast = new SharedPointer<HandleRegistryEntry>[fastCap];
-
- for (int i = 0; i < fastCap; i++)
- fast[i] = SharedPointer<HandleRegistryEntry>();
-
- this->slowSegmentCnt = slowSegmentCnt;
-
- slowCtr = fastCap;
-
- slow = new HandleRegistrySegment*[slowSegmentCnt];
-
- for (int i = 0; i < slowSegmentCnt; i++)
+ for (int32_t i = 0; i < slowSegmentCnt; i++)
slow[i] = new HandleRegistrySegment();
- closed = 0;
-
Memory::Fence();
}
@@ -115,22 +99,22 @@ namespace ignite
delete[] slow;
}
- int64_t HandleRegistry::Allocate(const SharedPointer<HandleRegistryEntry>& target)
+ int64_t HandleRegistry::Allocate(const SharedPointer<void>& target)
{
return Allocate0(target, false, false);
}
- int64_t HandleRegistry::AllocateCritical(const SharedPointer<HandleRegistryEntry>& target)
+ int64_t HandleRegistry::AllocateCritical(const SharedPointer<void>& target)
{
return Allocate0(target, true, false);
}
- int64_t HandleRegistry::AllocateSafe(const SharedPointer<HandleRegistryEntry>& target)
+ int64_t HandleRegistry::AllocateSafe(const SharedPointer<void>& target)
{
return Allocate0(target, false, true);
}
- int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<HandleRegistryEntry>& target)
+ int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<void>& target)
{
return Allocate0(target, true, true);
}
@@ -138,10 +122,10 @@ namespace ignite
void HandleRegistry::Release(int64_t hnd)
{
if (hnd < fastCap)
- fast[static_cast<int32_t>(hnd)] = SharedPointer<HandleRegistryEntry>();
+ fast[static_cast<int32_t>(hnd)] = SharedPointer<void>();
else
{
- HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt);
+ HandleRegistrySegment* segment = slow[hnd % slowSegmentCnt];
segment->Remove(hnd);
}
@@ -149,7 +133,7 @@ namespace ignite
Memory::Fence();
}
- SharedPointer<HandleRegistryEntry> HandleRegistry::Get(int64_t hnd)
+ SharedPointer<void> HandleRegistry::Get(int64_t hnd)
{
Memory::Fence();
@@ -157,7 +141,7 @@ namespace ignite
return fast[static_cast<int32_t>(hnd)];
else
{
- HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt);
+ HandleRegistrySegment* segment = slow[hnd % slowSegmentCnt];
return segment->Get(hnd);
}
@@ -168,16 +152,16 @@ namespace ignite
if (Atomics::CompareAndSet32(&closed, 0, 1))
{
// Cleanup fast-path handles.
- for (int i = 0; i < fastCap; i++)
- fast[i] = SharedPointer<HandleRegistryEntry>();
+ for (int32_t i = 0; i < fastCap; i++)
+ fast[i] = SharedPointer<void>();
// Cleanup slow-path handles.
- for (int i = 0; i < slowSegmentCnt; i++)
- (*(slow + i))->Clear();
+ for (int32_t i = 0; i < slowSegmentCnt; i++)
+ slow[i]->Clear();
}
}
- int64_t HandleRegistry::Allocate0(const SharedPointer<HandleRegistryEntry>& target, bool critical, bool safe)
+ int64_t HandleRegistry::Allocate0(const SharedPointer<void>& target, bool critical, bool safe)
{
// Check closed state.
Memory::Fence();
@@ -201,7 +185,7 @@ namespace ignite
if (safe && closed == 1)
{
- fast[fastIdx] = SharedPointer<HandleRegistryEntry>();
+ fast[fastIdx] = SharedPointer<void>();
return -1;
}
@@ -214,7 +198,7 @@ namespace ignite
// Either allocating on slow-path, or fast-path can no longer accomodate more entries.
int64_t slowIdx = Atomics::IncrementAndGet64(&slowCtr) - 1;
- HandleRegistrySegment* segment = *(slow + slowIdx % slowSegmentCnt);
+ HandleRegistrySegment* segment = slow[slowIdx % slowSegmentCnt];
segment->Put(slowIdx, target);