You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/03/06 10:13:21 UTC

[ignite] branch master updated: IGNITE-9927: Fix flaky failures in CacheContinuousQueryOperationFromCallbackTest. This closes #5914.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ff9431f  IGNITE-9927: Fix flaky failures in CacheContinuousQueryOperationFromCallbackTest. This closes #5914.
ff9431f is described below

commit ff9431fe5becd828c2d45bf8027efe66501b04e1
Author: rkondakov <ko...@mail.ru>
AuthorDate: Wed Mar 6 13:13:13 2019 +0300

    IGNITE-9927: Fix flaky failures in CacheContinuousQueryOperationFromCallbackTest. This closes #5914.
---
 .../ignite/spi/discovery/tcp/ServerImpl.java       |   4 +-
 ...heContinuousQueryOperationFromCallbackTest.java | 292 +++++++++++++++++----
 2 files changed, 242 insertions(+), 54 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 427ca42..6016e42 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -5824,8 +5824,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     notifyDiscoveryListener(msg, waitForNotification);
                 }
 
-                if (msg.verified())
-                    msg.message(null, msg.messageBytes());
+                // Clear the msg field (only the message bytes are kept) to prevent a possible memory leak.
+                msg.message(null, msg.messageBytes());
 
                 if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
index a3f822d..5b3565f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
@@ -56,15 +56,19 @@ import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionSerializationException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  *
@@ -77,6 +81,9 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
     public static final int KEYS_FROM_CALLBACK = 20;
 
     /** */
+    public static final int KEYS_FROM_CALLBACK_RANGE = 10_000;
+
+    /** */
     private static final int NODES = 5;
 
     /** */
@@ -238,9 +245,100 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxTwoBackupsFilter() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxTwoBackupsFilterPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxReplicatedFilter() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxTwoBackup() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxReplicatedPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxOneBackupFilter() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxOneBackupFilterPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMvccTxOneBackup() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
      * @param ccfg Cache configuration.
      * @throws Exception If failed.
      */
+    @SuppressWarnings({"TypeMayBeWeakened", "unchecked", "TooBroadScope"})
     protected void doTest(final CacheConfiguration ccfg, boolean fromLsnr) throws Exception {
         ignite(0).createCache(ccfg);
 
@@ -291,35 +389,48 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
                         IgniteCache<QueryTestKey, QueryTestValue> cache =
                             grid(rnd.nextInt(NODES)).cache(ccfg.getName());
 
-                        QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS));
+                        QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS) - KEYS);
 
-                        boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() ==
-                            TRANSACTIONAL && rnd.nextBoolean();
+                        boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() !=
+                            ATOMIC && rnd.nextBoolean();
 
                         Transaction tx = null;
 
-                        if (startTx)
-                            tx = cache.unwrap(Ignite.class).transactions().txStart();
+                        boolean committed = false;
 
-                        try {
-                            if ((cache.get(key) == null) || rnd.nextBoolean())
-                                cache.invoke(key, new IncrementTestEntryProcessor());
-                            else {
-                                QueryTestValue val;
-                                QueryTestValue newVal;
+                        while (!committed && !Thread.currentThread().isInterrupted()) {
+                            try {
+                                if (startTx)
+                                    tx = cache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                                if ((cache.get(key) == null) || rnd.nextBoolean())
+                                    cache.invoke(key, new IncrementTestEntryProcessor());
+                                else {
+                                    QueryTestValue val;
+                                    QueryTestValue newVal;
 
-                                do {
-                                    val = cache.get(key);
+                                    do {
+                                        val = cache.get(key);
 
-                                    newVal = val == null ?
-                                        new QueryTestValue(0) : new QueryTestValue(val.val1 + 1);
+                                        newVal = val == null ?
+                                            new QueryTestValue(0) : new QueryTestValue(val.val1 + 1);
+                                    }
+                                    while (!cache.replace(key, val, newVal));
                                 }
-                                while (!cache.replace(key, val, newVal));
+
+                                if (tx != null)
+                                    tx.commit();
+
+                                committed = true;
+                            }
+                            catch (Exception e) {
+                                assertTrue(e.getCause() instanceof TransactionSerializationException);
+                                assertEquals(ccfg.getAtomicityMode(), TRANSACTIONAL_SNAPSHOT);
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
                             }
-                        }
-                        finally {
-                            if (tx != null)
-                                tx.commit();
                         }
                     }
                 }
@@ -331,7 +442,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
                 @Override public boolean apply() {
                     return qryCntr.get() >= ITERATION_CNT * threadCnt * NODES;
                 }
-            }, TimeUnit.MINUTES.toMillis(2));
+            }, getTestTimeout());
 
             for (Set<T2<QueryTestKey, QueryTestValue>> set : rcvdEvts)
                 checkEvents(set, ITERATION_CNT * threadCnt, grid(0).cache(ccfg.getName()), false);
@@ -343,7 +454,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
                     @Override public boolean apply() {
                         return cbCntr.get() >= expCnt;
                     }
-                }, TimeUnit.SECONDS.toMillis(60));
+                }, getTestTimeout());
 
                 assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]", res);
 
@@ -360,7 +471,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
                     @Override public boolean apply() {
                         return filterCbCntr.get() >= expInvkCnt;
                     }
-                }, TimeUnit.SECONDS.toMillis(60));
+                }, getTestTimeout());
 
                 assertEquals(expInvkCnt, filterCbCntr.get());
 
@@ -388,21 +499,39 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
             @Override public boolean apply() {
                 return set.size() >= expCnt;
             }
