You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/22 17:50:26 UTC
[50/50] [abbrv] ignite git commit: IGNITE-2004 Added tests.
IGNITE-2004 Added tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3192b7fa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3192b7fa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3192b7fa
Branch: refs/heads/ignite-2004
Commit: 3192b7fa242e5dc4e18dc07f00655f26db2004bf
Parents: 68c94bc
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Mar 22 19:49:07 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Mar 22 19:49:07 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryHandler.java | 12 +-
.../continuous/GridContinuousProcessor.java | 1 -
...eContinuousQueryAsyncFilterListenerTest.java | 324 ++++++++++++++-----
.../IgniteCacheQuerySelfTestSuite.java | 1 -
4 files changed, 255 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3192b7fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7c3e128..5cd7b2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1313,7 +1313,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private CacheEntryEventFilter filter;
/** */
- private IgniteInternalFuture<Boolean> notify;
+ private IgniteInternalFuture<Boolean> notifyFut;
/** */
private final GridCacheContext<K, V> cctx;
@@ -1349,7 +1349,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param primary Primary flag.
* @param cctx Cache context.
* @param filter Filter.
- * @param notify
+ * @param notifyFut Notify future.
* @param evt Event.
* @param cache Cache.
*/
@@ -1362,7 +1362,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
boolean primary,
GridCacheContext<K, V> cctx,
CacheEntryEventFilter filter,
- IgniteInternalFuture<Boolean> notify,
+ IgniteInternalFuture<Boolean> notifyFut,
CacheContinuousQueryEvent<K, V> evt,
IgniteCache cache) {
this.taskName = taskName;
@@ -1374,7 +1374,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
this.primary = primary;
this.cctx = cctx;
this.filter = filter;
- this.notify = notify;
+ this.notifyFut = notifyFut;
this.evt = evt;
this.cache = cache;
}
@@ -1383,7 +1383,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
@Override public void run() {
boolean notify;
- if (evt.getFilterFuture() == null) {
+ if (notifyFut == null) {
notify = !evt.entry().isFiltered();
if (notify && filter != null) {
@@ -1397,7 +1397,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
else {
try {
- notify = evt.getFilterFuture().get();
+ notify = notifyFut.get();
}
catch (IgniteCheckedException e) {
U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed.", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3192b7fa/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index dd30822..f2d6e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -76,7 +76,6 @@ import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3192b7fa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
index 677845f..8eab9d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
@@ -44,6 +45,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
@@ -57,6 +59,8 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
*
@@ -82,6 +86,11 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
cfg.setClientMode(client);
+ MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi();
+ storeSpi.setExpireCount(1000);
+
+ cfg.setEventStorageSpi(storeSpi);
+
return cfg;
}
@@ -103,161 +112,244 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
super.afterTestsStopped();
}
+ ///
+ /// ASYNC FILTER AND LISTENER. TEST LISTENER.
+ ///
+
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerTx() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerTxOffHeap() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerTxOffHeapValues() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES));
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerAtomic() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerReplicatedAtomic() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerReplicatedAtomicOffHeapValues() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerAtomicOffHeap() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerAtomicOffHeapValues() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerAtomicWithoutBackup() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListener() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInListenerReplicated() throws Exception {
- testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+ testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true);
}
+ ///
+ /// ASYNC FILTER AND LISTENER. TEST FILTER.
+ ///
+
/**
- * START START START
- *
- *
- *
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterTx() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterTxOffHeap() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterTxOffHeapValues() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES));
+ testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterAtomic() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterReplicatedAtomic() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterAtomicOffHeap() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterAtomicOffHeapValues() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterAtomicWithoutBackup() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilter() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true);
}
/**
* @throws Exception If failed.
*/
public void testNonDeadLockInFilterReplicated() throws Exception {
- testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+ testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true);
+ }
+
+ ///
+ /// ASYNC LISTENER. TEST LISTENER.
+ ///
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterTxSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterTxOffHeapSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterTxOffHeapValuesSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterAtomicSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterReplicatedAtomicSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterAtomicOffHeapSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterAtomicOffHeapValuesSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterAtomicWithoutBackupSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonDeadLockInFilterReplicatedSyncFilter() throws Exception {
+ testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true);
}
/**
+ * @param ccfg Cache configuration.
+ * @param asyncFilter Async filter.
+ * @param asyncListener Async listener.
* @throws Exception If failed.
*/
- public void testNonDeadLockInListener(CacheConfiguration ccfg) throws Exception {
+ public void testNonDeadLockInListener(CacheConfiguration ccfg,
+ final boolean asyncFilter,
+ boolean asyncListener) throws Exception {
final IgniteCache cache = ignite(0).createCache(ccfg);
try {
@@ -273,8 +365,22 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
final CountDownLatch latch = new CountDownLatch(1);
- conQry.setLocalListener(new CacheInvokeListener(new IgniteBiInClosure<Ignite,
- CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+ IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
+ new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+ @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue> e) {
+ if (asyncFilter) {
+ assertFalse("Failed: " + Thread.currentThread().getName(),
+ Thread.currentThread().getName().contains("sys-"));
+
+ assertTrue("Failed: " + Thread.currentThread().getName(),
+ Thread.currentThread().getName().contains("contQry-"));
+ }
+ }
+ };
+
+ IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
+ new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
@Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
? extends QueryTestValue> e) {
IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
@@ -288,7 +394,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
try {
if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
- tx = ignite.transactions().txStart();
+ tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
assertEquals(val, val0);
@@ -309,7 +415,17 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
tx.close();
}
}
- }));
+ };
+
+ if (asyncListener)
+ conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr));
+ else
+ conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
+
+ if (asyncFilter)
+ conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(fltrClsr)));
+ else
+ conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr)));
try (QueryCursor qry = cache.query(conQry)) {
cache.put(key, val0);
@@ -328,9 +444,14 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
}
/**
+ * @param ccfg Cache configuration.
+ * @param asyncFilter Async filter.
+ * @param asyncListener Async listener.
* @throws Exception If failed.
*/
- public void testNonDeadLockInFilter(CacheConfiguration ccfg) throws Exception {
+ public void testNonDeadLockInFilter(CacheConfiguration ccfg,
+ final boolean asyncFilter,
+ final boolean asyncListener) throws Exception {
final IgniteCache cache = ignite(0).createCache(ccfg);
try {
@@ -346,64 +467,87 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
final CountDownLatch latch = new CountDownLatch(1);
- conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(
+ IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
- @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
- ? extends QueryTestValue> e) {
- assertFalse(Thread.currentThread().getName().contains("sys-"));
+ @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue> e) {
+ if (asyncFilter) {
+ assertFalse("Failed: " + Thread.currentThread().getName(),
+ Thread.currentThread().getName().contains("sys-"));
+
assertTrue("Failed: " + Thread.currentThread().getName(),
Thread.currentThread().getName().contains("contQry-"));
+ }
- IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
- QueryTestValue val = e.getValue();
- if (val == null || !val.equals(new QueryTestValue(1)))
- return;
+ IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
- Transaction tx = null;
+ QueryTestValue val = e.getValue();
- try {
- if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode()
- == TRANSACTIONAL)
- tx = ignite.transactions().txStart();
+ if (val == null || !val.equals(new QueryTestValue(1)))
+ return;
- assertEquals(val, val0);
+ Transaction tx = null;
- cache0.put(key, newVal);
+ try {
+ if (cache0.getConfiguration(CacheConfiguration.class)
+ .getAtomicityMode() == TRANSACTIONAL)
+ tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
- if (tx != null)
- tx.commit();
+ assertEquals(val, val0);
- latch.countDown();
- }
- catch (Exception exp) {
- log.error("Failed: ", exp);
+ cache0.put(key, newVal);
- throw new IgniteException(exp);
- }
- finally {
- if (tx != null)
- tx.close();
- }
+ if (tx != null)
+ tx.commit();
+
+ latch.countDown();
}
- })
- ));
+ catch (Exception exp) {
+ log.error("Failed: ", exp);
- conQry.setLocalListener(new CacheInvokeListener(new IgniteBiInClosure<Ignite,
- CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
- @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
- ? extends QueryTestValue> e) {
- QueryTestValue val = e.getValue();
+ throw new IgniteException(exp);
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+ }
+ };
- if (val == null || !val.equals(new QueryTestValue(1)))
- return;
+ IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
+ new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+ @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue> e) {
+ if (asyncListener) {
+ assertFalse("Failed: " + Thread.currentThread().getName(),
+ Thread.currentThread().getName().contains("sys-"));
- assertEquals(val, val0);
+ assertTrue("Failed: " + Thread.currentThread().getName(),
+ Thread.currentThread().getName().contains("contQry-"));
+ }
- latch.countDown();
- }
- }));
+ QueryTestValue val = e.getValue();
+
+ if (val == null || !val.equals(new QueryTestValue(1)))
+ return;
+
+ assertEquals(val, val0);
+
+ latch.countDown();
+ }
+ };
+
+ if (asyncFilter)
+ conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(fltrClsr)));
+ else
+ conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr)));
+
+ if (asyncListener)
+ conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr));
+ else
+ conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
try (QueryCursor qry = cache.query(conQry)) {
cache.put(key, val0);
@@ -438,11 +582,29 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
throw new IgniteException("Failed to found primary key.");
}
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TimeUnit.SECONDS.toMillis(10);
+ }
+
+ /**
+ *
+ */
+ private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter implements CacheAsyncCallback {
+ /**
+ * @param clsr Closure.
+ */
+ public CacheTestRemoteFilterAsync(
+ IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) {
+ super(clsr);
+ }
+ }
+
/**
*
*/
private static class CacheTestRemoteFilter implements
- CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, CacheAsyncCallback {
+ CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
/** */
@IgniteInstanceResource
private Ignite ignite;
@@ -470,8 +632,20 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
/**
*
*/
- private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
- CacheAsyncCallback {
+ private static class CacheInvokeListenerAsync extends CacheInvokeListener implements CacheAsyncCallback {
+ /**
+ * @param clsr Closure.
+ */
+ public CacheInvokeListenerAsync(
+ IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) {
+ super(clsr);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> {
@IgniteInstanceResource
private Ignite ignite;
@@ -509,7 +683,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
CacheMemoryMode memoryMode) {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
- ccfg.setName("test-cache");
+ ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setCacheMode(cacheMode);
ccfg.setMemoryMode(memoryMode);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3192b7fa/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 6da8cac..5d52cfc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -230,7 +230,6 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
- suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class);
suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class);