You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/03/06 10:13:21 UTC
[ignite] branch master updated: IGNITE-9927: Fix flaky failures in CacheContinuousQueryOperationFromCallbackTest. This closes #5914.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ff9431f IGNITE-9927: Fix flaky failures in CacheContinuousQueryOperationFromCallbackTest. This closes #5914.
ff9431f is described below
commit ff9431fe5becd828c2d45bf8027efe66501b04e1
Author: rkondakov <ko...@mail.ru>
AuthorDate: Wed Mar 6 13:13:13 2019 +0300
IGNITE-9927: Fix flaky failures in CacheContinuousQueryOperationFromCallbackTest. This closes #5914.
---
.../ignite/spi/discovery/tcp/ServerImpl.java | 4 +-
...heContinuousQueryOperationFromCallbackTest.java | 292 +++++++++++++++++----
2 files changed, 242 insertions(+), 54 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 427ca42..6016e42 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -5824,8 +5824,8 @@ class ServerImpl extends TcpDiscoveryImpl {
notifyDiscoveryListener(msg, waitForNotification);
}
- if (msg.verified())
- msg.message(null, msg.messageBytes());
+ // Clear msg field to prevent possible memory leak.
+ msg.message(null, msg.messageBytes());
if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
index a3f822d..5b3565f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
@@ -56,15 +56,19 @@ import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionSerializationException;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
*
@@ -77,6 +81,9 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
public static final int KEYS_FROM_CALLBACK = 20;
/** */
+ public static final int KEYS_FROM_CALLBACK_RANGE = 10_000;
+
+ /** */
private static final int NODES = 5;
/** */
@@ -238,9 +245,100 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
}
/**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxTwoBackupsFilter() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+ doTest(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxTwoBackupsFilterPrimary() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC);
+
+ doTest(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxReplicatedFilter() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+ doTest(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxTwoBackup() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+ doTest(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+ doTest(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxReplicatedPrimary() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC);
+
+ doTest(ccfg, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxOneBackupFilter() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+ doTest(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxOneBackupFilterPrimary() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC);
+
+ doTest(ccfg, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMvccTxOneBackup() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+ doTest(ccfg, true);
+ }
+
+ /**
* @param ccfg Cache configuration.
* @throws Exception If failed.
*/
+ @SuppressWarnings({"TypeMayBeWeakened", "unchecked", "TooBroadScope"})
protected void doTest(final CacheConfiguration ccfg, boolean fromLsnr) throws Exception {
ignite(0).createCache(ccfg);
@@ -291,35 +389,48 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
IgniteCache<QueryTestKey, QueryTestValue> cache =
grid(rnd.nextInt(NODES)).cache(ccfg.getName());
- QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS));
+ QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS) - KEYS);
- boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() ==
- TRANSACTIONAL && rnd.nextBoolean();
+ boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() !=
+ ATOMIC && rnd.nextBoolean();
Transaction tx = null;
- if (startTx)
- tx = cache.unwrap(Ignite.class).transactions().txStart();
+ boolean committed = false;
- try {
- if ((cache.get(key) == null) || rnd.nextBoolean())
- cache.invoke(key, new IncrementTestEntryProcessor());
- else {
- QueryTestValue val;
- QueryTestValue newVal;
+ while (!committed && !Thread.currentThread().isInterrupted()) {
+ try {
+ if (startTx)
+ tx = cache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+ if ((cache.get(key) == null) || rnd.nextBoolean())
+ cache.invoke(key, new IncrementTestEntryProcessor());
+ else {
+ QueryTestValue val;
+ QueryTestValue newVal;
- do {
- val = cache.get(key);
+ do {
+ val = cache.get(key);
- newVal = val == null ?
- new QueryTestValue(0) : new QueryTestValue(val.val1 + 1);
+ newVal = val == null ?
+ new QueryTestValue(0) : new QueryTestValue(val.val1 + 1);
+ }
+ while (!cache.replace(key, val, newVal));
}
- while (!cache.replace(key, val, newVal));
+
+ if (tx != null)
+ tx.commit();
+
+ committed = true;
+ }
+ catch (Exception e) {
+ assertTrue(e.getCause() instanceof TransactionSerializationException);
+ assertEquals(ccfg.getAtomicityMode(), TRANSACTIONAL_SNAPSHOT);
+ }
+ finally {
+ if (tx != null)
+ tx.close();
}
- }
- finally {
- if (tx != null)
- tx.commit();
}
}
}
@@ -331,7 +442,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
@Override public boolean apply() {
return qryCntr.get() >= ITERATION_CNT * threadCnt * NODES;
}
- }, TimeUnit.MINUTES.toMillis(2));
+ }, getTestTimeout());
for (Set<T2<QueryTestKey, QueryTestValue>> set : rcvdEvts)
checkEvents(set, ITERATION_CNT * threadCnt, grid(0).cache(ccfg.getName()), false);
@@ -343,7 +454,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
@Override public boolean apply() {
return cbCntr.get() >= expCnt;
}
- }, TimeUnit.SECONDS.toMillis(60));
+ }, getTestTimeout());
assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]", res);
@@ -360,7 +471,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
@Override public boolean apply() {
return filterCbCntr.get() >= expInvkCnt;
}
- }, TimeUnit.SECONDS.toMillis(60));
+ }, getTestTimeout());
assertEquals(expInvkCnt, filterCbCntr.get());
@@ -388,21 +499,39 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
@Override public boolean apply() {
return set.size() >= expCnt;
}
- }, 10000L));
+ }, getTestTimeout()));
+
+ final int setSize = set.size();
+
+ if (cb) {
+ int cntr = 0;
+
+ while (!set.isEmpty()) {
+ T2<QueryTestKey, QueryTestValue> t = set.iterator().next();
- int startKey = cb ? KEYS : 0;
- int endKey = cb ? KEYS + KEYS_FROM_CALLBACK : KEYS;
+ QueryTestKey key = t.getKey();
- for (int i = startKey; i < endKey; i++) {
- QueryTestKey key = new QueryTestKey(i);
+ QueryTestValue maxVal = (QueryTestValue)cache.get(key);
- QueryTestValue maxVal = (QueryTestValue)cache.get(key);
+ for (int val = 0; val <= maxVal.val1; val++)
+ assertTrue(set.remove(new T2<>(key, new QueryTestValue(val))));
- for (int val = 0; val <= maxVal.val1; val++)
- assertTrue(set.remove(new T2<>(key, new QueryTestValue(val))));
+ if (cntr++ > setSize)
+ fail();
+ }
}
+ else {
+ for (int i = -KEYS; i < 0; i++) {
+ QueryTestKey key = new QueryTestKey(i);
+
+ QueryTestValue maxVal = (QueryTestValue)cache.get(key);
+
+ for (int val = 0; val <= maxVal.val1; val++)
+ assertTrue(set.remove(new T2<>(key, new QueryTestValue(val))));
+ }
- assertTrue(set.isEmpty());
+ assertTrue(set.isEmpty());
+ }
}
/**
@@ -445,20 +574,46 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e)
throws CacheEntryListenerException {
- if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
+ if (e.getKey().key() < 0) {
IgniteCache<QueryTestKey, QueryTestValue> cache = ignite.cache(cacheName);
- if (ThreadLocalRandom.current().nextBoolean()) {
- Set<QueryTestKey> keys = new LinkedHashSet<>();
+ boolean committed = false;
+ Transaction tx = null;
+ boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != ATOMIC;
- for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
- keys.add(new QueryTestKey(key));
+ Set<QueryTestKey> keys = new LinkedHashSet<>();
- cache.invokeAll(keys, new IncrementTestEntryProcessor());
- }
- else {
- for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
- cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor());
+ int startKey = ThreadLocalRandom.current().nextInt(KEYS_FROM_CALLBACK_RANGE - KEYS_FROM_CALLBACK);
+
+ for (int key = startKey; key < startKey + KEYS_FROM_CALLBACK; key++)
+ keys.add(new QueryTestKey(key));
+
+ while (!committed && !Thread.currentThread().isInterrupted()) {
+ try {
+ if (startTx)
+ tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+ if (ThreadLocalRandom.current().nextBoolean())
+ cache.invokeAll(keys, new IncrementTestEntryProcessor());
+ else {
+ for (QueryTestKey key : keys)
+ cache.invoke(key, new IncrementTestEntryProcessor());
+ }
+
+ if (tx != null)
+ tx.commit();
+
+ committed = true;
+ }
+ catch (Exception ex) {
+ assertTrue(ex.getCause() instanceof TransactionSerializationException);
+ assertEquals(cache.getConfiguration(CacheConfiguration.class).getAtomicityMode(),
+ TRANSACTIONAL_SNAPSHOT);
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
}
filterCbCntr.incrementAndGet();
@@ -512,23 +667,49 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts)
throws CacheEntryListenerException {
for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : evts) {
- if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
+ if (e.getKey().key() < 0) {
rcvsEvts.add(new T2<>(e.getKey(), e.getValue()));
cntr.incrementAndGet();
if (cache != null) {
- if (ThreadLocalRandom.current().nextBoolean()) {
- Set<QueryTestKey> keys = new LinkedHashSet<>();
+ boolean committed = false;
+ Transaction tx = null;
+ boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != ATOMIC;
- for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
- keys.add(new QueryTestKey(key));
+ Set<QueryTestKey> keys = new LinkedHashSet<>();
- cache.invokeAll(keys, new IncrementTestEntryProcessor());
- }
- else {
- for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
- cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor());
+ int startKey = ThreadLocalRandom.current().nextInt(KEYS_FROM_CALLBACK_RANGE - KEYS_FROM_CALLBACK);
+
+ for (int key = startKey; key < startKey + KEYS_FROM_CALLBACK; key++)
+ keys.add(new QueryTestKey(key));
+
+ while (!committed && !Thread.currentThread().isInterrupted()) {
+ try {
+ if (startTx)
+ tx = cache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+ if (ThreadLocalRandom.current().nextBoolean())
+ cache.invokeAll(keys, new IncrementTestEntryProcessor());
+ else {
+ for (QueryTestKey key : keys)
+ cache.invoke(key, new IncrementTestEntryProcessor());
+ }
+
+ if (tx != null)
+ tx.commit();
+
+ committed = true;
+ }
+ catch (Exception ex) {
+ assertTrue(ex.getCause() instanceof TransactionSerializationException);
+ assertEquals(cache.getConfiguration(CacheConfiguration.class).getAtomicityMode(),
+ TRANSACTIONAL_SNAPSHOT);
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
}
}
}
@@ -580,6 +761,13 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
this.key = key;
}
+ /**
+ * @return Key.
+ */
+ public Integer key() {
+ return key;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)