-        }, 10000L));
+        }, getTestTimeout()));
+
+        final int setSize = set.size();
+
+        if (cb) {
+            int cntr = 0;
+
+            while (!set.isEmpty()) {
+                T2<QueryTestKey, QueryTestValue> t = set.iterator().next();
 
-        int startKey = cb ? KEYS : 0;
-        int endKey = cb ? KEYS + KEYS_FROM_CALLBACK : KEYS;
+                QueryTestKey key = t.getKey();
 
-        for (int i = startKey; i < endKey; i++) {
-            QueryTestKey key = new QueryTestKey(i);
+                QueryTestValue maxVal = (QueryTestValue)cache.get(key);
 
-            QueryTestValue maxVal = (QueryTestValue)cache.get(key);
+                for (int val = 0; val <= maxVal.val1; val++)
+                    assertTrue(set.remove(new T2<>(key, new QueryTestValue(val))));
 
-            for (int val = 0; val <= maxVal.val1; val++)
-                assertTrue(set.remove(new T2<>(key, new QueryTestValue(val))));
+                if (cntr++ > setSize)
+                    fail();
+            }
         }
+        else {
+            for (int i = -KEYS; i < 0; i++) {
+                QueryTestKey key = new QueryTestKey(i);
+
+                QueryTestValue maxVal = (QueryTestValue)cache.get(key);
+
+                for (int val = 0; val <= maxVal.val1; val++)
+                    assertTrue(set.remove(new T2<>(key, new QueryTestValue(val))));
+            }
 
-        assertTrue(set.isEmpty());
+            assertTrue(set.isEmpty());
+        }
     }
 
     /**
@@ -445,20 +574,46 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e)
             throws CacheEntryListenerException {
-            if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
+            if (e.getKey().key() < 0) {
                 IgniteCache<QueryTestKey, QueryTestValue> cache = ignite.cache(cacheName);
 
-                if (ThreadLocalRandom.current().nextBoolean()) {
-                    Set<QueryTestKey> keys = new LinkedHashSet<>();
+                boolean committed = false;
+                Transaction tx = null;
+                boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != ATOMIC;
 
-                    for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
-                        keys.add(new QueryTestKey(key));
+                Set<QueryTestKey> keys = new LinkedHashSet<>();
 
-                    cache.invokeAll(keys, new IncrementTestEntryProcessor());
-                }
-                else {
-                    for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
-                        cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor());
+                int startKey = ThreadLocalRandom.current().nextInt(KEYS_FROM_CALLBACK_RANGE - KEYS_FROM_CALLBACK);
+
+                for (int key = startKey; key < startKey + KEYS_FROM_CALLBACK; key++)
+                    keys.add(new QueryTestKey(key));
+
+                while (!committed && !Thread.currentThread().isInterrupted()) {
+                    try {
+                        if (startTx)
+                            tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                        if (ThreadLocalRandom.current().nextBoolean())
+                            cache.invokeAll(keys, new IncrementTestEntryProcessor());
+                        else {
+                            for (QueryTestKey key : keys)
+                                cache.invoke(key, new IncrementTestEntryProcessor());
+                        }
+
+                        if (tx != null)
+                            tx.commit();
+
+                        committed = true;
+                    }
+                    catch (Exception ex) {
+                        assertTrue(ex.getCause() instanceof TransactionSerializationException);
+                        assertEquals(cache.getConfiguration(CacheConfiguration.class).getAtomicityMode(),
+                            TRANSACTIONAL_SNAPSHOT);
+                    }
+                    finally {
+                        if (tx != null)
+                            tx.close();
+                    }
                 }
 
                 filterCbCntr.incrementAndGet();
@@ -512,23 +667,49 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
         @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts)
             throws CacheEntryListenerException {
             for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : evts) {
-                if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
+                if (e.getKey().key() < 0) {
                     rcvsEvts.add(new T2<>(e.getKey(), e.getValue()));
 
                     cntr.incrementAndGet();
 
                     if (cache != null) {
-                        if (ThreadLocalRandom.current().nextBoolean()) {
-                            Set<QueryTestKey> keys = new LinkedHashSet<>();
+                        boolean committed = false;
+                        Transaction tx = null;
+                        boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != ATOMIC;
 
-                            for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
-                                keys.add(new QueryTestKey(key));
+                        Set<QueryTestKey> keys = new LinkedHashSet<>();
 
-                            cache.invokeAll(keys, new IncrementTestEntryProcessor());
-                        }
-                        else {
-                            for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
-                                cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor());
+                        int startKey = ThreadLocalRandom.current().nextInt(KEYS_FROM_CALLBACK_RANGE - KEYS_FROM_CALLBACK);
+
+                        for (int key = startKey; key < startKey + KEYS_FROM_CALLBACK; key++)
+                            keys.add(new QueryTestKey(key));
+
+                        while (!committed && !Thread.currentThread().isInterrupted()) {
+                            try {
+                                if (startTx)
+                                    tx = cache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                                if (ThreadLocalRandom.current().nextBoolean())
+                                    cache.invokeAll(keys, new IncrementTestEntryProcessor());
+                                else {
+                                    for (QueryTestKey key : keys)
+                                        cache.invoke(key, new IncrementTestEntryProcessor());
+                                }
+
+                                if (tx != null)
+                                    tx.commit();
+
+                                committed = true;
+                            }
+                            catch (Exception ex) {
+                                assertTrue(ex.getCause() instanceof TransactionSerializationException);
+                                assertEquals(cache.getConfiguration(CacheConfiguration.class).getAtomicityMode(),
+                                    TRANSACTIONAL_SNAPSHOT);
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
+                            }
                         }
                     }
                 }
@@ -580,6 +761,13 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
             this.key = key;
         }
 
+        /**
+         * @return Key.
+         */
+        public Integer key() {
+            return key;
+        }
+
         /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
             if (this == o)