You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/22 15:16:57 UTC
[05/14] ignite git commit: IGNITE-9659 Fixed testNonCollocatedRetryMessage flaky test - Fixes #5005.
IGNITE-9659 Fixed testNonCollocatedRetryMessage flaky test - Fixes #5005.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f287428
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f287428
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f287428
Branch: refs/heads/ignite-9720
Commit: 6f2874285a9eb1132d589223a5aa2c8fa8891fb8
Parents: f156631
Author: NSAmelchev <ns...@gmail.com>
Authored: Mon Oct 22 13:29:44 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 22 13:29:44 2018 +0300
----------------------------------------------------------------------
.../NonCollocatedRetryMessageSelfTest.java | 119 ++++++++++---------
1 file changed, 62 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f287428/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
index c602225..e8f6624 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
@@ -22,19 +22,18 @@ import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
@@ -44,23 +43,31 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
*/
public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest {
/** */
- private static final int NODES_COUNT = 3;
+ private static final int NODES_COUNT = 2;
/** */
private static final String ORG = "org";
/** */
+ private static final int TEST_SQL_RETRY_TIMEOUT = 500;
+
+ /** */
+ private String sqlRetryTimeoutBackup;
+
+ /** */
private IgniteCache<String, JoinSqlTestHelper.Person> personCache;
/** */
public void testNonCollocatedRetryMessage() {
- SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
+ SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(
+ JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
qry.setDistributedJoins(true);
try {
- List<Cache.Entry<String,JoinSqlTestHelper.Person>> prsns = personCache.query(qry).getAll();
- fail("No CacheException emitted. Collection size="+prsns.size());
+ List<Cache.Entry<String, JoinSqlTestHelper.Person>> prsns = personCache.query(qry).getAll();
+
+ fail("No CacheException emitted. Collection size=" + prsns.size());
}
catch (CacheException e) {
assertTrue(e.getMessage(), e.getMessage().contains("Failed to execute non-collocated query"));
@@ -68,66 +75,35 @@ public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setCommunicationSpi(new TcpCommunicationSpi(){
- volatile long reqId = -1;
- /** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
- assert msg != null;
-
- if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){
- GridIoMessage gridMsg = (GridIoMessage)msg;
-
- if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){
- GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message());
-
- if (reqId < 0) {
- reqId = req.requestId();
-
- String shutName = getTestIgniteInstanceName(1);
-
- stopGrid(shutName, true, false);
- }
- else if( reqId != req.requestId() ){
- try {
- U.sleep(IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT));
- }
- catch (IgniteInterruptedCheckedException e) {
- // no-op
- }
- }
- }
- }
- super.sendMessage(node, msg, ackC);
- }
- });
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setDiscoverySpi(new TcpDiscoverySpi(){
- public long getNodesJoined() {
- return stats.joinedNodesCount();
- }
- });
+ cfg.setCommunicationSpi(new TestTcpCommunication());
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000");
+ sqlRetryTimeoutBackup = System.getProperty(IGNITE_SQL_RETRY_TIMEOUT);
+
+ System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, String.valueOf(TEST_SQL_RETRY_TIMEOUT));
startGridsMultiThreaded(NODES_COUNT, false);
- personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Person>("pers")
- .setBackups(1)
- .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class)
- );
+ CacheConfiguration<String, JoinSqlTestHelper.Person> ccfg1 = new CacheConfiguration<>("pers");
+
+ ccfg1.setBackups(1);
+ ccfg1.setIndexedTypes(String.class, JoinSqlTestHelper.Person.class);
- final IgniteCache<String, JoinSqlTestHelper.Organization> orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Organization>(ORG)
- .setBackups(1)
- .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class)
- );
+ personCache = ignite(0).getOrCreateCache(ccfg1);
+
+ CacheConfiguration<String, JoinSqlTestHelper.Organization> ccfg2 = new CacheConfiguration<>(ORG);
+
+ ccfg2.setBackups(1);
+ ccfg2.setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class);
+
+ IgniteCache<String, JoinSqlTestHelper.Organization> orgCache = ignite(0).getOrCreateCache(ccfg2);
awaitPartitionMapExchange();
@@ -137,10 +113,39 @@ public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
+ @Override protected void afterTest() {
+ if (sqlRetryTimeoutBackup != null)
+ System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, sqlRetryTimeoutBackup);
+
stopAllGrids();
}
+ /**
+ * TcpCommunicationSpi with additional features needed for tests.
+ */
+ private class TestTcpCommunication extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (igniteInstanceName.equals(getTestIgniteInstanceName(1)) &&
+ GridIoMessage.class.isAssignableFrom(msg.getClass())) {
+ GridIoMessage gridMsg = (GridIoMessage)msg;
+
+ if (GridH2IndexRangeRequest.class.isAssignableFrom(gridMsg.message().getClass())) {
+ try {
+ U.sleep(TEST_SQL_RETRY_TIMEOUT);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ fail("Test was interrupted.");
+ }
+ throw new IgniteSpiException("Test exception.");
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
}