You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/11/01 02:37:24 UTC
[05/50] [abbrv] ignite git commit: IGNITE-2004 Fixed "Asynchronous
execution of ContinuousQuery's remote filter & local list".
IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/395f4738
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/395f4738
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/395f4738
Branch: refs/heads/ignite-2788
Commit: 395f47388ef515301e7e49cea7a444063712b9f9
Parents: 24b24bf
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Apr 22 18:41:58 2016 +0300
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Fri May 13 16:11:14 2016 +0900
----------------------------------------------------------------------
.../CacheContinuousAsyncQueryExample.java | 138 +++
.../datagrid/CacheContinuousQueryExample.java | 13 +-
.../ignite/cache/query/ContinuousQuery.java | 27 +
.../configuration/IgniteConfiguration.java | 32 +
.../ignite/internal/GridKernalContext.java | 8 +
.../ignite/internal/GridKernalContextImpl.java | 12 +
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../org/apache/ignite/internal/IgnitionEx.java | 16 +-
.../processors/cache/GridCacheEntryEx.java | 5 +-
.../processors/cache/GridCacheMapEntry.java | 49 +-
.../dht/atomic/GridDhtAtomicCache.java | 98 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 121 +--
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 33 +-
.../distributed/near/GridNearAtomicCache.java | 3 +-
.../continuous/CacheContinuousQueryEvent.java | 7 +
.../continuous/CacheContinuousQueryHandler.java | 446 +++++++--
.../CacheContinuousQueryListener.java | 6 +-
.../continuous/CacheContinuousQueryManager.java | 62 +-
.../apache/ignite/lang/IgniteAsyncCallback.java | 111 +++
.../thread/IgniteStripedThreadPoolExecutor.java | 164 +--
.../processors/cache/GridCacheTestEntryEx.java | 4 +-
...FailoverAtomicPrimaryWriteOrderSelfTest.java | 50 +
...sQueryAsyncFailoverTxReplicatedSelfTest.java | 37 +
...eContinuousQueryAsyncFailoverTxSelfTest.java | 44 +
...eContinuousQueryAsyncFilterListenerTest.java | 986 +++++++++++++++++++
...ryFactoryAsyncFilterRandomOperationTest.java | 131 +++
...usQueryFactoryFilterRandomOperationTest.java | 725 ++++++++++++++
.../CacheContinuousQueryFactoryFilterTest.java | 714 --------------
...ContinuousQueryFailoverAbstractSelfTest.java | 63 +-
.../CacheContinuousQueryLostPartitionTest.java | 14 +
...ontinuousQueryOperationFromCallbackTest.java | 627 ++++++++++++
.../CacheContinuousQueryOrderingEventTest.java | 722 ++++++++++++++
...acheContinuousQueryRandomOperationsTest.java | 23 +
.../junits/GridTestKernalContext.java | 1 +
.../IgniteBinaryCacheQueryTestSuite.java | 1 -
.../IgniteCacheQuerySelfTestSuite3.java | 14 +-
.../IgniteCacheQuerySelfTestSuite4.java | 7 +
.../cache/CacheEntryEventAsyncProbe.java | 61 ++
.../yardstick/cache/CacheEntryEventProbe.java | 33 +-
39 files changed, 4408 insertions(+), 1203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
new file mode 100644
index 0000000..4ac7ecb
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * This example demonstrates the asynchronous continuous query API.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheContinuousAsyncQueryExample {
+ /** Cache name. */
+ private static final String CACHE_NAME = CacheContinuousAsyncQueryExample.class.getSimpleName();
+
+ /**
+ * Executes example.
+ *
+ * @param args Command line arguments, none required.
+ * @throws Exception If example execution failed.
+ */
+ public static void main(String[] args) throws Exception {
+ try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+ System.out.println();
+ System.out.println(">>> Cache continuous query example started.");
+
+ // Auto-close cache at the end of the example.
+ try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) {
+ int keyCnt = 20;
+
+ // These entries will be queried by initial predicate.
+ for (int i = 0; i < keyCnt; i++)
+ cache.put(i, Integer.toString(i));
+
+ // Create new continuous query.
+ ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+
+ qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
+ @Override public boolean apply(Integer key, String val) {
+ return key > 10;
+ }
+ }));
+
+ // Callback that is called locally when update notifications are received.
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
+ System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
+ }
+ });
+
+ // This filter will be evaluated remotely on all nodes.
+ // Entries that pass this filter will be sent to the caller.
+ qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
+ @Override public CacheEntryEventFilter<Integer, String> create() {
+ return new CacheEntryFilter();
+ }
+ });
+
+ // Execute query.
+ try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
+ // Iterate through existing data.
+ for (Cache.Entry<Integer, String> e : cur)
+ System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
+
+ // Add a few more keys and watch more query notifications.
+ for (int i = 0; i < keyCnt; i++)
+ cache.put(i, Integer.toString(i));
+
+ // Wait for a while until the callback is notified about the remaining puts.
+ Thread.sleep(2000);
+ }
+
+ // Iterate through entries which were updated from the filter.
+ for (int i = 0; i < 10; i++)
+ System.out.println("Entry updated from filter [key=" + i + ", val=" + cache.get(i) + ']');
+ }
+ finally {
+ // Distributed cache could be removed from cluster only by #destroyCache() call.
+ ignite.destroyCache(CACHE_NAME);
+ }
+ }
+ }
+
+ /**
+ * Filter returns {@code true} for entries whose key is greater than 10.
+ */
+ @IgniteAsyncCallback
+ private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String> {
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e)
+ throws CacheEntryListenerException {
+ // This cache operation is safe because the filter has the IgniteAsyncCallback annotation.
+ if (e.getKey() < 10 && String.valueOf(e.getKey()).equals(e.getValue()))
+ ignite.cache(CACHE_NAME).put(e.getKey(), e.getValue() + "_less_than_10");
+
+ return e.getKey() > 10;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
index 59759af..aad5b5d 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
@@ -18,12 +18,13 @@
package org.apache.ignite.examples.datagrid;
import javax.cache.Cache;
+import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
@@ -81,9 +82,13 @@ public class CacheContinuousQueryExample {
// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
- qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, String>() {
- @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
- return e.getKey() > 10;
+ qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
+ @Override public CacheEntryEventFilter<Integer, String> create() {
+ return new CacheEntryEventFilter<Integer, String>() {
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
+ return e.getKey() > 10;
+ }
+ };
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 3ea8f93..bbfe8cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -23,6 +23,8 @@ import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteAsyncCallback;
/**
* API for configuring continuous cache queries.
@@ -92,6 +94,16 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
* Note that this works even if you didn't provide initial query. Cursor will
* be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()}
* is called.
+ * <p>
+ * {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter}
+ * (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener}
+ * (see {@link #setLocalListener(CacheEntryUpdatedListener)}).
+ * If the filter and/or listener are annotated with {@link IgniteAsyncCallback}, the annotated callback
+ * is executed in the async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
+ * and the notification order is kept the same as the update order for a given cache key.
+ *
+ * @see IgniteAsyncCallback
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
*/
public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** */
@@ -173,9 +185,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
* <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g.,
* synchronization or transactional cache operations), should be executed asynchronously without
* blocking the thread that called the callback. Otherwise, you can get deadlocks.
+ * <p>
+ * If the local listener is annotated with {@link IgniteAsyncCallback}, it is executed in the async callback pool
+ * (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations.
*
* @param locLsnr Local callback.
* @return {@code this} for chaining.
+ * @see IgniteAsyncCallback
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
*/
public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) {
this.locLsnr = locLsnr;
@@ -198,11 +215,16 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
* <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
* (e.g., synchronization or transactional cache operations), should be executed asynchronously
* without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+ * <p>
+ * If the remote filter is annotated with {@link IgniteAsyncCallback}, it is executed in the async callback
+ * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations.
*
* @param rmtFilter Key-value filter.
* @return {@code this} for chaining.
*
* @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
+ * @see IgniteAsyncCallback
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
*/
@Deprecated
public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
@@ -227,9 +249,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
* <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
* (e.g., synchronization or transactional cache operations), should be executed asynchronously
* without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+ * <p>
+ * If the remote filter is annotated with {@link IgniteAsyncCallback}, it is executed in the async callback
+ * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations.
*
* @param rmtFilterFactory Key-value filter factory.
* @return {@code this} for chaining.
+ * @see IgniteAsyncCallback
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
*/
public ContinuousQuery<K, V> setRemoteFilterFactory(
Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ca112c2..c7ebbb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -43,6 +43,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lifecycle.LifecycleBean;
@@ -223,6 +224,9 @@ public class IgniteConfiguration {
/** Public pool size. */
private int pubPoolSize = DFLT_PUBLIC_THREAD_CNT;
+ /** Async Callback pool size. */
+ private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT;
+
/** System pool size. */
private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
@@ -723,6 +727,20 @@ public class IgniteConfiguration {
}
/**
+ * Size of thread pool that is in charge of processing asynchronous callbacks.
+ * <p>
+ * This pool is used for callbacks annotated with {@link IgniteAsyncCallback}.
+ * <p>
+ * If not provided, executor service will have size {@link #DFLT_PUBLIC_THREAD_CNT}.
+ *
+ * @return Thread pool size to be used.
+ * @see IgniteAsyncCallback
+ */
+ public int getAsyncCallbackPoolSize() {
+ return callbackPoolSize;
+ }
+
+ /**
* Size of thread pool that is in charge of processing internal and Visor
* {@link ComputeJob GridJobs}.
* <p>
@@ -831,6 +849,20 @@ public class IgniteConfiguration {
}
/**
+ * Sets async callback thread pool size to use within grid.
+ *
+ * @param poolSize Thread pool size to use within grid.
+ * @return {@code this} for chaining.
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+ * @see IgniteAsyncCallback
+ */
+ public IgniteConfiguration setAsyncCallbackPoolSize(int poolSize) {
+ this.callbackPoolSize = poolSize;
+
+ return this;
+ }
+
+ /**
* Sets management thread pool size to use within grid.
*
* @param poolSize Thread pool size to use within grid.
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 77c8794..f51727d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
/**
*
@@ -298,6 +299,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public ExecutorService marshallerCachePool();
/**
+ * Gets async callback pool.
+ *
+ * @return Async callback pool.
+ */
+ public IgniteStripedThreadPoolExecutor asyncCallbackPool();
+
+ /**
* Gets cache object processor.
*
* @return Cache object processor.
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 753dbe8..79d67df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
@@ -305,6 +306,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ protected IgniteStripedThreadPoolExecutor callbackExecSvc;
+
+ /** */
+ @GridToStringExclude
private Map<String, Object> attrs = new HashMap<>();
/** */
@@ -379,6 +384,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
ExecutorService restExecSvc,
+ IgniteStripedThreadPoolExecutor callbackExecSvc,
List<PluginProvider> plugins) throws IgniteCheckedException {
assert grid != null;
assert cfg != null;
@@ -395,6 +401,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.mgmtExecSvc = mgmtExecSvc;
this.igfsExecSvc = igfsExecSvc;
this.restExecSvc = restExecSvc;
+ this.callbackExecSvc = callbackExecSvc;
marshCtx = new MarshallerContextImpl(plugins);
@@ -746,6 +753,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() {
+ return callbackExecSvc;
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteCacheObjectProcessor cacheObjects() {
return cacheObjProc;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 20795fc..d6655d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,6 +167,7 @@ import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
@@ -667,6 +668,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
ExecutorService restExecSvc,
+ IgniteStripedThreadPoolExecutor callbackExecSvc,
GridAbsClosure errHnd)
throws IgniteCheckedException
{
@@ -771,6 +773,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
mgmtExecSvc,
igfsExecSvc,
restExecSvc,
+ callbackExecSvc,
plugins);
cfg.getMarshaller().setContext(ctx.marshallerContext());
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index c46a05c..7776687 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -95,6 +95,7 @@ import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi;
import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
import org.apache.ignite.spi.swapspace.noop.NoopSwapSpaceSpi;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
@@ -1476,6 +1477,9 @@ public class IgnitionEx {
/** Marshaller cache executor service. */
private ExecutorService marshCacheExecSvc;
+ /** Async callback executor service. */
+ private IgniteStripedThreadPoolExecutor callbackExecSvc;
+
/** Grid state. */
private volatile IgniteState state = STOPPED;
@@ -1688,6 +1692,12 @@ public class IgnitionEx {
0,
new LinkedBlockingQueue<Runnable>());
+ // Note that we do not pre-start threads here as this pool may not be needed.
+ callbackExecSvc = new IgniteStripedThreadPoolExecutor(
+ cfg.getAsyncCallbackPoolSize(),
+ cfg.getGridName(),
+ "callback");
+
if (myCfg.getConnectorConfiguration() != null) {
restExecSvc = new IgniteThreadPoolExecutor(
"rest",
@@ -1727,7 +1737,7 @@ public class IgnitionEx {
grid = grid0;
grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
- igfsExecSvc, restExecSvc,
+ igfsExecSvc, restExecSvc, callbackExecSvc,
new CA() {
@Override public void apply() {
startLatch.countDown();
@@ -2335,6 +2345,10 @@ public class IgnitionEx {
U.shutdownNow(getClass(), marshCacheExecSvc, log);
marshCacheExecSvc = null;
+
+ U.shutdownNow(getClass(), callbackExecSvc, log);
+
+ callbackExecSvc = null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index d6d7335..e679dfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.eviction.EvictableEntry;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -462,6 +463,7 @@ public interface GridCacheEntryEx {
* @param subjId Subject ID initiated this update.
* @param taskName Task name.
* @param updateCntr Update counter.
+ * @param fut Dht atomic future.
* @return Tuple where first value is flag showing whether operation succeeded,
* second value is old entry value if return value is requested, third is updated entry value,
* fourth is the version to enqueue for deferred delete the fifth is DR conflict context
@@ -497,7 +499,8 @@ public interface GridCacheEntryEx {
@Nullable UUID subjId,
String taskName,
@Nullable CacheObject prevVal,
- @Nullable Long updateCntr
+ @Nullable Long updateCntr,
+ @Nullable GridDhtAtomicUpdateFuture fut
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 735e20a..75d96d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1248,6 +1249,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
tx.local(),
false,
updateCntr0,
+ null,
topVer);
}
@@ -1445,6 +1447,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
tx.local(),
false,
updateCntr0,
+ null,
topVer);
}
@@ -1821,6 +1824,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
true,
false,
updateCntr,
+ null,
AffinityTopologyVersion.NONE);
}
@@ -1870,7 +1874,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable final UUID subjId,
final String taskName,
@Nullable final CacheObject prevVal,
- @Nullable final Long updateCntr
+ @Nullable final Long updateCntr,
+ @Nullable GridDhtAtomicUpdateFuture fut
) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
assert cctx.atomic();
@@ -1899,7 +1904,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Long updateCntr0 = null;
synchronized (this) {
- boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
+ boolean internal = isInternal() || !context().userCache();
+
+ Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
+
+ boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
+ || !F.isEmptyOrNulls(filter);
checkObsolete();
@@ -2093,6 +2103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
primary,
false,
updateCntr0,
+ null,
topVer);
}
@@ -2501,13 +2512,42 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res)
updateMetrics(op, metrics);
+ // Continuous query filter should be performed under lock.
+ if (lsnrs != null) {
+ CacheObject evtVal = updated;
+ CacheObject evtOldVal = oldVal;
+
+ if (isOffHeapValuesOnly()) {
+ evtVal = cctx.toCacheObject(cctx.unwrapTemporary(evtVal));
+
+ evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal));
+ }
+
+ cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,
+ partition(), primary, false, updateCntr0, fut, topVer);
+ }
+
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
if (intercept) {
if (op == GridCacheOperation.UPDATE)
- cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, updateCntr0));
+ cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+ cctx,
+ key,
+ key0,
+ updated,
+ updated0,
+ keepBinary,
+ updateCntr0));
else
- cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary, updateCntr0));
+ cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+ cctx,
+ key,
+ key0,
+ oldVal,
+ old0,
+ keepBinary,
+ updateCntr0));
if (interceptRes != null)
oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
@@ -3302,6 +3342,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
true,
preload,
updateCntr,
+ null,
topVer);
cctx.dataStructures().onEntryUpdated(key, false, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 013184b..d28aaaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -2141,10 +2140,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
- boolean initLsnrs = false;
- Map<UUID, CacheContinuousQueryListener> lsnrs = null;
- boolean internal = false;
-
// Avoid iterator creation.
for (int i = 0; i < keys.size(); i++) {
KeyCacheObject k = keys.get(i);
@@ -2159,14 +2154,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (entry == null)
continue;
- if (!initLsnrs) {
- internal = entry.isInternal() || !context().userCache();
-
- lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
- initLsnrs = true;
- }
-
GridCacheVersion newConflictVer = req.conflictVersion(i);
long newConflictTtl = req.conflictTtl(i);
long newConflictExpireTime = req.conflictExpireTime(i);
@@ -2195,7 +2182,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.invokeArguments(),
primary && writeThrough() && !req.skipStore(),
!req.skipStore(),
- lsnrs != null || sndPrevVal || req.returnValue(),
+ sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
true,
@@ -2213,7 +2200,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.subjectId(),
taskName,
null,
- null);
+ null,
+ dhtFut);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -2222,8 +2210,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (dhtFut != null) {
- dhtFut.listeners(lsnrs);
-
if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2260,19 +2246,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
"[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
}
}
- else if (lsnrs != null && updRes.updateCounter() != 0) {
- ctx.continuousQueries().onEntryUpdated(
- lsnrs,
- entry.key(),
- updRes.newValue(),
- updRes.oldValue(),
- internal,
- entry.partition(),
- primary,
- false,
- updRes.updateCounter(),
- topVer);
- }
if (hasNear) {
if (primary && updRes.sendToDht()) {
@@ -2446,9 +2419,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
- boolean initLsnrs = false;
- Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-
// Avoid iterator creation.
for (int i = 0; i < entries.size(); i++) {
GridDhtCacheEntry entry = entries.get(i);
@@ -2482,14 +2452,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
}
- if (!initLsnrs) {
- lsnrs = ctx.continuousQueries().updateListeners(
- entry.isInternal() || !context().userCache(),
- false);
-
- initLsnrs = true;
- }
-
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
node.id(),
@@ -2499,7 +2461,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/sndPrevVal || lsnrs != null,
+ /*retval*/sndPrevVal,
req.keepBinary(),
expiry,
/*event*/true,
@@ -2517,7 +2479,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.subjectId(),
taskName,
null,
- null);
+ null,
+ dhtFut);
assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
"success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2548,12 +2511,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (dhtFut != null) {
- dhtFut.listeners(lsnrs);
-
EntryProcessor<Object, Object, Object> entryProcessor =
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
- if (!batchRes.readersOnly())
+ if (!batchRes.readersOnly()) {
dhtFut.addWriteEntry(entry,
writeVal,
entryProcessor,
@@ -2563,6 +2524,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
sndPrevVal,
updRes.oldValue(),
updRes.updateCounter());
+ }
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(filteredReaders,
@@ -2572,19 +2534,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE);
}
- else if (lsnrs != null && updRes.updateCounter() != 0) {
- ctx.continuousQueries().onEntryUpdated(
- lsnrs,
- entry.key(),
- updRes.newValue(),
- updRes.oldValue(),
- entry.isInternal() || !context().userCache(),
- entry.partition(),
- primary,
- false,
- updRes.updateCounter(),
- topVer);
- }
if (hasNear) {
if (primary) {
@@ -2965,10 +2914,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
- boolean initLsnrs = false;
- Map<UUID, CacheContinuousQueryListener> lsnrs = null;
- boolean internal = false;
-
for (int i = 0; i < req.size(); i++) {
KeyCacheObject key = req.key(i);
@@ -2991,14 +2936,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
long ttl = req.ttl(i);
long expireTime = req.conflictExpireTime(i);
- if (!initLsnrs) {
- internal = entry.isInternal() || !context().userCache();
-
- lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
- initLsnrs = true;
- }
-
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
nodeId,
@@ -3008,7 +2945,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/lsnrs != null,
+ /*retval*/false,
req.keepBinary(),
/*expiry policy*/null,
/*event*/true,
@@ -3026,25 +2963,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.subjectId(),
taskName,
prevVal,
- updateIdx);
+ updateIdx,
+ null);
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
- if (lsnrs != null && updRes.updateCounter() != 0) {
- ctx.continuousQueries().onEntryUpdated(
- lsnrs,
- entry.key(),
- updRes.newValue(),
- updRes.oldValue(),
- internal,
- entry.partition(),
- false,
- false,
- updRes.updateCounter(),
- req.topologyVersion());
- }
-
entry.onUnlock();
break; // While.
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4721d6e..5760596 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -37,11 +37,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -97,15 +97,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Future keys. */
private final Collection<KeyCacheObject> keys;
+ /** Continuous query closures. */
+ private Collection<CI1<Boolean>> cntQryClsrs;
+
/** */
private final boolean waitForExchange;
/** Response count. */
private volatile int resCnt;
- /** */
- private Map<UUID, CacheContinuousQueryListener> lsnrs;
-
/**
* @param cctx Cache context.
* @param completionCb Callback to invoke when future is completed.
@@ -138,13 +138,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
}
- /**
- * @param lsnrs Continuous query listeners.
- */
- void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) {
- this.lsnrs = lsnrs;
- }
-
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futVer.asGridUuid();
@@ -276,27 +269,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
addPrevVal,
entry.partition(),
prevVal,
- updateCntr,
- lsnrs != null);
- }
- else if (lsnrs != null && dhtNodes.size() == 1) {
- try {
- cctx.continuousQueries().onEntryUpdated(
- lsnrs,
- entry.key(),
- val,
- prevVal,
- entry.key().internal() || !cctx.userCache(),
- entry.partition(),
- true,
- false,
- updateCntr,
- updateReq.topologyVersion());
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
- + val + ", err=" + e + "]");
- }
+ updateCntr);
}
}
}
@@ -361,77 +334,33 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
}
}
+ /**
+ * @param clsr Continuous query closure.
+ */
+ public void addContinuousQueryClosure(CI1<Boolean> clsr){
+ assert !isDone() : this;
+
+ if (cntQryClsrs == null)
+ cntQryClsrs = new ArrayList<>(10);
+
+ cntQryClsrs.add(clsr);
+ }
+
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
cctx.mvcc().removeAtomicFuture(version());
- if (err != null) {
- if (!mappings.isEmpty() && lsnrs != null) {
- Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
- exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
- for (int i = 0; i < req.size(); i++) {
- KeyCacheObject key = req.key(i);
-
- if (!hndKeys.contains(key)) {
- updateRes.addFailedKey(key, err);
+ boolean suc = err == null;
- cctx.continuousQueries().skipUpdateEvent(
- lsnrs,
- key,
- req.partitionId(i),
- req.updateCounter(i),
- updateReq.topologyVersion());
-
- hndKeys.add(key);
-
- if (hndKeys.size() == keys.size())
- break exit;
- }
- }
- }
- }
- else
- for (KeyCacheObject key : keys)
- updateRes.addFailedKey(key, err);
+ if (!suc) {
+ for (KeyCacheObject key : keys)
+ updateRes.addFailedKey(key, err);
}
- else {
- if (lsnrs != null) {
- Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
- exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
- for (int i = 0; i < req.size(); i++) {
- KeyCacheObject key = req.key(i);
-
- if (!hndKeys.contains(key)) {
- try {
- cctx.continuousQueries().onEntryUpdated(
- lsnrs,
- key,
- req.value(i),
- req.localPreviousValue(i),
- key.internal() || !cctx.userCache(),
- req.partitionId(i),
- true,
- false,
- req.updateCounter(i),
- updateReq.topologyVersion());
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send continuous query message. [key=" + key +
- ", newVal=" + req.value(i) +
- ", err=" + e + "]");
- }
-
- hndKeys.add(key);
-
- if (hndKeys.size() == keys.size())
- break exit;
- }
- }
- }
- }
+
+ if (cntQryClsrs != null) {
+ for (CI1<Boolean> clsr : cntQryClsrs)
+ clsr.apply(suc);
}
if (updateReq.writeSynchronizationMode() == FULL_SYNC)
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index c8e33c2..b5e2835 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -155,10 +155,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
@GridDirectTransient
private List<Integer> partIds;
- /** */
- @GridDirectTransient
- private List<CacheObject> locPrevVals;
-
/** Keep binary flag. */
private boolean keepBinary;
@@ -242,7 +238,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @param partId Partition.
* @param prevVal Previous value.
* @param updateCntr Update counter.
- * @param storeLocPrevVal If {@code true} stores previous value.
*/
public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
@@ -253,19 +248,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
boolean addPrevVal,
int partId,
@Nullable CacheObject prevVal,
- @Nullable Long updateCntr,
- boolean storeLocPrevVal) {
+ @Nullable Long updateCntr
+ ) {
keys.add(key);
partIds.add(partId);
- if (storeLocPrevVal) {
- if (locPrevVals == null)
- locPrevVals = new ArrayList<>();
-
- locPrevVals.add(prevVal);
- }
-
if (forceTransformBackups) {
assert entryProcessor != null;
@@ -526,16 +514,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/**
* @param idx Key index.
- * @return Value.
- */
- @Nullable public CacheObject localPreviousValue(int idx) {
- assert locPrevVals != null;
-
- return locPrevVals.get(idx);
- }
-
- /**
- * @param idx Key index.
* @return Entry processor.
*/
@Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -1069,13 +1047,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
private void cleanup() {
nearVals = null;
prevVals = null;
-
- // Do not keep values if they are not needed for continuous query notification.
- if (locPrevVals == null) {
- keys = null;
- vals = null;
- locPrevVals = null;
- }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a7481d3..3e0e392 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -271,6 +270,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
subjId,
taskName,
null,
+ null,
null);
if (updRes.removeVersion() != null)
@@ -372,6 +372,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
req.subjectId(),
taskName,
null,
+ null,
null);
if (updRes.removeVersion() != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index eab5dbd..db70e2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -56,6 +56,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
return e;
}
+ /**
+ * @return Partition ID.
+ */
+ public int partitionId() {
+ return e.partition();
+ }
+
/** {@inheritDoc} */
@Override public K getKey() {
return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 16513b0..9ae2972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -48,6 +48,7 @@ import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -58,7 +59,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
@@ -66,13 +70,14 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -159,6 +164,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** */
private transient boolean ignoreClsNotFound;
+ /** */
+ private transient boolean asyncCallback;
+
+ /** */
+ private transient UUID nodeId;
+
+ /** */
+ private transient UUID routineId;
+
+ /** */
+ private transient GridKernalContext ctx;
+
+ /** */
+ private transient IgniteLogger log;
+
/**
* Required by {@link Externalizable}.
*/
@@ -283,13 +303,36 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
assert routineId != null;
assert ctx != null;
- if (locLsnr != null)
- ctx.resource().injectGeneric(locLsnr);
+ if (locLsnr != null) {
+ if (locLsnr instanceof JCacheQueryLocalListener) {
+ ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl);
+
+ asyncCallback = ((JCacheQueryLocalListener)locLsnr).async();
+ }
+ else {
+ ctx.resource().injectGeneric(locLsnr);
+
+ asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class);
+ }
+ }
final CacheEntryEventFilter filter = getEventFilter();
- if (filter != null)
- ctx.resource().injectGeneric(filter);
+ if (filter != null) {
+ if (filter instanceof JCacheQueryRemoteFilter) {
+ if (((JCacheQueryRemoteFilter)filter).impl != null)
+ ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl);
+
+ if (!asyncCallback)
+ asyncCallback = ((JCacheQueryRemoteFilter)filter).async();
+ }
+ else {
+ ctx.resource().injectGeneric(filter);
+
+ if (!asyncCallback)
+ asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class);
+ }
+ }
entryBufs = new ConcurrentHashMap<>();
@@ -299,10 +342,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
rcvs = new ConcurrentHashMap<>();
+ this.nodeId = nodeId;
+
+ this.routineId = routineId;
+
+ this.ctx = ctx;
+
final boolean loc = nodeId.equals(ctx.localNodeId());
assert !skipPrimaryCheck || loc;
+ log = ctx.log(CacheContinuousQueryHandler.class);
+
CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
@Override public void onExecution() {
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -324,15 +375,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
}
- /** {@inheritDoc} */
@Override public boolean keepBinary() {
return keepBinary;
}
- @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
- boolean recordIgniteEvt) {
+ @Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt,
+ boolean primary,
+ final boolean recordIgniteEvt,
+ GridDhtAtomicUpdateFuture fut) {
if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
- return;
+ return ;
final GridCacheContext<K, V> cctx = cacheContext(ctx);
@@ -343,93 +395,33 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
// skipPrimaryCheck is set only when listen locally for replicated cache events.
assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
- boolean notify = !evt.entry().isFiltered();
+ if (asyncCallback) {
+ ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure(
+ primary,
+ evt,
+ recordIgniteEvt,
+ fut);
- if (notify && filter != null) {
- try {
- notify = filter.evaluate(evt);
- }
- catch (Exception e) {
- U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
- }
+ ctx.asyncCallbackPool().execute(clsr, evt.partitionId());
}
-
- try {
- final CacheContinuousQueryEntry entry = evt.entry();
-
- if (!notify)
- entry.markFiltered();
+ else {
+ final boolean notify = filter(evt, primary);
if (primary || skipPrimaryCheck) {
- if (loc) {
- if (!locCache) {
- Collection<CacheEntryEvent<? extends K, ? extends V>> entries = handleEvent(ctx, entry);
-
- if (!entries.isEmpty()) {
- locLsnr.onUpdated(entries);
-
- if (!internal && !skipPrimaryCheck)
- sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
- }
- }
- else {
- if (!entry.isFiltered())
- locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
- }
- }
+ if (fut == null)
+ onEntryUpdate(evt, notify, loc, recordIgniteEvt);
else {
- if (!entry.isFiltered())
- prepareEntry(cctx, nodeId, entry);
-
- CacheContinuousQueryEntry e = handleEntry(entry);
-
- if (e != null)
- ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
- }
- }
- else {
- if (!internal) {
- // Skip init query and expire entries.
- if (entry.updateCounter() != -1L) {
- entry.markBackup();
+ fut.addContinuousQueryClosure(new CI1<Boolean>() {
+ @Override public void apply(Boolean suc) {
+ if (!suc)
+ evt.entry().markFiltered();
- backupQueue.add(entry);
- }
+ onEntryUpdate(evt, notify, loc, recordIgniteEvt);
+ }
+ });
}
}
}
- catch (ClusterTopologyCheckedException ex) {
- IgniteLogger log = ctx.log(getClass());
-
- if (log.isDebugEnabled())
- log.debug("Failed to send event notification to node, node left cluster " +
- "[node=" + nodeId + ", err=" + ex + ']');
- }
- catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
- }
-
- if (recordIgniteEvt && notify) {
- ctx.event().record(new CacheQueryReadEvent<>(
- ctx.discovery().localNode(),
- "Continuous query executed.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.CONTINUOUS.name(),
- cacheName,
- null,
- null,
- null,
- filter instanceof CacheEntryEventSerializableFilter ?
- (CacheEntryEventSerializableFilter)filter : null,
- null,
- nodeId,
- taskName(),
- evt.getKey(),
- evt.getValue(),
- evt.getOldValue(),
- null
- ));
- }
}
@Override public void onUnregister() {
@@ -475,15 +467,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
}
- @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
- boolean primary) {
+ @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
+ AffinityTopologyVersion topVer, boolean primary) {
assert evt != null;
CacheContinuousQueryEntry e = evt.entry();
e.markFiltered();
- onEntryUpdated(evt, primary, false);
+ onEntryUpdated(evt, primary, false, null);
}
@Override public void onPartitionEvicted(int part) {
@@ -580,17 +572,73 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
+ @Override public void notifyCallback(final UUID nodeId,
+ final UUID routineId,
+ Collection<?> objs,
+ final GridKernalContext ctx) {
assert nodeId != null;
assert routineId != null;
assert objs != null;
assert ctx != null;
- Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs;
+ final List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs;
+
+ if (entries.isEmpty())
+ return;
+
+ if (asyncCallback) {
+ IgniteStripedThreadPoolExecutor asyncPool = ctx.asyncCallbackPool();
+
+ int threadId = asyncPool.threadId(entries.get(0).partition());
+
+ int startIdx = 0;
+
+ if (entries.size() != 1) {
+ for (int i = 1; i < entries.size(); i++) {
+ int curThreadId = asyncPool.threadId(entries.get(i).partition());
+
+ // If all entries are from one partition, avoid creating new collections.
+ if (curThreadId == threadId)
+ continue;
+
+ final int i0 = i;
+ final int startIdx0 = startIdx;
+
+ asyncPool.execute(new Runnable() {
+ @Override public void run() {
+ notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0));
+ }
+ }, threadId);
+
+ startIdx = i0;
+ threadId = curThreadId;
+ }
+ }
+
+ final int startIdx0 = startIdx;
+
+ asyncPool.execute(new Runnable() {
+ @Override public void run() {
+ notifyCallback0(nodeId, ctx,
+ startIdx0 == 0 ? entries : entries.subList(startIdx0, entries.size()));
+ }
+ }, threadId);
+ }
+ else
+ notifyCallback0(nodeId, ctx, entries);
+ }
+ /**
+ * @param nodeId Node id.
+ * @param ctx Kernal context.
+ * @param entries Entries.
+ */
+ private void notifyCallback0(UUID nodeId,
+ final GridKernalContext ctx,
+ Collection<CacheContinuousQueryEntry> entries) {
final GridCacheContext cctx = cacheContext(ctx);
- Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>();
+ final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(entries.size());
for (CacheContinuousQueryEntry e : entries) {
GridCacheDeploymentManager depMgr = cctx.deploy();
@@ -609,7 +657,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
try {
e.unmarshal(cctx, ldr);
- entries0.addAll(handleEvent(ctx, e));
+ Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, e);
+
+ if (evts != null && !evts.isEmpty())
+ entries0.addAll(evts);
}
catch (IgniteCheckedException ex) {
if (ignoreClsNotFound)
@@ -640,8 +691,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (e.isFiltered())
return Collections.emptyList();
else
- return F.<CacheEntryEvent<? extends K, ? extends V>>asList(
- new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
+ return F.<CacheEntryEvent<? extends K, ? extends V>>
+ asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
}
// Initial query entry or evicted entry. These events should be fired immediately.
@@ -653,7 +704,117 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
- return rec.collectEntries(cctx, cache, e);
+ return rec.collectEntries(e, cctx, cache);
+ }
+
+ /** Decides whether {@code evt} is delivered: applies the event filter, marks rejected entries, queues backup entries.
+ * @param evt Query event.
+ * @param primary Primary flag; if {@code false} and {@code internal} is not set, the entry is marked as backup and queued.
+ * @return {@code True} if event passed filter otherwise {@code false}.
+ */
+ public boolean filter(CacheContinuousQueryEvent evt, boolean primary) {
+ CacheContinuousQueryEntry entry = evt.entry();
+
+ boolean notify = !entry.isFiltered();
+
+ try {
+ if (notify && getEventFilter() != null)
+ notify = getEventFilter().evaluate(evt);
+ }
+ catch (Exception e) {
+ U.error(log, "CacheEntryEventFilter failed: " + e); // Failure is only logged: notify keeps the value it had before the filter call.
+ }
+
+ if (!notify)
+ entry.markFiltered();
+
+ if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
+ entry.markBackup();
+
+ backupQueue.add(entry);
+ }
+
+ return notify;
+ }
+
+ /** Delivers the event to the local listener or sends it to node {@code nodeId}, then optionally records an Ignite event.
+ * @param evt Continuous query event.
+ * @param notify Notify flag; the Ignite event is recorded only if both this flag and {@code recordIgniteEvt} are set.
+ * @param loc Listener deployed on this node: {@code true} notifies {@code locLsnr}, {@code false} sends a notification.
+ * @param recordIgniteEvt Record ignite event.
+ */
+ private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) {
+ try {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ if (cctx == null)
+ return; // No cache context: nothing is delivered and no Ignite event is recorded.
+
+ final CacheContinuousQueryEntry entry = evt.entry();
+
+ if (loc) {
+ if (!locCache) {
+ Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
+
+ if (!evts.isEmpty()) { // NOTE(review): no null check on the handleEvent() result here - confirm it never returns null.
+ locLsnr.onUpdated(evts);
+
+ if (!internal && !skipPrimaryCheck)
+ sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+ }
+ }
+ else {
+ if (!entry.isFiltered())
+ locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+ }
+ }
+ else {
+ if (!entry.isFiltered())
+ prepareEntry(cctx, nodeId, entry);
+
+ CacheContinuousQueryEntry e = handleEntry(entry);
+
+ if (e != null)
+ ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); // NOTE(review): sends 'entry', not 'e' returned by handleEntry() - confirm intended.
+ }
+ }
+ catch (ClusterTopologyCheckedException ex) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send event notification to node, node left cluster " +
+ "[node=" + nodeId + ", err=" + ex + ']');
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+ }
+
+ if (recordIgniteEvt && notify) { // Also reached when delivery above failed with one of the caught exceptions.
+ ctx.event().record(new CacheQueryReadEvent<>(
+ ctx.discovery().localNode(),
+ "Continuous query executed.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.CONTINUOUS.name(),
+ cacheName,
+ null,
+ null,
+ null,
+ getEventFilter() instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)getEventFilter() : null,
+ null,
+ nodeId,
+ taskName(),
+ evt.getKey(),
+ evt.getValue(),
+ evt.getOldValue(),
+ null
+ ));
+ }
+ }
+
+ /**
+ * @return Task name resolved from {@code taskHash} if security is enabled, otherwise {@code null}.
+ */
+ private String taskName() {
+ return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
}
/**
@@ -781,9 +942,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param entry Cache continuous query entry.
* @return Collection entries which will be fired. This collection should contains only non-filtered events.
*/
- public <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(GridCacheContext cctx,
- IgniteCache cache,
- CacheContinuousQueryEntry entry) {
+ <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
+ CacheContinuousQueryEntry entry,
+ GridCacheContext cctx,
+ IgniteCache cache
+ ) {
assert entry != null;
if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
@@ -1241,6 +1404,87 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
+ * Runnable that filters a continuous query event and, for primary updates, delivers it once the update future is done.
+ */
+ private class ContinuousQueryAsyncClosure implements Runnable {
+ /** Event to filter and deliver. */
+ private final CacheContinuousQueryEvent<K, V> evt;
+
+ /** Primary flag passed to {@code filter}. */
+ private final boolean primary;
+
+ /** Whether an Ignite event may be recorded on delivery. */
+ private final boolean recordIgniteEvt;
+
+ /** Dht future to wait for before delivery, {@code null} to deliver right away. */
+ private final IgniteInternalFuture<?> fut;
+
+ /**
+ * @param primary Primary flag.
+ * @param evt Event.
+ * @param recordIgniteEvt Whether to record Ignite event.
+ * @param fut Dht future, may be {@code null}.
+ */
+ ContinuousQueryAsyncClosure(
+ boolean primary,
+ CacheContinuousQueryEvent<K, V> evt,
+ boolean recordIgniteEvt,
+ IgniteInternalFuture<?> fut) {
+ this.primary = primary;
+ this.evt = evt;
+ this.recordIgniteEvt = recordIgniteEvt;
+ this.fut = fut;
+ }
+
+ /** Filters the event, then delivers it via {@code onEntryUpdate} unless {@link #primary()} is {@code false}. */
+ @Override public void run() {
+ final boolean notify = filter(evt, primary); // Runs for non-primary updates too: filter() queues backup entries.
+
+ if (!primary())
+ return;
+
+ if (fut == null) {
+ onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+
+ return;
+ }
+
+ if (fut.isDone()) {
+ if (fut.error() != null) // Future completed with an error: mark the entry filtered before passing it on.
+ evt.entry().markFiltered();
+
+ onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+ }
+ else {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ if (f.error() != null)
+ evt.entry().markFiltered();
+
+ ctx.asyncCallbackPool().execute(new Runnable() {
+ @Override public void run() {
+ onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+ }
+ }, evt.entry().partition()); // NOTE(review): partition presumably selects the pool stripe - confirm.
+ }
+ });
+ }
+ }
+
+ /**
+ * @return {@code True} if the update is primary or {@code skipPrimaryCheck} is set.
+ */
+ private boolean primary() {
+ return primary || skipPrimaryCheck;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() { // NOTE(review): overrides Object.toString() but lacks @Override.
+ return S.toString(ContinuousQueryAsyncClosure.class, this);
+ }
+ }
+
+ /**
* Deployable object.
*/
protected static class DeployableObject implements Externalizable {
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 83ff32c..8eca81c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
import java.util.Map;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.jetbrains.annotations.Nullable;
/**
* Continuous query listener.
@@ -36,8 +38,10 @@ public interface CacheContinuousQueryListener<K, V> {
* @param evt Event
* @param primary Primary flag.
* @param recordIgniteEvt Whether to record event.
+ * @param fut Dht atomic future.
*/
- public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
+ public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
+ boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut);
/**
* Listener unregistered callback.