You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/22 15:16:57 UTC

[05/14] ignite git commit: IGNITE-9659 Fixed testNonCollocatedRetryMessage flaky test - Fixes #5005.

IGNITE-9659 Fixed testNonCollocatedRetryMessage flaky test - Fixes #5005.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f287428
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f287428
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f287428

Branch: refs/heads/ignite-9720
Commit: 6f2874285a9eb1132d589223a5aa2c8fa8891fb8
Parents: f156631
Author: NSAmelchev <ns...@gmail.com>
Authored: Mon Oct 22 13:29:44 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 22 13:29:44 2018 +0300

----------------------------------------------------------------------
 .../NonCollocatedRetryMessageSelfTest.java      | 119 ++++++++++---------
 1 file changed, 62 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6f287428/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
index c602225..e8f6624 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
@@ -22,19 +22,18 @@ import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
@@ -44,23 +43,31 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
  */
 public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest {
     /** */
-    private static final int NODES_COUNT = 3;
+    private static final int NODES_COUNT = 2;
 
     /** */
     private static final String ORG = "org";
 
     /** */
+    private static final int TEST_SQL_RETRY_TIMEOUT = 500;
+
+    /** */
+    private String sqlRetryTimeoutBackup;
+
+    /** */
     private IgniteCache<String, JoinSqlTestHelper.Person> personCache;
 
     /** */
     public void testNonCollocatedRetryMessage() {
-        SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
+        SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(
+            JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
 
         qry.setDistributedJoins(true);
 
         try {
-            List<Cache.Entry<String,JoinSqlTestHelper.Person>> prsns = personCache.query(qry).getAll();
-            fail("No CacheException emitted. Collection size="+prsns.size());
+            List<Cache.Entry<String, JoinSqlTestHelper.Person>> prsns = personCache.query(qry).getAll();
+
+            fail("No CacheException emitted. Collection size=" + prsns.size());
         }
         catch (CacheException e) {
             assertTrue(e.getMessage(), e.getMessage().contains("Failed to execute non-collocated query"));
@@ -68,66 +75,35 @@ public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setCommunicationSpi(new TcpCommunicationSpi(){
-            volatile long reqId = -1;
-            /** {@inheritDoc} */
-            @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
-                assert msg != null;
-
-                if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){
-                    GridIoMessage gridMsg = (GridIoMessage)msg;
-
-                    if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){
-                        GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message());
-
-                        if (reqId < 0) {
-                            reqId = req.requestId();
-
-                            String shutName = getTestIgniteInstanceName(1);
-
-                            stopGrid(shutName, true, false);
-                        }
-                        else if( reqId != req.requestId() ){
-                            try {
-                                U.sleep(IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT));
-                            }
-                            catch (IgniteInterruptedCheckedException e) {
-                                // no-op
-                            }
-                        }
-                    }
-                }
-                super.sendMessage(node, msg, ackC);
-            }
-        });
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setDiscoverySpi(new TcpDiscoverySpi(){
-            public long getNodesJoined() {
-                return stats.joinedNodesCount();
-            }
-        });
+        cfg.setCommunicationSpi(new TestTcpCommunication());
 
         return cfg;
     }
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000");
+        sqlRetryTimeoutBackup = System.getProperty(IGNITE_SQL_RETRY_TIMEOUT);
+
+        System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, String.valueOf(TEST_SQL_RETRY_TIMEOUT));
 
         startGridsMultiThreaded(NODES_COUNT, false);
 
-        personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Person>("pers")
-            .setBackups(1)
-            .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class)
-        );
+        CacheConfiguration<String, JoinSqlTestHelper.Person> ccfg1 = new CacheConfiguration<>("pers");
+
+        ccfg1.setBackups(1);
+        ccfg1.setIndexedTypes(String.class, JoinSqlTestHelper.Person.class);
 
-        final IgniteCache<String, JoinSqlTestHelper.Organization> orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Organization>(ORG)
-            .setBackups(1)
-            .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class)
-        );
+        personCache = ignite(0).getOrCreateCache(ccfg1);
+
+        CacheConfiguration<String, JoinSqlTestHelper.Organization> ccfg2 = new CacheConfiguration<>(ORG);
+
+        ccfg2.setBackups(1);
+        ccfg2.setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class);
+
+        IgniteCache<String, JoinSqlTestHelper.Organization> orgCache = ignite(0).getOrCreateCache(ccfg2);
 
         awaitPartitionMapExchange();
 
@@ -137,10 +113,39 @@ public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
+    @Override protected void afterTest() {
+        if (sqlRetryTimeoutBackup != null)
+            System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, sqlRetryTimeoutBackup);
+
         stopAllGrids();
     }
 
+    /**
+     * Communication SPI that delays, then fails, every GridH2IndexRangeRequest sent by the node with index 1.
+     */
+    private class TestTcpCommunication extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+            assert msg != null;
+
+            if (igniteInstanceName.equals(getTestIgniteInstanceName(1)) &&
+                GridIoMessage.class.isAssignableFrom(msg.getClass())) {
+                GridIoMessage gridMsg = (GridIoMessage)msg;
+
+                if (GridH2IndexRangeRequest.class.isAssignableFrom(gridMsg.message().getClass())) {
+                    try {
+                        U.sleep(TEST_SQL_RETRY_TIMEOUT);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        fail("Test was interrupted.");
+                    }
 
+                    throw new IgniteSpiException("Test exception.");
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
 }