You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/03 14:22:24 UTC
[1/2] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead of
factory when continuous query is created".
Repository: ignite
Updated Branches:
refs/heads/master c13339fef -> baa131220
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 62ed66f..cdf4ffd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -28,11 +28,14 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
@@ -40,7 +43,9 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -55,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -132,6 +138,51 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
/**
* @throws Exception If failed.
*/
+ public void testFilterAndFactoryProvided() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+ try {
+ final ContinuousQuery qry = new ContinuousQuery();
+
+ qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter>() {
+ @Override public CacheEntryEventFilter create() {
+ return null;
+ }
+ });
+
+ qry.setRemoteFilter(new CacheEntryEventSerializableFilter() {
+ @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException {
+ return false;
+ }
+ });
+
+ qry.setLocalListener(new CacheEntryUpdatedListener() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ // No-op.
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return cache.query(qry);
+ }
+ }, IgniteException.class, null);
+
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAtomicClient() throws Exception {
CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
1,
@@ -576,7 +627,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
* @param deploy The place where continuous query will be started.
* @throws Exception If failed.
*/
- private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+ protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
throws Exception {
ignite(0).createCache(ccfg);
@@ -1124,7 +1175,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
* @param store If {@code true} configures dummy cache store.
* @return Cache configuration.
*/
- private CacheConfiguration<Object, Object> cacheConfiguration(
+ protected CacheConfiguration<Object, Object> cacheConfiguration(
CacheMode cacheMode,
int backups,
CacheAtomicityMode atomicityMode,
@@ -1176,7 +1227,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
/**
*
*/
- static class QueryTestKey implements Serializable, Comparable {
+ public static class QueryTestKey implements Serializable, Comparable {
/** */
private final Integer key;
@@ -1219,12 +1270,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
/**
*
*/
- static class QueryTestValue implements Serializable {
+ public static class QueryTestValue implements Serializable {
/** */
- private final Integer val1;
+ protected final Integer val1;
/** */
- private final String val2;
+ protected final String val2;
/**
* @param val Value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
new file mode 100644
index 0000000..359dd58
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+
+/**
+ * Event filter for deployment.
+ */
+public class CacheDeploymentEntryEventFilter implements CacheEntryEventFilter<Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt)
+ throws CacheEntryListenerException {
+ return evt.getValue() == null || evt.getValue() % 2 != 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
new file mode 100644
index 0000000..0d6eceb
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+
+/**
+ * Event filter factory for deployment.
+ */
+public class CacheDeploymentEntryEventFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> {
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter<Integer, Integer> create() {
+ return new CacheDeploymentEntryEventFilter();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 968dbf6..083af1e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -79,12 +79,14 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
@@ -225,6 +227,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
+ suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+ suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
suite.addTestSuite(CacheContinuousBatchAckTest.class);
suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
[2/2] ignite git commit: Fixed IGNITE-1186 "Filter is sent instead of
factory when continuous query is created".
Posted by nt...@apache.org.
Fixed IGNITE-1186 "Filter is sent instead of factory when continuous query is created".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/baa13122
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/baa13122
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/baa13122
Branch: refs/heads/master
Commit: baa131220bf503da0908e4ecfee92966317e209c
Parents: c13339f
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Mar 3 16:21:53 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Mar 3 16:21:53 2016 +0300
----------------------------------------------------------------------
.../ignite/cache/query/ContinuousQuery.java | 35 +
.../processors/cache/IgniteCacheProxy.java | 4 +
.../continuous/CacheContinuousQueryHandler.java | 86 ++-
.../CacheContinuousQueryHandlerV2.java | 176 +++++
.../continuous/CacheContinuousQueryManager.java | 238 +++++--
.../continuous/GridContinuousProcessor.java | 7 +-
.../IgniteCacheEntryListenerAbstractTest.java | 75 +-
.../cache/IgniteCacheEntryListenerTxTest.java | 5 -
.../GridCacheReplicatedPreloadSelfTest.java | 39 +-
.../CacheContinuousQueryFactoryFilterTest.java | 714 +++++++++++++++++++
...ContinuousQueryFailoverAbstractSelfTest.java | 2 +-
.../CacheContinuousQueryOperationP2PTest.java | 326 +++++++++
...acheContinuousQueryRandomOperationsTest.java | 63 +-
.../p2p/CacheDeploymentEntryEventFilter.java | 33 +
.../CacheDeploymentEntryEventFilterFactory.java | 31 +
.../IgniteCacheQuerySelfTestSuite.java | 4 +
16 files changed, 1706 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index df1bad3..3ea8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -18,6 +18,8 @@
package org.apache.ignite.cache.query;
import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -119,6 +121,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** Remote filter. */
private CacheEntryEventSerializableFilter<K, V> rmtFilter;
+ /** Remote filter factory. */
+ private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
+
/** Time interval. */
private long timeInterval = DFLT_TIME_INTERVAL;
@@ -196,7 +201,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
*
* @param rmtFilter Key-value filter.
* @return {@code this} for chaining.
+ *
+ * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
*/
+ @Deprecated
public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
this.rmtFilter = rmtFilter;
@@ -213,6 +221,33 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
}
/**
+ * Sets optional key-value filter factory. This factory produces filter is called before entry is
+ * sent to the master node.
+ * <p>
+ * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
+ * (e.g., synchronization or transactional cache operations), should be executed asynchronously
+ * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+ *
+ * @param rmtFilterFactory Key-value filter factory.
+ * @return {@code this} for chaining.
+ */
+ public ContinuousQuery<K, V> setRemoteFilterFactory(
+ Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
+ this.rmtFilterFactory = rmtFilterFactory;
+
+ return this;
+ }
+
+ /**
+ * Gets remote filter.
+ *
+ * @return Remote filter.
+ */
+ public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() {
+ return rmtFilterFactory;
+ }
+
+ /**
* Sets time interval.
* <p>
* When a cache update happens, entry is first put into a buffer. Entries from buffer will
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 5ed8753..6e8bcbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -565,10 +565,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (qry.getLocalListener() == null)
throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
+ if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
+ throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+
try {
final UUID routineId = ctx.continuousQueries().executeQuery(
qry.getLocalListener(),
qry.getRemoteFilter(),
+ qry.getRemoteFilterFactory(),
qry.getPageSize(),
qry.getTimeInterval(),
qry.isAutoUnsubscribe(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 1938edb..10fbd89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
@@ -168,30 +168,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param topic Topic for ordered messages.
* @param locLsnr Local listener.
* @param rmtFilter Remote filter.
- * @param internal Internal flag.
- * @param notifyExisting Notify existing flag.
* @param oldValRequired Old value required flag.
* @param sync Synchronous flag.
* @param ignoreExpired Ignore expired events flag.
- * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
- * @param taskHash Task name hash code.
- * @param locCache {@code True} if local cache.
- * @param keepBinary Keep binary flag.
*/
public CacheContinuousQueryHandler(
String cacheName,
Object topic,
CacheEntryUpdatedListener<K, V> locLsnr,
CacheEntryEventSerializableFilter<K, V> rmtFilter,
- boolean internal,
- boolean notifyExisting,
boolean oldValRequired,
boolean sync,
boolean ignoreExpired,
- int taskHash,
- boolean skipPrimaryCheck,
- boolean locCache,
- boolean keepBinary,
boolean ignoreClsNotFound) {
assert topic != null;
assert locLsnr != null;
@@ -200,20 +188,49 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
this.topic = topic;
this.locLsnr = locLsnr;
this.rmtFilter = rmtFilter;
- this.internal = internal;
- this.notifyExisting = notifyExisting;
this.oldValRequired = oldValRequired;
this.sync = sync;
this.ignoreExpired = ignoreExpired;
- this.taskHash = taskHash;
- this.skipPrimaryCheck = skipPrimaryCheck;
- this.locCache = locCache;
- this.keepBinary = keepBinary;
this.ignoreClsNotFound = ignoreClsNotFound;
cacheId = CU.cacheId(cacheName);
}
+ /**
+ * @param internal Internal query.
+ */
+ public void internal(boolean internal) {
+ this.internal = internal;
+ }
+
+ /**
+ * @param notifyExisting Notify existing.
+ */
+ public void notifyExisting(boolean notifyExisting) {
+ this.notifyExisting = notifyExisting;
+ }
+
+ /**
+ * @param locCache Local cache.
+ */
+ public void localCache(boolean locCache) {
+ this.locCache = locCache;
+ }
+
+ /**
+ * @param taskHash Task hash.
+ */
+ public void taskNameHash(int taskHash) {
+ this.taskHash = taskHash;
+ }
+
+ /**
+ * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
+ */
+ public void skipPrimaryCheck(boolean skipPrimaryCheck) {
+ this.skipPrimaryCheck = skipPrimaryCheck;
+ }
+
/** {@inheritDoc} */
@Override public boolean isEvents() {
return false;
@@ -262,8 +279,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (locLsnr != null)
ctx.resource().injectGeneric(locLsnr);
- if (rmtFilter != null)
- ctx.resource().injectGeneric(rmtFilter);
+ final CacheEntryEventFilter filter = getEventFilter();
+
+ if (filter != null)
+ ctx.resource().injectGeneric(filter);
entryBufs = new ConcurrentHashMap<>();
@@ -303,7 +322,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
null,
null,
null,
- rmtFilter,
+ filter instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)filter : null,
null,
nodeId,
taskName()
@@ -332,9 +352,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
boolean notify = !evt.entry().isFiltered();
- if (notify && rmtFilter != null) {
+ if (notify && filter != null) {
try {
- notify = rmtFilter.evaluate(evt);
+ notify = filter.evaluate(evt);
}
catch (Exception e) {
U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
@@ -422,7 +442,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
null,
null,
null,
- rmtFilter,
+ filter instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)filter : null,
null,
nodeId,
taskName(),
@@ -435,8 +456,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
@Override public void onUnregister() {
- if (rmtFilter instanceof PlatformContinuousQueryFilter)
- ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
+ if (filter instanceof PlatformContinuousQueryFilter)
+ ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
}
@Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
@@ -517,6 +538,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
+ * @return Cache entry event filter.
+ */
+ public CacheEntryEventFilter getEventFilter() {
+ return rmtFilter;
+ }
+
+ /**
* @param cctx Context.
* @param nodeId ID of the node that started routine.
* @param entry Entry.
@@ -1189,7 +1217,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/**
* Deployable object.
*/
- private static class DeployableObject implements Externalizable {
+ protected static class DeployableObject implements Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -1214,7 +1242,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param ctx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
- private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+ protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
assert obj != null;
assert ctx != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
new file mode 100644
index 0000000..7aef4dd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
+import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query handler V2 version. Contains {@link Factory} for remote listener.
+ */
+public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHandler<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Remote filter factory. */
+ private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
+
+ /** Deployable object for filter factory. */
+ private DeployableObject rmtFilterFactoryDep;
+
+ /** Event types for JCache API. */
+ private byte types;
+
+ /** */
+ protected transient CacheEntryEventFilter filter;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public CacheContinuousQueryHandlerV2() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheName Cache name.
+ * @param topic Topic for ordered messages.
+ * @param locLsnr Local listener.
+ * @param rmtFilterFactory Remote filter factory.
+ * @param oldValRequired Old value required flag.
+ * @param sync Synchronous flag.
+ * @param ignoreExpired Ignore expired events flag.
+ * @param types Event types.
+ */
+ public CacheContinuousQueryHandlerV2(
+ String cacheName,
+ Object topic,
+ CacheEntryUpdatedListener<K, V> locLsnr,
+ Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+ boolean oldValRequired,
+ boolean sync,
+ boolean ignoreExpired,
+ boolean ignoreClsNotFound,
+ @Nullable Byte types) {
+ super(cacheName,
+ topic,
+ locLsnr,
+ null,
+ oldValRequired,
+ sync,
+ ignoreExpired,
+ ignoreClsNotFound);
+
+ assert rmtFilterFactory != null;
+
+ this.rmtFilterFactory = rmtFilterFactory;
+
+ if (types != null) {
+ assert types != 0;
+
+ this.types = types;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter getEventFilter() {
+ if (filter == null) {
+ assert rmtFilterFactory != null;
+
+ Factory<? extends CacheEntryEventFilter> factory = rmtFilterFactory;
+
+ filter = factory.create();
+
+ if (types != 0)
+ filter = new JCacheQueryRemoteFilter(filter, types);
+ }
+
+ return filter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
+ super.p2pMarshal(ctx);
+
+ if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
+ rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+ super.p2pUnmarshal(nodeId, ctx);
+
+ if (rmtFilterFactoryDep != null)
+ rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridContinuousHandler clone() {
+ return super.clone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryHandlerV2.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ boolean b = rmtFilterFactoryDep != null;
+
+ out.writeBoolean(b);
+
+ if (b)
+ out.writeObject(rmtFilterFactoryDep);
+ else
+ out.writeObject(rmtFilterFactory);
+
+ out.writeByte(types);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ boolean b = in.readBoolean();
+
+ if (b)
+ rmtFilterFactoryDep = (DeployableObject)in.readObject();
+ else
+ rmtFilterFactory = (Factory)in.readObject();
+
+ types = in.readByte();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 409c1da..353043f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -23,15 +23,16 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Map;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
@@ -54,10 +55,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
@@ -70,6 +72,7 @@ import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.QUERY_MSG_VER_2_SINCE;
/**
* Continuous queries manager.
@@ -413,28 +416,80 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
- public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
- CacheEntryEventSerializableFilter rmtFilter,
+ public UUID executeQuery(final CacheEntryUpdatedListener locLsnr,
+ @Nullable final CacheEntryEventSerializableFilter rmtFilter,
+ @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
boolean loc,
- boolean keepBinary) throws IgniteCheckedException
+ final boolean keepBinary) throws IgniteCheckedException
{
+ IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
+
+ if (rmtFilterFactory != null)
+ clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+ CacheContinuousQueryHandler hnd;
+
+ if (v2)
+ hnd = new CacheContinuousQueryHandlerV2(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilterFactory,
+ true,
+ false,
+ true,
+ false,
+ null);
+ else {
+ CacheEntryEventFilter fltr = rmtFilterFactory.create();
+
+ if (!(fltr instanceof CacheEntryEventSerializableFilter))
+ throw new IgniteException("Topology has nodes of the old versions. In this case " +
+ "EntryEventFilter should implement " +
+ "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr);
+
+ hnd = new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ (CacheEntryEventSerializableFilter)fltr,
+ true,
+ false,
+ true,
+ false);
+ }
+
+ return hnd;
+ }
+ };
+ else
+ clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply(Boolean ignore) {
+ return new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilter,
+ true,
+ false,
+ true,
+ false);
+ }
+ };
+
return executeQuery0(
locLsnr,
- rmtFilter,
+ clsr,
bufSize,
timeInterval,
autoUnsubscribe,
false,
false,
- true,
- false,
- true,
loc,
- keepBinary,
- false);
+ keepBinary);
}
/**
@@ -445,27 +500,35 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
- public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr,
- CacheEntryEventSerializableFilter rmtFilter,
- boolean loc,
- boolean notifyExisting,
- boolean ignoreClassNotFound)
+ public UUID executeInternalQuery(final CacheEntryUpdatedListener<?, ?> locLsnr,
+ final CacheEntryEventSerializableFilter rmtFilter,
+ final boolean loc,
+ final boolean notifyExisting,
+ final boolean ignoreClassNotFound)
throws IgniteCheckedException
{
return executeQuery0(
locLsnr,
- rmtFilter,
+ new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) {
+ return new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilter,
+ true,
+ false,
+ true,
+ ignoreClassNotFound);
+ }
+ },
ContinuousQuery.DFLT_PAGE_SIZE,
ContinuousQuery.DFLT_TIME_INTERVAL,
ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
true,
notifyExisting,
- true,
- false,
- true,
loc,
- false,
- ignoreClassNotFound);
+ false);
}
/**
@@ -539,32 +602,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/**
* @param locLsnr Local listener.
- * @param rmtFilter Remote filter.
* @param bufSize Buffer size.
* @param timeInterval Time interval.
* @param autoUnsubscribe Auto unsubscribe flag.
* @param internal Internal flag.
* @param notifyExisting Notify existing flag.
- * @param oldValRequired Old value required flag.
- * @param sync Synchronous flag.
- * @param ignoreExpired Ignore expired event flag.
* @param loc Local flag.
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
- final CacheEntryEventSerializableFilter rmtFilter,
+ IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr,
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
boolean internal,
boolean notifyExisting,
- boolean oldValRequired,
- boolean sync,
- boolean ignoreExpired,
boolean loc,
- final boolean keepBinary,
- boolean ignoreClassNotFound) throws IgniteCheckedException
+ final boolean keepBinary) throws IgniteCheckedException
{
cctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -573,21 +628,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
- GridContinuousHandler hnd = new CacheContinuousQueryHandler(
- cctx.name(),
- TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
- locLsnr,
- rmtFilter,
- internal,
- notifyExisting,
- oldValRequired,
- sync,
- ignoreExpired,
- taskNameHash,
- skipPrimaryCheck,
- cctx.isLocal(),
- keepBinary,
- ignoreClassNotFound);
+ boolean v2 = useV2Protocol(cctx.discovery().allNodes());
+
+ final CacheContinuousQueryHandler hnd = clsr.apply(v2);
+
+ hnd.taskNameHash(taskNameHash);
+ hnd.skipPrimaryCheck(skipPrimaryCheck);
+ hnd.notifyExisting(notifyExisting);
+ hnd.internal(internal);
+ hnd.keepBinary(keepBinary);
+ hnd.localCache(cctx.isLocal());
IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -654,7 +704,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.kernalContext().cache().jcache(cctx.name()),
cctx, entry);
- if (rmtFilter != null && !rmtFilter.evaluate(next))
+ if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next))
next = null;
}
}
@@ -667,6 +717,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * @param nodes Nodes.
+ * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
+ * otherwise {@code false}.
+ */
+ private boolean useV2Protocol(Collection<ClusterNode> nodes) {
+ for (ClusterNode node : nodes) {
+ if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0)
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @param lsnrId Listener ID.
* @param lsnr Listener.
* @param internal Internal flag.
@@ -767,36 +831,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
if (types == 0)
throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces.");
- CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
+ final byte types0 = types;
+
+ final CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener(
locLsnrImpl,
log);
- CacheEntryEventFilter fltr = null;
-
- if (cfg.getCacheEntryEventFilterFactory() != null) {
- fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
-
- if (!(fltr instanceof Serializable))
- throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
- + fltr);
- }
-
- CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
-
routineId = executeQuery0(
locLsnr,
- rmtFilter,
+ new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
+ @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+ CacheContinuousQueryHandler hnd;
+ Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
+
+ v2 = rmtFilterFactory != null && v2;
+
+ if (v2)
+ hnd = new CacheContinuousQueryHandlerV2(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilterFactory,
+ cfg.isOldValueRequired(),
+ cfg.isSynchronous(),
+ false,
+ false,
+ types0);
+ else {
+ JCacheQueryRemoteFilter jCacheFilter;
+
+ CacheEntryEventFilter filter = null;
+
+ if (rmtFilterFactory != null) {
+ filter = rmtFilterFactory.create();
+
+ if (!(filter instanceof Serializable))
+ throw new IgniteException("Topology has nodes of the old versions. " +
+ "In this case EntryEventFilter must implement java.io.Serializable " +
+ "interface. Filter: " + filter);
+ }
+
+ jCacheFilter = new JCacheQueryRemoteFilter(filter, types0);
+
+ hnd = new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ jCacheFilter,
+ cfg.isOldValueRequired(),
+ cfg.isSynchronous(),
+ false,
+ false);
+ }
+
+ return hnd;
+ }
+ },
ContinuousQuery.DFLT_PAGE_SIZE,
ContinuousQuery.DFLT_TIME_INTERVAL,
ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
false,
false,
- cfg.isOldValueRequired(),
- cfg.isSynchronous(),
false,
- false,
- keepBinary,
- false);
+ keepBinary
+ );
}
/**
@@ -814,6 +912,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ *
*/
private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
/** */
@@ -896,8 +995,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * For handler version 2.0 this filter should not be serialized.
*/
- private static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
+ protected static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -922,7 +1022,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param impl Filter.
* @param types Types.
*/
- JCacheQueryRemoteFilter(CacheEntryEventFilter impl, byte types) {
+ JCacheQueryRemoteFilter(@Nullable CacheEntryEventFilter impl, byte types) {
assert types != 0;
this.impl = impl;
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1ec69c2..1776748 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -110,6 +111,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Threads started by this processor. */
private final Map<UUID, IgniteThread> bufCheckThreads = new ConcurrentHashMap8<>();
+ /** */
+ public static final IgniteProductVersion QUERY_MSG_VER_2_SINCE = IgniteProductVersion.fromString("1.5.9");
+
/** */
private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>();
@@ -615,7 +619,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
startFuts.put(routineId, fut);
try {
- if (locIncluded && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
+ if (locIncluded
+ && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
hnd.onListenerRegistered(routineId, ctx);
ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData,
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index e6bfd87..35fbbd5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -42,6 +42,7 @@ import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryListenerException;
@@ -99,6 +100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/** */
private boolean useObjects;
+ /** */
+ private static AtomicBoolean serialized = new AtomicBoolean(false);
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -138,6 +142,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
assertEquals(0, syncMsgFuts.size());
}
+
+ serialized.set(false);
}
/**
@@ -178,11 +184,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return new CreateUpdateRemoveExpireListener();
}
},
- new Factory<CacheEntryEventSerializableFilter<Object, Object>>() {
- @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
- return new ExceptionFilter();
- }
- },
+ new ExceptionFilterFactory(),
false,
false
);
@@ -443,18 +445,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
FactoryBuilder.factoryOf(lsnr),
- null,
+ new SerializableFactory(),
true,
false
));
try {
startGrid(gridCount());
+
+ jcache(0).put(1, 1);
}
finally {
stopGrid(gridCount());
}
+ jcache(0).put(2, 2);
+
+ assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get());
assertFalse(serialized.get());
}
@@ -1130,9 +1137,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+ private static class TestFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+ @Override public CacheEntryEventFilter<Object, Object> create() {
return new TestFilter();
}
}
@@ -1184,7 +1191,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+ private static class TestFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
assert evt != null;
@@ -1201,6 +1208,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return key % 2 == 0;
}
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ throw new UnsupportedOperationException("Filter must not be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new UnsupportedOperationException("Filter must not be unmarshaled.");
+ }
}
/**
@@ -1355,6 +1372,36 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/**
+ *
+ */
+ public static class SerializableFactory implements Factory<NonSerializableFilter> {
+ /** {@inheritDoc} */
+ @Override public NonSerializableFilter create() {
+ return new NonSerializableFilter();
+ }
+ }
+
+ /**
+ *
+ */
+ public static class NonSerializableFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ serialized.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ serialized.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+ return true;
+ }
+ }
+
+ /**
*/
public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
/** */
@@ -1467,4 +1514,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return S.toString(ListenerTestValue.class, this);
}
}
+
+ /**
+ *
+ */
+ static class ExceptionFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+ return new ExceptionFilter();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
index 41725e7..cad57f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java
@@ -47,9 +47,4 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst
@Override protected NearCacheConfiguration nearConfiguration() {
return null;
}
-
- /** {@inheritDoc} */
- @Override public void testEvents(){
- fail("https://issues.apache.org/jira/browse/IGNITE-1600");
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index ea2f27b..c6cd5af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -403,18 +404,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
}
}
},
- new Factory<CacheEntryEventSerializableFilter<Integer, Object>>() {
- /** {@inheritDoc} */
- @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
- try {
-
- return cls2.newInstance();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- },
+ new ClassFilterFactory(cls2),
true,
true
);
@@ -946,4 +936,29 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
return true;
}
}
+
+ /**
+ *
+ */
+ private static class ClassFilterFactory implements Factory<CacheEntryEventFilter<Integer, Object>> {
+ /** */
+ private Class<CacheEntryEventSerializableFilter> cls;
+
+ /**
+ * @param cls Class.
+ */
+ public ClassFilterFactory(Class<CacheEntryEventSerializableFilter> cls) {
+ this.cls = cls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
+ try {
+ return cls.newInstance();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
new file mode 100644
index 0000000..6143fa9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest {
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private static final int KEYS = 50;
+
+ /** */
+ private static final int VALS = 10;
+
+ /** */
+ public static final int ITERATION_CNT = 40;
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInternalQuery() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+ UUID uuid = null;
+
+ try {
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ final CountDownLatch latch = new CountDownLatch(5);
+
+ CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ for (Object evt : iterable) {
+ latch.countDown();
+
+ log.info("Received event: " + evt);
+ }
+ }
+ };
+
+ uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+ .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
+
+ for (int i = 10; i < 20; i++)
+ cache.put(i, i);
+
+ assertTrue(latch.await(3, SECONDS));
+ }
+ finally {
+ if (uuid != null)
+ grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+ .cancelInternalQuery(uuid);
+
+ cache.destroy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+ throws Exception {
+ ignite(0).createCache(ccfg);
+
+ try {
+ long seed = System.currentTimeMillis();
+
+ Random rnd = new Random(seed);
+
+ log.info("Random seed: " + seed);
+
+ List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+ Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+ Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
+
+ if (deploy == CLIENT)
+ evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+ else if (deploy == SERVER)
+ evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+ rnd.nextBoolean()));
+ else {
+ boolean isSync = rnd.nextBoolean();
+
+ for (int i = 0; i < NODES - 1; i++)
+ evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
+ }
+
+ ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+ Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+ try {
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ if (i % 10 == 0)
+ log.info("Iteration: " + i);
+
+ for (int idx = 0; idx < NODES; idx++)
+ randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+ }
+ }
+ finally {
+ for (QueryCursor<?> cur : curs)
+ cur.close();
+
+ for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+ grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param nodeIdx Node index.
+ * @param curs Cursors.
+ * @param lsnrCfgs Listener configurations.
+ * @return Event queue
+ */
+ private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+ int nodeIdx,
+ Collection<QueryCursor<?>> curs,
+ Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+ boolean sync) {
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+ new MutableCacheEntryListenerConfiguration<>(
+ FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+ @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ }),
+ new FilterFactory(),
+ true,
+ sync
+ );
+
+ grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+ lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+ }
+ else {
+ ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ });
+
+ qry.setRemoteFilterFactory(new FilterFactory());
+
+ QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+ curs.add(cur);
+ }
+
+ return evtsQueue;
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @param evtsQueues Events queue.
+ * @param expData Expected cache data.
+ * @param partCntr Partition counter.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void randomUpdate(
+ Random rnd,
+ List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ ConcurrentMap<Object, Object> expData,
+ Map<Integer, Long> partCntr,
+ IgniteCache<Object, Object> cache)
+ throws Exception {
+ Object key = new QueryTestKey(rnd.nextInt(KEYS));
+ Object newVal = value(rnd);
+ Object oldVal = expData.get(key);
+
+ int op = rnd.nextInt(11);
+
+ Ignite ignite = cache.unwrap(Ignite.class);
+
+ Transaction tx = null;
+
+ if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+ tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+ try {
+ // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+ switch (op) {
+ case 0: {
+ cache.put(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 1: {
+ cache.getAndPut(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 2: {
+ cache.remove(key);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 3: {
+ cache.getAndRemove(key);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 4: {
+ cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 5: {
+ cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 6: {
+ cache.putIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 7: {
+ cache.getAndPutIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 8: {
+ cache.replace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 9: {
+ cache.getAndReplace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 10: {
+ if (oldVal != null) {
+ Object replaceVal = value(rnd);
+
+ boolean success = replaceVal.equals(oldVal);
+
+ if (success) {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueues);
+ }
+ }
+ else {
+ cache.replace(key, value(rnd), newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueues);
+ }
+
+ break;
+ }
+
+ default:
+ fail("Op:" + op);
+ }
+ } finally {
+ if (tx != null)
+ tx.close();
+ }
+ }
+
+ /**
+ * @param rnd {@link Random}.
+ * @return {@link TransactionIsolation}.
+ */
+ private TransactionIsolation txRandomIsolation(Random rnd) {
+ int val = rnd.nextInt(3);
+
+ if (val == 0)
+ return READ_COMMITTED;
+ else if (val == 1)
+ return REPEATABLE_READ;
+ else
+ return SERIALIZABLE;
+ }
+
+ /**
+ * @param rnd {@link Random}.
+ * @return {@link TransactionConcurrency}.
+ */
+ private TransactionConcurrency txRandomConcurrency(Random rnd) {
+ return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key
+ * @param cntrs Partition counters.
+ */
+ private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+ Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+ int part = aff.partition(key);
+
+ Long partCntr = cntrs.get(part);
+
+ if (partCntr == null)
+ partCntr = 0L;
+
+ cntrs.put(part, ++partCntr);
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @return Cache value.
+ */
+ private static Object value(Random rnd) {
+ return new QueryTestValue(rnd.nextInt(VALS));
+ }
+
+ /**
+ * @param evtsQueues Event queue.
+ * @param partCntrs Partition counters.
+ * @param aff Affinity function.
+ * @param key Key.
+ * @param val Value.
+ * @param oldVal Old value.
+ * @throws Exception If failed.
+ */
+ private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ Map<Integer, Long> partCntrs,
+ Affinity<Object> aff,
+ Object key,
+ Object val,
+ Object oldVal)
+ throws Exception {
+ if ((val == null && oldVal == null
+ || (val != null && !isAccepted((QueryTestValue)val)))) {
+ checkNoEvent(evtsQueues);
+
+ return;
+ }
+
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+ assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+ assertEquals(key, evt.getKey());
+ assertEquals(val, evt.getValue());
+ assertEquals(oldVal, evt.getOldValue());
+
+ long cntr = partCntrs.get(aff.partition(key));
+ CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+ assertNotNull(cntr);
+ assertNotNull(qryEntryEvt);
+
+ assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+ }
+ }
+
+ /**
+ * @param evtsQueues Event queue.
+ * @throws Exception If failed.
+ */
+ private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+ assertNull(evt);
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class NonSerializableFilter
+ implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
+ CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
+ /** */
+ public NonSerializableFilter() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
+ throws CacheEntryListenerException {
+ return isAccepted(event.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ fail("Entry filter should not be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fail("Entry filter should not be marshaled.");
+ }
+
+ /**
+ * @return {@code True} if value is even.
+ */
+ public static boolean isAccepted(QueryTestValue val) {
+ return val == null || val.val1 % 2 == 0;
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer>{
+ /** */
+ public SerializableFilter() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event)
+ throws CacheEntryListenerException {
+ return isAccepted(event.getValue());
+ }
+
+ /**
+ * @return {@code True} if value is even.
+ */
+ public static boolean isAccepted(Integer val) {
+ return val == null || val % 2 == 0;
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class FilterFactory implements Factory<NonSerializableFilter> {
+ @Override public NonSerializableFilter create() {
+ return new NonSerializableFilter();
+ }
+ }
+
+ /**
+ *
+ */
+ public abstract class LocalNonSerialiseListener implements
+ CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+ CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+ CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+ CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+ Externalizable {
+ /** */
+ public LocalNonSerialiseListener() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
+ }
+
+ /**
+ * @param evts Events.
+ */
+ protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts);
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index a42f056..f104f21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1432,7 +1432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
if (hnd.isQuery() && hnd.cacheName() == null) {
- backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+ backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue");
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baa13122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
new file mode 100644
index 0000000..97f9e0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+ cfg.setPeerClassLoadingEnabled(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGridsMultiThreaded(NODES - 1);
+
+ client = true;
+
+ startGrid(NODES - 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomic() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicatedClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTx() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, false);
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxReplicatedClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED
+ );
+
+ testContinuousQuery(ccfg, true);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param isClient Client.
+ * @throws Exception If failed.
+ */
+ protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient)
+ throws Exception {
+ ignite(0).createCache(ccfg);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ QueryCursor<?> cur = null;
+
+ final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+ (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ TestLocalListener localLsnr = new TestLocalListener() {
+ @Override public void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+ throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+ latch.countDown();
+
+ log.info("Received event: " + evt);
+ }
+ }
+ };
+
+ MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
+ new MutableCacheEntryListenerConfiguration<>(
+ new FactoryBuilder.SingletonFactory<>(localLsnr),
+ (Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
+ (Object)evtFilterFactory.newInstance(),
+ true,
+ true
+ );
+
+ qry.setLocalListener(localLsnr);
+
+ qry.setRemoteFilterFactory(
+ (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
+
+ IgniteCache<Integer, Integer> cache = null;
+
+ try {
+ if (isClient)
+ cache = grid(NODES - 1).cache(ccfg.getName());
+ else
+ cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
+
+ cur = cache.query(qry);
+
+ cache.registerCacheEntryListener(lsnrCfg);
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+ }
+ finally {
+ if (cur != null)
+ cur.close();
+
+ if (cache != null)
+ cache.deregisterCacheEntryListener(lsnrCfg);
+ }
+ }
+
+ /**
+ *
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param atomicityMode Cache atomicity mode.
+ * @param memoryMode Cache memory mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration(
+ CacheMode cacheMode,
+ int backups,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setMemoryMode(memoryMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ return ccfg;
+ }
+
+ /**
+ *
+ */
+ private static abstract class TestLocalListener implements CacheEntryUpdatedListener<Integer, Integer>,
+ CacheEntryCreatedListener<Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+ throws CacheEntryListenerException {
+ onEvent(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+ throws CacheEntryListenerException {
+ onEvent(evts);
+ }
+
+ /**
+ * @param evts Events.
+ */
+ protected abstract void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts);
+ }
+}