You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/29 14:32:26 UTC
[17/22] ignite git commit: Continuous queries fixes: - flush backup
queue on exchange end (otherwise we don't really wait for all current
operations) - on coordinator apply counters after all single messages
received (otherwise extra counter increments a
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index befd1d7..2fb7fcb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
@@ -400,7 +401,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
/**
* @param ignite Ignite.
- * @param topVer Topology version.
+ * @param topVer Major topology version.
+ * @param minorVer Minor topology version.
* @throws Exception If failed.
*/
private void waitRebalanceFinished(Ignite ignite, long topVer, int minorVer) throws Exception {
@@ -511,9 +513,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
* @param nodes Count nodes.
* @param killedNodeIdx Killed node index.
* @param updCntrs Update counters.
- * @return {@code True} if counters matches.
*/
- private boolean checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> updCntrs) {
+ private void checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> updCntrs) {
for (int i = 0; i < nodes; i++) {
if (i == killedNodeIdx)
continue;
@@ -527,8 +528,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
assertEquals(e.getValue(), act.get(e.getKey()).get2());
}
}
-
- return true;
}
/**
@@ -753,8 +752,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
assert GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
// (SRV_NODES + 1 client node) - 1 primary - backup nodes.
- return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */)
- - 1 /** Primary node */ - backups;
+ return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /* client node */)
+ - 1 /* Primary node */ - backups;
}
}, 5000L);
@@ -1253,6 +1252,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
/**
* @param expEvts Expected events.
* @param lsnr Listener.
+ * @throws Exception If failed.
*/
private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr,
boolean allowLoseEvt) throws Exception {
@@ -1347,9 +1347,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
QueryCursor<?> cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry);
- final Collection<Object> backupQueue = backupQueue(ignite(1));
-
- assertEquals(0, backupQueue.size());
+ assertEquals(0, backupQueue(ignite(1)).size());
IgniteCache<Object, Object> cache0 = ignite(0).cache(DEFAULT_CACHE_NAME);
@@ -1367,11 +1365,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return backupQueue.isEmpty();
+ return backupQueue(ignite(1)).isEmpty();
}
}, 2000);
- assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+ assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)),
+ backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD);
if (!latch.await(5, SECONDS))
fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
@@ -1389,11 +1388,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return backupQueue.isEmpty();
+ return backupQueue(ignite(1)).isEmpty();
}
}, ACK_FREQ + 2000);
- assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+ assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), backupQueue(ignite(1)).isEmpty());
if (!latch.await(5, SECONDS))
fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
@@ -1421,9 +1420,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
QueryCursor<?> cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry);
- final Collection<Object> backupQueue = backupQueue(ignite(0));
-
- assertEquals(0, backupQueue.size());
+ assertEquals(0, backupQueue(ignite(0)).size());
long ttl = 100;
@@ -1433,9 +1430,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
final List<Integer> keys = primaryKeys(ignite(1).cache(DEFAULT_CACHE_NAME), BACKUP_ACK_THRESHOLD);
- CountDownLatch latch = new CountDownLatch(keys.size());
-
- lsnr.latch = latch;
+ lsnr.latch = new CountDownLatch(keys.size());
for (Integer key : keys) {
log.info("Put: " + key);
@@ -1445,11 +1440,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return backupQueue.isEmpty();
+ return backupQueue(ignite(0)).isEmpty();
}
}, 2000);
- assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+ assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)),
+ backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD);
boolean wait = waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
@@ -1461,14 +1457,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return backupQueue.isEmpty();
+ return backupQueue(ignite(0)).isEmpty();
}
}, 2000);
- assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+ assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)), backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD);
- if (backupQueue.size() != 0) {
- for (Object o : backupQueue) {
+ if (backupQueue(ignite(0)).size() != 0) {
+ for (Object o : backupQueue(ignite(0))) {
CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o;
assertNotSame("Evicted entry added to backup queue.", -1L, e.updateCounter());
@@ -1494,9 +1490,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
QueryCursor<?> cur = cache.query(qry);
- final Collection<Object> backupQueue = backupQueue(ignite(1));
-
- assertEquals(0, backupQueue.size());
+ assertEquals(0, backupQueue(ignite(1)).size());
List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD);
@@ -1512,11 +1506,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return backupQueue.isEmpty();
+ return backupQueue(ignite(1)).isEmpty();
}
}, 3000);
- assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+ assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)),
+ backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD);
if (!latch.await(5, SECONDS))
fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
@@ -1533,20 +1528,25 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos");
- Collection<Object> backupQueue = null;
+ Collection<Object> backupQueue = new ArrayList<>();
for (Object info : infos.values()) {
GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
- if (hnd.isQuery() && DEFAULT_CACHE_NAME.equals(hnd.cacheName())) {
- backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue");
+ if (hnd.isQuery() && hnd.cacheName().equals(DEFAULT_CACHE_NAME)) {
+ Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd,
+ CacheContinuousQueryHandler.class, "entryBufs");
+
+ for (CacheContinuousQueryEventBuffer buf : map.values()) {
+ Collection<Object> q = GridTestUtils.getFieldValue(buf,
+ CacheContinuousQueryEventBuffer.class, "backupQ");
- break;
+ if (q != null)
+ backupQueue.addAll(q);
+ }
}
}
- assertNotNull(backupQueue);
-
return backupQueue;
}
@@ -2422,7 +2422,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
/** */
- private List<CacheEntryEvent<?, ?>> allEvts;
+ private final CopyOnWriteArrayList<CacheEntryEvent<?, ?>> allEvts;
/** */
@LoggerResource
@@ -2432,8 +2432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
* @param saveAll Save all events flag.
*/
CacheEventListener1(boolean saveAll) {
- if (saveAll)
- allEvts = new ArrayList<>();
+ allEvts = saveAll ? new CopyOnWriteArrayList<CacheEntryEvent<?, ?>>() : null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
index 26c7d41..85d68d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -262,11 +262,16 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
if (hnd.isQuery() && hnd.cacheName().equals(CACHE_NAME)) {
- Collection<Object> q = GridTestUtils.getFieldValue(hnd,
- CacheContinuousQueryHandler.class, "backupQueue");
+ Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd,
+ CacheContinuousQueryHandler.class, "entryBufs");
- if (q != null)
- backupQueues.add(q);
+ for (CacheContinuousQueryEventBuffer buf : map.values()) {
+ Collection<Object> q = GridTestUtils.getFieldValue(buf,
+ CacheContinuousQueryEventBuffer.class, "backupQ");
+
+ if (q != null)
+ backupQueues.add(q);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index b91217f..81a7515 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -138,9 +137,9 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
true,
1,
1L,
- new AffinityTopologyVersion(1L));
+ new AffinityTopologyVersion(1L),
+ (byte)0);
- e0.filteredEvents(new GridLongList(new long[]{1L, 2L}));
e0.markFiltered();
ByteBuffer buf = ByteBuffer.allocate(4096);
@@ -156,7 +155,6 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
assertEquals(e0.cacheId(), e1.cacheId());
assertEquals(e0.eventType(), e1.eventType());
assertEquals(e0.isFiltered(), e1.isFiltered());
- assertEquals(GridLongList.asList(e0.filteredEvents()), GridLongList.asList(e1.filteredEvents()));
assertEquals(e0.isBackup(), e1.isBackup());
assertEquals(e0.isKeepBinary(), e1.isKeepBinary());
assertEquals(e0.partition(), e1.partition());
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 8dd273a..0084cdc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -21,6 +21,8 @@ import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryConcurrentPartitionUpdateTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBufferTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest;
@@ -118,6 +120,9 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class);
suite.addTestSuite(ClientReconnectContinuousQueryTest.class);
+ suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class);
+ suite.addTestSuite(CacheContinuousQueryEventBufferTest.class);
+
suite.addTest(IgniteDistributedJoinTestSuite.suite());
return suite;