You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/10/22 12:00:01 UTC
[1/2] ignite git commit: IGNITE-9694 Add tests to check that reading
queries are not blocked on exchange events that don't change data visibility
- Fixes #4926.
Repository: ignite
Updated Branches:
refs/heads/master 3a4167a67 -> 3fae41b1f
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnCreateDestoryIndexTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnCreateDestoryIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnCreateDestoryIndexTest.java
new file mode 100644
index 0000000..469ec93
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnCreateDestoryIndexTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.lang.annotation.Annotation;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnReadAbstractTest.Params;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Drives the delegate read tests listed in {@link #tests} with SQL {@code CREATE INDEX} / {@code DROP INDEX} as the operation under test.
+ */
+public class CacheBlockOnCreateDestoryIndexTest extends GridCommonAbstractTest {
+ /** Delegate tests addressed by {@link TestIndex#value()}: 0 - single get, 1 - getAll, 2 - scan, 3 - SQL query. */
+ private final List<? extends CacheBlockOnReadAbstractTest> tests = Arrays.asList(
+ new CacheBlockOnSingleGetTest() {
+ /** {@inheritDoc} */
+ @Nullable @Override protected <A extends Annotation> A currentTestAnnotation(Class<A> annotationCls) {
+ return CacheBlockOnCreateDestoryIndexTest.this.currentTestAnnotation(annotationCls);
+ }
+ },
+ new CacheBlockOnGetAllTest() {
+ /** {@inheritDoc} */
+ @Nullable @Override protected <A extends Annotation> A currentTestAnnotation(Class<A> annotationCls) {
+ return CacheBlockOnCreateDestoryIndexTest.this.currentTestAnnotation(annotationCls);
+ }
+ },
+ new CacheBlockOnScanTest() {
+ /** {@inheritDoc} */
+ @Nullable @Override protected <A extends Annotation> A currentTestAnnotation(Class<A> annotationCls) {
+ return CacheBlockOnCreateDestoryIndexTest.this.currentTestAnnotation(annotationCls);
+ }
+ },
+ new CacheBlockOnSqlQueryTest() {
+ /** {@inheritDoc} */
+ @Nullable @Override protected <A extends Annotation> A currentTestAnnotation(Class<A> annotationCls) {
+ return CacheBlockOnCreateDestoryIndexTest.this.currentTestAnnotation(annotationCls);
+ }
+ }
+ );
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ currentTest().beforeTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ currentTest().afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(0)
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testCreateIndexAtomicPartitionedGet() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(0)
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testCreateIndexAtomicReplicatedGet() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(0)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testCreateIndexTransactionalPartitionedGet() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(0)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testCreateIndexTransactionalReplicatedGet() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(1)
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testCreateIndexAtomicPartitionedGetAll() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(1)
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testCreateIndexAtomicReplicatedGetAll() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(1)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testCreateIndexTransactionalPartitionedGetAll() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(1)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testCreateIndexTransactionalReplicatedGetAll() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(2)
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testCreateIndexAtomicPartitionedScan() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(2)
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testCreateIndexAtomicReplicatedScan() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(2)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testCreateIndexTransactionalPartitionedScan() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(2)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testCreateIndexTransactionalReplicatedScan() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(3)
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testCreateIndexAtomicPartitionedSqlQuery() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(3)
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testCreateIndexAtomicReplicatedSqlQuery() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(3)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testCreateIndexTransactionalPartitionedSqlQuery() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(3)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testCreateIndexTransactionalReplicatedSqlQuery() throws Exception {
+ doTestCreateIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(0)
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testDestroyIndexAtomicPartitionedGet() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(0)
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testDestroyIndexAtomicReplicatedGet() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(0)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testDestroyIndexTransactionalPartitionedGet() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(0)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testDestroyIndexTransactionalReplicatedGet() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(1)
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testDestroyIndexAtomicPartitionedGetAll() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(1)
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testDestroyIndexAtomicReplicatedGetAll() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(1)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testDestroyIndexTransactionalPartitionedGetAll() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(1)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testDestroyIndexTransactionalReplicatedGetAll() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(2)
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testDestroyIndexAtomicPartitionedScan() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(2)
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testDestroyIndexAtomicReplicatedScan() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(2)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testDestroyIndexTransactionalPartitionedScan() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(2)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testDestroyIndexTransactionalReplicatedScan() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(3)
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testDestroyIndexAtomicPartitionedSqlQuery() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(3)
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testDestroyIndexAtomicReplicatedSqlQuery() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(3)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testDestroyIndexTransactionalPartitionedSqlQuery() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @TestIndex(3)
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testDestroyIndexTransactionalReplicatedSqlQuery() throws Exception {
+ doTestDestroyIndex();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTestCreateIndex() throws Exception {
+ IgniteEx ignite = currentTest().baseline().get(0);
+
+ List<T3<String, String, String>> caches = createCaches(ignite);
+
+ currentTest().doTest(
+ msg -> msg instanceof SchemaOperationStatusMessage,
+ () -> createIndex(ignite, caches.remove(caches.size() - 1))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTestDestroyIndex() throws Exception {
+ IgniteEx ignite = currentTest().baseline().get(0);
+
+ List<T3<String, String, String>> caches = createCaches(ignite);
+
+ for (T3<String, String, String> pair : caches)
+ createIndex(ignite, pair);
+
+ currentTest().doTest(
+ msg -> msg instanceof SchemaOperationStatusMessage,
+ () -> destroyIndex(ignite, caches.remove(caches.size() - 1))
+ );
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @return 3 triples {@code {cacheName, tableName, indexName}} for further sql operations.
+ */
+ @NotNull private List<T3<String, String, String>> createCaches(IgniteEx ignite) {
+ List<T3<String, String, String>> caches = new ArrayList<>();
+
+ for (int i = 0; i < 3; i++) {
+ String tblName = "TABLE_" + UUID.randomUUID().toString().replace('-', '_');
+
+ String cacheName = "CACHE_" + tblName;
+
+ CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(cacheName).setSqlSchema("PUBLIC");
+
+ IgniteCache<?, ?> cache = ignite.createCache(ccfg);
+
+ String createTblQryStr = String.format(
+ "CREATE TABLE %s (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id, city_id)) " +
+ "WITH \"backups=1, affinityKey=city_id\"",
+ tblName
+ );
+
+ cache.query(new SqlFieldsQuery(createTblQryStr)).getAll();
+
+ String idxName = "IDX_" + tblName;
+
+ caches.add(new T3<>(cacheName, tblName, idxName));
+ }
+
+ return caches;
+ }
+
+ /**
+ * Runs {@code CREATE INDEX pair.get3() on pair.get2() (city_id)} through the cache named {@code pair.get1()} of {@code ignite}.
+ */
+ private void createIndex(IgniteEx ignite, T3<String, String, String> pair) {
+ IgniteCache<?, ?> cache = ignite.getOrCreateCache(pair.get1());
+
+ String createIdxQryStr = String.format("CREATE INDEX %s on %s (city_id)", pair.get3(), pair.get2());
+
+ cache.query(new SqlFieldsQuery(createIdxQryStr)).getAll();
+ }
+
+ /**
+ * Runs {@code DROP INDEX pair.get3()} through the cache named {@code pair.get1()} of {@code ignite}.
+ */
+ private void destroyIndex(IgniteEx ignite, T3<String, String, String> pair) {
+ IgniteCache<?, ?> cache = ignite.getOrCreateCache(pair.get1());
+
+ String dropIdxQryStr = String.format("DROP INDEX %s", pair.get3());
+
+ cache.query(new SqlFieldsQuery(dropIdxQryStr)).getAll();
+ }
+
+ /**
+ * Marks a test method with the position of the delegate test that {@link #currentTest()} must use for it.
+ */
+ @Target(ElementType.METHOD)
+ @Retention(RetentionPolicy.RUNTIME)
+ protected @interface TestIndex {
+ /**
+ * Index in {@link CacheBlockOnCreateDestoryIndexTest#tests} list.
+ */
+ int value();
+ }
+
+ /** @return Delegate from {@link #tests} selected by the {@link TestIndex} annotation of the current test method. */
+ private CacheBlockOnReadAbstractTest currentTest() {
+ TestIndex idx = currentTestAnnotation(TestIndex.class);
+
+ if (idx == null)
+ throw new IllegalStateException("Current test method is not annotated with @TestIndex");
+ return tests.get(idx.value());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSqlQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSqlQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSqlQueryTest.java
new file mode 100644
index 0000000..f1d96ea
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSqlQueryTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Random;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * {@link CacheBlockOnReadAbstractTest} variant whose read operation is an {@link SqlQuery} range query over {@link TestingEntity}.
+ */
+public class CacheBlockOnSqlQueryTest extends CacheBlockOnReadAbstractTest {
+
+ /** {@inheritDoc} Returns an operation that reads through an SQL range query on {@code val}. */
+ @Override @NotNull protected CacheReadBackgroundOperation<?, ?> getReadOperation() {
+ return new CacheReadBackgroundOperation<Integer, TestingEntity>() {
+ /** Source of the random lower bound used by {@link #doRead()}. */
+ private Random random = new Random();
+
+ /** {@inheritDoc} Additionally registers {@code Integer}/{@link TestingEntity} as indexed types. */
+ @Override protected CacheConfiguration<Integer, TestingEntity> createCacheConfiguration() {
+ return super.createCacheConfiguration().setIndexedTypes(Integer.class, TestingEntity.class);
+ }
+
+ /** {@inheritDoc} The key is the entry index itself. */
+ @Override protected Integer createKey(int idx) {
+ return idx;
+ }
+
+ /** {@inheritDoc} Both {@code id} and {@code val} of the created entity are set to the entry index. */
+ @Override protected TestingEntity createValue(int idx) {
+ return new TestingEntity(idx, idx);
+ }
+
+ /** {@inheritDoc} Queries entities with {@code val} in {@code [idx, idx + 500)} for a random {@code idx}. */
+ @Override public void doRead() {
+ int idx = random.nextInt(entriesCount());
+
+ cache().query(
+ new SqlQuery<>(TestingEntity.class, "val >= ? and val < ?")
+ .setArgs(idx, idx + 500)
+ ).getAll();
+ }
+ };
+ }
+
+ /**
+ * Cache value type with two indexed SQL fields, {@code id} and {@code val}.
+ */
+ public static class TestingEntity {
+ /** Id. */
+ @QuerySqlField(index = true)
+ public Integer id;
+
+ /** Value. */
+ @QuerySqlField(index = true)
+ public double val;
+
+ /**
+ * Default constructor.
+ */
+ public TestingEntity() {
+ }
+
+ /**
+ * @param id Id.
+ * @param val Value.
+ */
+ public TestingEntity(Integer id, double val) {
+ this.id = id;
+ this.val = val;
+ }
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9916. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStartServerAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9916");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9916. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStartServerTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9916");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9916. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStopServerAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9916");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9916. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStopServerTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9916");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9916. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStopBaselineAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9916");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9916. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStopBaselineTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9916");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnSqlReadOperationsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnSqlReadOperationsTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnSqlReadOperationsTestSuite.java
new file mode 100644
index 0000000..3be640b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnSqlReadOperationsTestSuite.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnCreateDestoryIndexTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnSqlQueryTest;
+
+/**
+ * Test suite that runs {@link CacheBlockOnSqlQueryTest} and {@link CacheBlockOnCreateDestoryIndexTest}.
+ */
+public class IgniteCacheBlockExchangeOnSqlReadOperationsTestSuite extends TestSuite {
+ /**
+ * @return Test suite.
+ */
+ public static TestSuite suite() {
+ TestSuite suite = new TestSuite("Do Not Block Read Operations Test Suite");
+
+ suite.addTestSuite(CacheBlockOnSqlQueryTest.class);
+ suite.addTestSuite(CacheBlockOnCreateDestoryIndexTest.class);
+
+ return suite;
+ }
+}
[2/2] ignite git commit: IGNITE-9694 Add tests to check that reading
queries are not blocked on exchange events that don't change data visibility
- Fixes #4926.
Posted by dg...@apache.org.
IGNITE-9694 Add tests to check that reading queries are not blocked on exchange events that don't change data visibility - Fixes #4926.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3fae41b1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3fae41b1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3fae41b1
Branch: refs/heads/master
Commit: 3fae41b1fce89f2f05ff9027cdc37ed84f3a70a0
Parents: 3a4167a
Author: ibessonov <be...@gmail.com>
Authored: Mon Oct 22 14:59:19 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Oct 22 14:59:19 2018 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 12 +-
.../ignite/internal/util/lang/GridFunc.java | 2 +
.../distributed/CacheBlockOnGetAllTest.java | 196 +++
.../CacheBlockOnReadAbstractTest.java | 1277 ++++++++++++++++++
.../cache/distributed/CacheBlockOnScanTest.java | 73 +
.../distributed/CacheBlockOnSingleGetTest.java | 190 +++
.../testframework/junits/GridAbstractTest.java | 33 +
...eBlockExchangeOnReadOperationsTestSuite.java | 51 +
.../testsuites/IgniteCacheTestSuite7.java | 2 +-
.../CacheBlockOnCreateDestoryIndexTest.java | 480 +++++++
.../distributed/CacheBlockOnSqlQueryTest.java | 131 ++
...ockExchangeOnSqlReadOperationsTestSuite.java | 39 +
12 files changed, 2475 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index d10a7c7..e6f374a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8169,23 +8169,15 @@ public abstract class IgniteUtils {
for (Class cls = obj.getClass(); cls != Object.class; cls = cls.getSuperclass()) {
for (Field field : cls.getDeclaredFields()) {
if (field.getName().equals(fieldName)) {
- boolean accessible = field.isAccessible();
-
field.setAccessible(true);
- T val = (T)field.get(obj);
-
- if (!accessible)
- field.setAccessible(false);
-
- return val;
+ return (T)field.get(obj);
}
}
}
}
catch (Exception e) {
- throw new IgniteException("Failed to get field value [fieldName=" + fieldName + ", obj=" + obj + ']',
- e);
+ throw new IgniteException("Failed to get field value [fieldName=" + fieldName + ", obj=" + obj + ']', e);
}
throw new IgniteException("Failed to get field value [fieldName=" + fieldName + ", obj=" + obj + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index ce5076b..3b3bbaa 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.util.lang.gridfunc.TransformFilteringIterator;
import org.apache.ignite.internal.util.lang.gridfunc.TransformMapView;
import org.apache.ignite.internal.util.lang.gridfunc.TransformMapView2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
@@ -2172,6 +2173,7 @@ public class GridFunc {
* @param t2 Second object in pair.
* @param <T> Type of objects in pair.
* @return Pair of objects.
+ * @deprecated Use {@link T2} instead.
*/
@Deprecated
public static <T> IgnitePair<T> pair(@Nullable T t1, @Nullable T t2) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnGetAllTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnGetAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnGetAllTest.java
new file mode 100644
index 0000000..084a431
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnGetAllTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * {@link CacheBlockOnReadAbstractTest} variant whose read operation is {@code getAll} on a set of randomly drawn keys.
+ */
+public class CacheBlockOnGetAllTest extends CacheBlockOnReadAbstractTest {
+
+ /** {@inheritDoc} Returns an operation that reads through {@code cache().getAll(keys)}. */
+ @Override @NotNull protected CacheReadBackgroundOperation<?, ?> getReadOperation() {
+ return new IntCacheReadBackgroundOperation() {
+ /** Source of the random keys requested by {@link #doRead()}. */
+ private Random random = new Random();
+
+ /** {@inheritDoc} Draws 500 random keys below {@code entriesCount()} (duplicates collapse in the set) and reads them at once. */
+ @Override public void doRead() {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int i = 0; i < 500; i++)
+ keys.add(random.nextInt(entriesCount()));
+
+ cache().getAll(keys);
+ }
+ };
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9915. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStopBaselineAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9915");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9915. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testStopBaselineAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9915");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9915. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStopBaselineTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9915");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9915. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testStopBaselineTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9915");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testCreateCacheAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testCreateCacheAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testCreateCacheTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testCreateCacheTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testDestroyCacheAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testDestroyCacheAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testDestroyCacheTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testDestroyCacheTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStartServerAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testStartServerAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStartServerTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testStartServerTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStopServerAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testStopServerAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStopServerTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testStopServerTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testUpdateBaselineTopologyAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testUpdateBaselineTopologyAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testUpdateBaselineTopologyTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} Fails unconditionally with a link to the tracking issue IGNITE-9883. */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testUpdateBaselineTopologyTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnReadAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnReadAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnReadAbstractTest.java
new file mode 100644
index 0000000..42b5df0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnReadAbstractTest.java
@@ -0,0 +1,1277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
+import org.apache.ignite.internal.processors.cache.ExchangeActions.CacheActionData;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
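+/**
+ * Base class for tests checking that cache read operations are not blocked by exchange events that don't change
+ * data visibility (cache creation/destruction, node start/stop, baseline changes). Test methods are configured
+ * with the {@link Params} annotation.
+ */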
+public abstract class CacheBlockOnReadAbstractTest extends GridCommonAbstractTest {
+ /** Default cache entries count. */
+ private static final int DFLT_CACHE_ENTRIES_CNT = 2 * 1024;
+
+ /** Shared IP finder; used for discovery whenever {@link #customIpFinder} is {@code null}. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** List of baseline nodes started at the beginning of the test. */
+ protected final List<IgniteEx> baseline = new CopyOnWriteArrayList<>();
+
+ /** List of server nodes started at the beginning of the test. */
+ protected final List<IgniteEx> srvs = new CopyOnWriteArrayList<>();
+
+ /** List of client nodes started at the beginning of the test. */
+ protected final List<IgniteEx> clients = new CopyOnWriteArrayList<>();
+
+ /** Whether nodes are configured in client mode; the value is applied in {@code getConfiguration}. */
+ private volatile boolean startNodesInClientMode;
+
+ /**
+  * Latch that is used to wait until all required messages are blocked. A new latch is created for every
+  * {@code doTest0} round, it is counted down when a message gets blocked (and by some test closures) and the
+  * field is reset to {@code null} when the round is over.
+  */
+ private volatile CountDownLatch cntFinishedReadOperations;
+
+ /** Custom IP finder; when not {@code null} it is used instead of {@link #IP_FINDER}. */
+ private volatile TcpDiscoveryIpFinder customIpFinder;
+
+ /**
+  * Reads the number of baseline servers to start before the test from the current test parameters.
+  *
+  * @return Baseline servers count.
+  * @see Params#baseline()
+  */
+ protected int baselineServersCount() {
+     Params params = currentTestParams();
+
+     return params.baseline();
+ }
+
+ /**
+  * Reads the number of non-baseline servers to start before the test from the current test parameters.
+  *
+  * @return Non-baseline servers count.
+  * @see Params#servers()
+  */
+ protected int serversCount() {
+     Params params = currentTestParams();
+
+     return params.servers();
+ }
+
+ /**
+  * Reads the number of clients to start before the test from the current test parameters.
+  *
+  * @return Clients count.
+  * @see Params#clients()
+  */
+ protected int clientsCount() {
+     Params params = currentTestParams();
+
+     return params.clients();
+ }
+
+ /**
+  * Number of backups to configure in caches by default: one less than the number of baseline servers,
+  * but never more than 3.
+  *
+  * @return Backups count.
+  */
+ protected int backupsCount() {
+     int otherBaselineNodes = baselineServersCount() - 1;
+
+     return Math.min(3, otherBaselineNodes);
+ }
+
+ /**
+  * Reads the warmup time of the reading process from the current test parameters. Warmup is used to lower
+  * fluctuations in run time. Might be 0.
+  *
+  * @return Warmup time in milliseconds.
+  * @see Params#warmup()
+  */
+ protected long warmup() {
+     Params params = currentTestParams();
+
+     return params.warmup();
+ }
+
+ /**
+  * Reads the time to wait on the potentially blocking operation from the current test parameters.
+  *
+  * @return Timeout in milliseconds.
+  * @see Params#timeout()
+  */
+ protected long timeout() {
+     Params params = currentTestParams();
+
+     return params.timeout();
+ }
+
+ /**
+  * Reads the cache atomicity mode from the current test parameters.
+  *
+  * @return Cache atomicity mode.
+  * @see Params#atomicityMode()
+  */
+ protected CacheAtomicityMode atomicityMode() {
+     Params params = currentTestParams();
+
+     return params.atomicityMode();
+ }
+
+ /**
+  * Reads the cache mode from the current test parameters.
+  *
+  * @return Cache mode.
+  * @see Params#cacheMode()
+  */
+ protected CacheMode cacheMode() {
+     Params params = currentTestParams();
+
+     return params.cacheMode();
+ }
+
+ /**
+  * Tells whether {@link ClusterTopologyCheckedException} is accepted as a valid reading result.
+  *
+  * @return {@code true} if the exception is allowed.
+  * @see Params#allowException()
+  */
+ protected boolean allowException() {
+     Params params = currentTestParams();
+
+     return params.allowException();
+ }
+
+ /**
+  * Sets the flag that {@code getConfiguration} passes to {@code IgniteConfiguration.setClientMode}.
+  * The flag stays in effect until it is changed again.
+  *
+  * @param startNodesInClientMode Start nodes in client mode.
+  */
+ public void startNodesInClientMode(boolean startNodesInClientMode) {
+ this.startNodesInClientMode = startNodesInClientMode;
+ }
+
+ /**
+  * List of baseline nodes started at the beginning of the test.
+  *
+  * @return The backing list itself (not a copy).
+  */
+ public List<? extends IgniteEx> baseline() {
+ return baseline;
+ }
+
+ /**
+  * List of server nodes started at the beginning of the test.
+  *
+  * @return The backing list itself (not a copy).
+  */
+ public List<? extends IgniteEx> servers() {
+ return srvs;
+ }
+
+ /**
+  * List of client nodes started at the beginning of the test.
+  *
+  * @return The backing list itself (not a copy).
+  */
+ public List<? extends IgniteEx> clients() {
+ return clients;
+ }
+
+ /**
+  * Annotation to configure test methods in {@link CacheBlockOnReadAbstractTest}. Its values are used throughout
+  * test implementation. It is retained at runtime and applicable to methods only. {@link #atomicityMode()} and
+  * {@link #cacheMode()} have no defaults and must always be specified.
+  */
+ @Target(ElementType.METHOD)
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Params {
+ /**
+  * Number of baseline servers to start before test. Defaults to 3.
+  */
+ int baseline() default 3;
+
+ /**
+  * Number of non-baseline servers to start before test. Defaults to 1.
+  */
+ int servers() default 1;
+
+ /**
+  * Number of clients to start before test. Defaults to 1.
+  */
+ int clients() default 1;
+
+ /**
+  * Number of milliseconds to warmup reading process. Used to lower fluctuations in run time. Might be 0.
+  * Defaults to 2000.
+  */
+ long warmup() default 2000L;
+
+ /**
+  * Number of milliseconds to wait on the potentially blocking operation. Defaults to 3000.
+  */
+ long timeout() default 3000L;
+
+ /**
+  * Cache atomicity mode. Mandatory.
+  */
+ CacheAtomicityMode atomicityMode();
+
+ /**
+  * Cache mode. Mandatory.
+  */
+ CacheMode cacheMode();
+
+ /**
+  * Whether allowing {@link ClusterTopologyCheckedException} as the valid reading result or not.
+  * Defaults to {@code false}.
+  */
+ boolean allowException() default false;
+ }
+
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * On top of the parent configuration: uses the instance name as consistent ID, picks the custom IP finder
+  * when one is set (the shared one otherwise), installs {@link TestRecordingCommunicationSpi}, enables
+  * persistence for the default data region and applies the current client mode flag.
+  */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+     IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+     cfg.setConsistentId(igniteInstanceName);
+
+     // Custom finder, if any, takes precedence over the shared one.
+     TcpDiscoveryIpFinder ipFinder = customIpFinder;
+
+     if (ipFinder == null)
+         ipFinder = IP_FINDER;
+
+     TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+     discoSpi.setIpFinder(ipFinder);
+
+     cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+     DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration().setPersistenceEnabled(true);
+
+     DataStorageConfiguration storageCfg = new DataStorageConfiguration()
+         .setDefaultDataRegionConfiguration(dfltRegionCfg);
+
+     cfg.setDataStorageConfiguration(storageCfg);
+
+     cfg.setClientMode(startNodesInClientMode);
+
+     return cfg;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Stops leftovers of previous tests, cleans persistence, then starts the topology requested by the test
+  * parameters: baseline servers first, cluster activation, regular servers and finally clients. Started nodes
+  * are remembered in {@code baseline}, {@code srvs} and {@code clients} respectively.
+  * <p>
+  * NOTE(review): the client mode flag is switched on before clients are started and is not switched back here;
+  * tests that start server nodes later reset it themselves (e.g. {@code testStartServerTransactionalReplicated})
+  * - confirm this is intended for tests that don't.
+  */
+ @Override public void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ // Checking prerequisites.
+ assertTrue("Positive timeout is required for the test.", timeout() > 0);
+
+ assertTrue("No baseline servers were requested.", baselineServersCount() > 0);
+
+ // Single running index so that every started node gets a unique grid index.
+ int idx = 0;
+
+ // Start baseline nodes.
+ for (int i = 0; i < baselineServersCount(); i++)
+ baseline.add(startGrid(idx++));
+
+ // Activate cluster.
+ baseline.get(0).cluster().active(true);
+
+ // Start server nodes in activated cluster.
+ for (int i = 0; i < serversCount(); i++)
+ srvs.add(startGrid(idx++));
+
+ // Start client nodes.
+ startNodesInClientMode(true);
+
+ // Clients get a dedicated non-shared finder with a single address
+ // (presumably the discovery address of the first started node - TODO confirm).
+ customIpFinder = new TcpDiscoveryVmIpFinder(false)
+ .setAddresses(
+ Collections.singletonList("127.0.0.1:47500")
+ );
+
+ for (int i = 0; i < clientsCount(); i++)
+ clients.add(startGrid(idx++));
+
+ // Back to the shared finder for nodes started later.
+ customIpFinder = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterTest() throws Exception {
+ baseline.clear();
+
+ srvs.clear();
+
+ clients.clear();
+
+ grid(0).cluster().active(false);
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+
+ /**
+  * Runs the cache creation scenario implemented in {@link #testCreateCacheTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares an {@code ATOMIC}, {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testCreateCacheAtomicPartitioned() throws Exception {
+ testCreateCacheTransactionalReplicated();
+ }
+
+ /**
+  * Runs the cache creation scenario implemented in {@link #testCreateCacheTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares an {@code ATOMIC}, {@code REPLICATED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testCreateCacheAtomicReplicated() throws Exception {
+ testCreateCacheTransactionalReplicated();
+ }
+
+ /**
+  * Runs the cache creation scenario implemented in {@link #testCreateCacheTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares a {@code TRANSACTIONAL}, {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testCreateCacheTransactionalPartitioned() throws Exception {
+ testCreateCacheTransactionalReplicated();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testCreateCacheTransactionalReplicated() throws Exception {
+ doTest(
+ asMessagePredicate(CacheBlockOnReadAbstractTest::createCachePredicate),
+ () -> baseline.get(0).createCache(UUID.randomUUID().toString())
+ );
+ }
+
+ /**
+  * Runs the cache destruction scenario implemented in {@link #testDestroyCacheTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares an {@code ATOMIC}, {@code PARTITIONED} cache and
+  * a 5000 ms timeout.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testDestroyCacheAtomicPartitioned() throws Exception {
+ testDestroyCacheTransactionalReplicated();
+ }
+
+ /**
+  * Runs the cache destruction scenario implemented in {@link #testDestroyCacheTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares an {@code ATOMIC}, {@code REPLICATED} cache and
+  * a 5000 ms timeout.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testDestroyCacheAtomicReplicated() throws Exception {
+ testDestroyCacheTransactionalReplicated();
+ }
+
+ /**
+  * Runs the cache destruction scenario implemented in {@link #testDestroyCacheTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares a {@code TRANSACTIONAL}, {@code PARTITIONED} cache and
+  * a 5000 ms timeout.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testDestroyCacheTransactionalPartitioned() throws Exception {
+ testDestroyCacheTransactionalReplicated();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testDestroyCacheTransactionalReplicated() throws Exception {
+ List<String> cacheNames = new ArrayList<>(Arrays.asList(
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString())
+ );
+
+ for (String cacheName : cacheNames)
+ baseline.get(0).createCache(cacheName);
+
+ doTest(
+ asMessagePredicate(CacheBlockOnReadAbstractTest::destroyCachePredicate),
+ () -> baseline.get(0).destroyCache(cacheNames.remove(0))
+ );
+ }
+ /**
+  * Scenario: a new client node joins the cluster while reads are running.
+  * The leading underscore in the name and the missing {@link Params} annotation suggest that the test
+  * is disabled - TODO confirm.
+  *
+  * @throws Exception If failed.
+  */
+ public void _testStartClient() throws Exception {
+ startNodesInClientMode(true);
+
+ doTest(
+ // Block exchange messages caused by a node join.
+ asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_JOINED),
+ () -> {
+ // Manually count the latch down; presumably fewer messages get blocked on a client join
+ // than the latch expects - TODO confirm.
+ for (int i = 0; i < baselineServersCount() - 2; i++)
+ cntFinishedReadOperations.countDown();
+
+ // The client is started with a dedicated single-address finder, same as in beforeTest().
+ customIpFinder = new TcpDiscoveryVmIpFinder(false)
+ .setAddresses(
+ Collections.singletonList("127.0.0.1:47500")
+ );
+
+ startGrid(UUID.randomUUID().toString());
+
+ customIpFinder = null;
+ }
+ );
+ }
+
+ /**
+  * Scenario: a client node leaves the cluster while reads are running. Three extra clients are started up
+  * front; every invocation of the blocking closure stops the last client of the {@code clients} list.
+  * The leading underscore in the name and the missing {@link Params} annotation suggest that the test
+  * is disabled - TODO confirm.
+  *
+  * @throws Exception If failed.
+  */
+ public void _testStopClient() throws Exception {
+ // Extra clients are started with a dedicated single-address finder, same as in beforeTest().
+ customIpFinder = new TcpDiscoveryVmIpFinder(false)
+ .setAddresses(
+ Collections.singletonList("127.0.0.1:47500")
+ );
+
+ startNodesInClientMode(true);
+
+ for (int i = 0; i < 3; i++)
+ clients.add(startGrid(UUID.randomUUID().toString()));
+
+ customIpFinder = null;
+
+ doTest(
+ // Block exchange messages caused by a node leaving.
+ asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_LEFT),
+ () -> {
+ // Manually count the latch down; presumably fewer messages get blocked on a client leave
+ // than the latch expects - TODO confirm.
+ for (int i = 0; i < baselineServersCount() - 2; i++)
+ cntFinishedReadOperations.countDown();
+
+ stopGrid(clients.remove(clients.size() - 1).name());
+ }
+ );
+ }
+
+ /**
+  * Runs the server start scenario implemented in {@link #testStartServerTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares an {@code ATOMIC}, {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testStartServerAtomicPartitioned() throws Exception {
+ testStartServerTransactionalReplicated();
+ }
+
+ /**
+  * Runs the server start scenario implemented in {@link #testStartServerTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares an {@code ATOMIC}, {@code REPLICATED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testStartServerAtomicReplicated() throws Exception {
+ testStartServerTransactionalReplicated();
+ }
+
+ /**
+  * Runs the server start scenario implemented in {@link #testStartServerTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares a {@code TRANSACTIONAL}, {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testStartServerTransactionalPartitioned() throws Exception {
+ testStartServerTransactionalReplicated();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testStartServerTransactionalReplicated() throws Exception {
+ startNodesInClientMode(false);
+
+ doTest(
+ asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_JOINED),
+ () -> startGrid(UUID.randomUUID().toString())
+ );
+ }
+
+ /**
+  * Runs the server stop scenario implemented in {@link #testStopServerTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 4 non-baseline servers and an {@code ATOMIC},
+  * {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(servers = 4, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testStopServerAtomicPartitioned() throws Exception {
+ testStopServerTransactionalReplicated();
+ }
+
+ /**
+  * Runs the server stop scenario implemented in {@link #testStopServerTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 4 non-baseline servers and an {@code ATOMIC},
+  * {@code REPLICATED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(servers = 4, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testStopServerAtomicReplicated() throws Exception {
+ testStopServerTransactionalReplicated();
+ }
+
+ /**
+  * Runs the server stop scenario implemented in {@link #testStopServerTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 4 non-baseline servers and a {@code TRANSACTIONAL},
+  * {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(servers = 4, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testStopServerTransactionalPartitioned() throws Exception {
+ testStopServerTransactionalReplicated();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Params(servers = 4, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testStopServerTransactionalReplicated() throws Exception {
+ doTest(
+ asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_LEFT),
+ () -> stopGrid(srvs.remove(srvs.size() - 1).name())
+ );
+ }
+
+ /**
+  * Runs the baseline node restart scenario implemented in {@link #testRestartBaselineTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 4 baseline servers and an {@code ATOMIC},
+  * {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(baseline = 4, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testRestartBaselineAtomicPartitioned() throws Exception {
+ testRestartBaselineTransactionalReplicated();
+ }
+
+ /**
+  * Runs the baseline node restart scenario implemented in {@link #testRestartBaselineTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 4 baseline servers and an {@code ATOMIC},
+  * {@code REPLICATED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(baseline = 4, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testRestartBaselineAtomicReplicated() throws Exception {
+ testRestartBaselineTransactionalReplicated();
+ }
+
+ /**
+  * Runs the baseline node restart scenario implemented in {@link #testRestartBaselineTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 4 baseline servers and a {@code TRANSACTIONAL},
+  * {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(baseline = 4, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testRestartBaselineTransactionalPartitioned() throws Exception {
+ testRestartBaselineTransactionalReplicated();
+ }
+
+ /**
+  * Checks that reads are not blocked while the last baseline node is stopped and started again under
+  * the same name.
+  * <p>
+  * NOTE(review): the node is restarted with whatever client mode flag is currently set; {@code beforeTest()}
+  * leaves it switched on - confirm that restarting in this mode is intended.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(baseline = 4, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testRestartBaselineTransactionalReplicated() throws Exception {
+ doTest(
+ // Block exchange messages caused by a node join.
+ asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_JOINED),
+ () -> {
+ // The list keeps the instance that was started in beforeTest(); the node is addressed by name below.
+ IgniteEx node = baseline.get(baseline.size() - 1);
+
+ // Stop message blocking on the node before stopping it.
+ TestRecordingCommunicationSpi.spi(node).stopBlock();
+
+ stopGrid(node.name());
+
+ // Manually count the latch down; presumably the restart produces fewer blocked messages
+ // than the latch expects - TODO confirm.
+ for (int i = 0; i < baselineServersCount() - 2; i++)
+ cntFinishedReadOperations.countDown();
+
+ startGrid(node.name());
+ }
+ );
+ }
+
+ /**
+  * Runs the baseline topology update scenario implemented in
+  * {@link #testUpdateBaselineTopologyTransactionalReplicated()}. This method's {@link Params} annotation
+  * declares an {@code ATOMIC}, {@code PARTITIONED} cache and a 5000 ms timeout.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testUpdateBaselineTopologyAtomicPartitioned() throws Exception {
+ testUpdateBaselineTopologyTransactionalReplicated();
+ }
+
+ /**
+  * Runs the baseline topology update scenario implemented in
+  * {@link #testUpdateBaselineTopologyTransactionalReplicated()}. This method's {@link Params} annotation
+  * declares an {@code ATOMIC}, {@code REPLICATED} cache and a 5000 ms timeout.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testUpdateBaselineTopologyAtomicReplicated() throws Exception {
+ testUpdateBaselineTopologyTransactionalReplicated();
+ }
+
+ /**
+  * Runs the baseline topology update scenario implemented in
+  * {@link #testUpdateBaselineTopologyTransactionalReplicated()}. This method's {@link Params} annotation
+  * declares a {@code TRANSACTIONAL}, {@code PARTITIONED} cache and a 5000 ms timeout.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testUpdateBaselineTopologyTransactionalPartitioned() throws Exception {
+ testUpdateBaselineTopologyTransactionalReplicated();
+ }
+
+ /**
+  * Checks that reads are not blocked while the baseline topology is being changed. Every invocation of
+  * the blocking closure starts a new, randomly named server node, sets the baseline topology to the current
+  * topology version and registers the new node in the baseline list.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testUpdateBaselineTopologyTransactionalReplicated() throws Exception {
+     // Matches exchanges triggered by a custom discovery event carrying ChangeGlobalStateMessage.
+     Predicate<Message> stateChangeMsg = asMessagePredicate(discoEvt ->
+         discoEvt instanceof DiscoveryCustomEvent
+             && ((DiscoveryCustomEvent)discoEvt).customMessage() instanceof ChangeGlobalStateMessage
+     );
+
+     RunnableX extendBaseline = () -> {
+         startNodesInClientMode(false);
+
+         IgniteEx newNode = startGrid(UUID.randomUUID().toString());
+
+         baseline.get(0).cluster().setBaselineTopology(baseline.get(0).context().discovery().topologyVersion());
+
+         baseline.add(newNode);
+     };
+
+     doTest(stateChangeMsg, extendBaseline);
+ }
+
+ /**
+  * Runs the baseline node stop scenario implemented in {@link #testStopBaselineTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 9 baseline servers and an {@code ATOMIC},
+  * {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ public void testStopBaselineAtomicPartitioned() throws Exception {
+ testStopBaselineTransactionalReplicated();
+ }
+
+ /**
+  * Runs the baseline node stop scenario implemented in {@link #testStopBaselineTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 9 baseline servers and an {@code ATOMIC},
+  * {@code REPLICATED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ public void testStopBaselineAtomicReplicated() throws Exception {
+ testStopBaselineTransactionalReplicated();
+ }
+
+ /**
+  * Runs the baseline node stop scenario implemented in {@link #testStopBaselineTransactionalReplicated()}.
+  * This method's {@link Params} annotation declares 9 baseline servers and a {@code TRANSACTIONAL},
+  * {@code PARTITIONED} cache.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ public void testStopBaselineTransactionalPartitioned() throws Exception {
+ testStopBaselineTransactionalReplicated();
+ }
+
+ /**
+  * Checks that reads are not blocked while a baseline node leaves the cluster. Every invocation of
+  * the blocking closure stops one more baseline node, going backwards from the end of the baseline list.
+  *
+  * @throws Exception If failed.
+  */
+ @Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ public void testStopBaselineTransactionalReplicated() throws Exception {
+ // Number of baseline nodes stopped so far by the closure below.
+ AtomicInteger cntDownCntr = new AtomicInteger(0);
+
+ doTest(
+ // Block exchange messages caused by a node leaving.
+ asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_LEFT),
+ () -> {
+ // Stopped nodes are not removed from the list, so skip the ones stopped in previous invocations.
+ IgniteEx node = baseline.get(baseline.size() - cntDownCntr.get() - 1);
+
+ // Stop message blocking on the node before stopping it.
+ TestRecordingCommunicationSpi.spi(node).stopBlock();
+
+ cntDownCntr.incrementAndGet();
+
+ for (int i = 0; i < cntDownCntr.get(); i++)
+ cntFinishedReadOperations.countDown(); // This node and previously stopped nodes as well.
+
+ stopGrid(node.name());
+ }
+ );
+ }
+
+ /**
+  * Checks that given discovery event is from "Create cache" operation, i.e. it is a custom event carrying
+  * a {@link DynamicCacheChangeBatch} whose exchange actions contain at least one cache start request.
+  *
+  * @param discoEvt Discovery event.
+  * @return {@code true} if the event was caused by cache creation.
+  */
+ private static boolean createCachePredicate(DiscoveryEvent discoEvt) {
+     if (!(discoEvt instanceof DiscoveryCustomEvent))
+         return false;
+
+     DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage();
+
+     if (!(customMsg instanceof DynamicCacheChangeBatch))
+         return false;
+
+     // Exchange actions are read reflectively from the batch.
+     ExchangeActions exchangeActions = U.field(customMsg, "exchangeActions");
+
+     return !exchangeActions.cacheStartRequests().isEmpty();
+ }
+
+ /**
+  * Checks that given discovery event is from "Destroy cache" operation, i.e. it is a custom event carrying
+  * a {@link DynamicCacheChangeBatch} whose exchange actions contain at least one cache stop request.
+  *
+  * @param discoEvt Discovery event.
+  * @return {@code true} if the event was caused by cache destruction.
+  */
+ private static boolean destroyCachePredicate(DiscoveryEvent discoEvt) {
+     if (!(discoEvt instanceof DiscoveryCustomEvent))
+         return false;
+
+     DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage();
+
+     if (!(customMsg instanceof DynamicCacheChangeBatch))
+         return false;
+
+     // Exchange actions are read reflectively from the batch.
+     ExchangeActions exchangeActions = U.field(customMsg, "exchangeActions");
+
+     return !exchangeActions.cacheStopRequests().isEmpty();
+ }
+
+ /**
+  * Read operation that is going to be executed during blocking operation.
+  *
+  * @return Read operation, never {@code null}.
+  */
+ @NotNull protected abstract CacheReadBackgroundOperation getReadOperation();
+
+ /**
+  * Checks that {@code block} closure doesn't block read operation.
+  * Does it for client, regular server and baseline node, in that order. Reads are additionally verified
+  * during the optional warmup and during a short finish stage after all rounds.
+  *
+  * @param blockMsgPred Predicate that check whether the message corresponds to the {@code block} or not.
+  * @param block Blocking operation.
+  * @throws Exception If failed.
+  */
+ public void doTest(Predicate<Message> blockMsgPred, RunnableX block) throws Exception {
+     BackgroundOperation blockOp = new BlockMessageOnBaselineBackgroundOperation(block, blockMsgPred);
+
+     CacheReadBackgroundOperation<?, ?> readOp = getReadOperation();
+
+     // Cache is created (if needed) and filled with data through the first baseline node.
+     readOp.initCache(baseline.get(0), true);
+
+     // Warmup.
+     if (warmup() > 0) {
+         try (AutoCloseable ignored = readOp.start()) {
+             Thread.sleep(warmup());
+         }
+
+         assertReadsSucceeded(readOp, "warmup");
+     }
+
+     doTest0(clients.get(0), readOp, blockOp);
+
+     doTest0(srvs.get(0), readOp, blockOp);
+
+     doTest0(baseline.get(0), readOp, blockOp);
+
+     // Finish stage: reads must still work after all blocking operations are over.
+     try (AutoCloseable ignored = readOp.start()) {
+         Thread.sleep(500L);
+     }
+
+     assertReadsSucceeded(readOp, "finish stage");
+ }
+
+ /**
+  * Asserts that no read operation failed and at least one finished during the given stage.
+  *
+  * @param readOp Read operation whose counters are checked.
+  * @param stage Stage name used in assertion messages.
+  */
+ private void assertReadsSucceeded(CacheReadBackgroundOperation<?, ?> readOp, String stage) {
+     assertEquals(
+         readOp.readOperationsFailed() + " read operations failed during " + stage + ".",
+         0,
+         readOp.readOperationsFailed()
+     );
+
+     assertTrue(
+         "No read operations were finished during " + stage + ".",
+         readOp.readOperationsFinishedUnderBlock() > 0
+     );
+ }
+
+ /**
+  * Internal part for {@link CacheBlockOnReadAbstractTest#doTest(Predicate, RunnableX)}: a single round for
+  * one node. Starts the blocking operation, waits until the expected number of messages is blocked, reads
+  * from the cache of the given node for {@code timeout()} milliseconds and then verifies read counters.
+  *
+  * @param ignite Ignite instance. Client / baseline / server node.
+  * @param readOperation Read operation.
+  * @param backgroundOperation Background operation.
+  * @throws Exception If failed.
+  */
+ private void doTest0(
+ IgniteEx ignite,
+ CacheReadBackgroundOperation<?, ?> readOperation,
+ BackgroundOperation backgroundOperation
+ ) throws Exception {
+ // Reinit internal cache state with given ignite instance.
+ readOperation.initCache(ignite, false);
+
+ // One count per expected blocked message; some test closures count the latch down manually as well.
+ cntFinishedReadOperations = new CountDownLatch(baseline.size() - 1);
+
+ // Read while potentially blocking operation is executing.
+ try (AutoCloseable block = backgroundOperation.start()) {
+ // The result of await() is not used, the remaining count is asserted right below instead.
+ cntFinishedReadOperations.await(5 * timeout(), TimeUnit.MILLISECONDS);
+
+ // Possible if test itself is wrong.
+ assertEquals("Messages weren't blocked in time", 0, cntFinishedReadOperations.getCount());
+
+ try (AutoCloseable read = readOperation.start()) {
+ Thread.sleep(timeout());
+ }
+ }
+ finally {
+ // Executed after the blocking operation has been closed by try-with-resources.
+ cntFinishedReadOperations = null;
+ }
+
+ log.info("Operations finished: " + readOperation.readOperationsFinishedUnderBlock());
+ log.info("Longest operation took " + readOperation.maxReadDuration() + "ms");
+
+ // None of read operations should fail.
+ assertEquals(
+ readOperation.readOperationsFailed() + " read operations failed.",
+ 0,
+ readOperation.readOperationsFailed()
+ );
+
+ assertTrue(
+ "No read operations were finished during timeout.",
+ readOperation.readOperationsFinishedUnderBlock() > 0
+ );
+
+ // There were no operations as long as blocking timeout.
+ assertNotAlmostEqual(timeout(), readOperation.maxReadDuration());
+
+ // On average every read operation was much faster than blocking timeout.
+ double avgDuration = (double)timeout() / readOperation.readOperationsFinishedUnderBlock();
+
+ // Since avgDuration is timeout() divided by the number of finished operations, this check is
+ // equivalent to requiring more than 4 finished operations.
+ assertTrue("Avarage duration was too long.",avgDuration < timeout() * 0.25);
+ }
+
+ /**
+  * Utility class that allows to start and stop some background operation many times.
+  * Only one run may be active at a time: {@link #start()} fails while the previous run hasn't been stopped.
+  */
+ protected abstract static class BackgroundOperation {
+     /** Future of the currently running operation, {@code null} if nothing is running. */
+     private IgniteInternalFuture<?> fut;
+
+     /**
+      * Invoked strictly before background thread is started.
+      */
+     protected void init() {
+         // No-op.
+     }
+
+     /**
+      * Operation itself. Will be executed in separate thread. Thread interruption has to be considered as a valid
+      * way to stop operation.
+      */
+     protected abstract void execute();
+
+     /**
+      * @return Allowed time to wait in {@link BackgroundOperation#stop()} method before canceling background thread.
+      */
+     protected abstract long stopTimeout();
+
+     /**
+      * Start separate thread and execute method {@link BackgroundOperation#execute()} in it. Returns only
+      * after the background thread has actually started running.
+      *
+      * @return {@link AutoCloseable} that invokes {@link BackgroundOperation#stop()} on closing.
+      * @throws UnsupportedOperationException If the previous run hasn't been stopped yet.
+      * @throws IgniteException If the calling thread was interrupted while waiting for the background thread.
+      */
+     AutoCloseable start() {
+         if (fut != null)
+             throw new UnsupportedOperationException("Only one simultanious operation is allowed");
+
+         init();
+
+         CountDownLatch threadStarted = new CountDownLatch(1);
+
+         fut = GridTestUtils.runAsync(() -> {
+             try {
+                 threadStarted.countDown();
+
+                 execute();
+             }
+             catch (Exception e) {
+                 throw new IgniteException("Unexpected exception in background operation thread", e);
+             }
+         });
+
+         try {
+             threadStarted.await();
+         }
+         catch (InterruptedException e) {
+             try {
+                 fut.cancel();
+             }
+             catch (IgniteCheckedException e1) {
+                 e.addSuppressed(e1);
+             }
+
+             // InterruptedException clears the interrupt status; restore it so that callers up the stack
+             // can still observe the interruption after the unchecked exception is thrown.
+             Thread.currentThread().interrupt();
+
+             throw new IgniteException(e);
+         }
+
+         return this::stop;
+     }
+
+     /**
+      * Waits up to {@link BackgroundOperation#stopTimeout()} milliseconds for the operation started in
+      * {@link BackgroundOperation#start()} to finish on its own; if it doesn't, cancels it and waits for
+      * completion. Does nothing if no operation is running.
+      *
+      * @throws Exception If the background operation failed.
+      */
+     void stop() throws Exception {
+         if (fut == null)
+             return;
+
+         try {
+             fut.get(stopTimeout());
+         }
+         catch (IgniteFutureTimeoutCheckedException e) {
+             fut.cancel();
+
+             fut.get();
+         }
+         finally {
+             // Allow the next start() regardless of how this run ended.
+             fut = null;
+         }
+     }
+ }
+
+ /**
+  * Adapts a discovery event predicate to communication messages. The resulting predicate accepts only
+  * {@link GridDhtPartitionsFullMessage} instances that have an exchange ID and whose discovery event
+  * (read reflectively from the exchange ID) satisfies {@code discoEvtPred}.
+  *
+  * @param discoEvtPred Predicate that tests discovery events.
+  * @return New predicate that test any message based on {@code discoEvtPred} predicate.
+  */
+ public static Predicate<Message> asMessagePredicate(Predicate<DiscoveryEvent> discoEvtPred) {
+     return msg -> {
+         if (!(msg instanceof GridDhtPartitionsFullMessage))
+             return false;
+
+         GridDhtPartitionExchangeId exchId = ((GridDhtPartitionsFullMessage)msg).exchangeId();
+
+         if (exchId == null)
+             return false;
+
+         DiscoveryEvent discoEvt = U.field(exchId, "discoEvt");
+
+         return discoEvtPred.test(discoEvt);
+     };
+ }
+
+ /**
+  * Background operation that executes some node request and doesn't allow its messages to be fully processed until
+  * operation is stopped. Blocking is installed on the communication SPI of every node of the baseline list.
+  */
+ protected class BlockMessageOnBaselineBackgroundOperation extends BackgroundOperation {
+     /** Blocking operation. */
+     private final RunnableX block;
+
+     /** Predicate that checks whether to block message or not. */
+     private final Predicate<Message> blockMsg;
+
+     /**
+      * @param block Blocking operation.
+      * @param blockMsgPred Predicate that checks whether to block message or not.
+      *
+      * @see BlockMessageOnBaselineBackgroundOperation#blockMessage(ClusterNode, Message)
+      */
+     protected BlockMessageOnBaselineBackgroundOperation(
+         RunnableX block,
+         Predicate<Message> blockMsgPred
+     ) {
+         this.block = block;
+         this.blockMsg = blockMsgPred;
+     }
+
+     /** {@inheritDoc} */
+     @Override protected void execute() {
+         // Install blocking first, run the operation afterwards.
+         for (IgniteEx srv : baseline)
+             TestRecordingCommunicationSpi.spi(srv).blockMessages(this::blockMessage);
+
+         block.run();
+     }
+
+     /**
+      * Function to pass into {@link TestRecordingCommunicationSpi#blockMessages(IgniteBiPredicate)}.
+      * Every blocked message counts the enclosing test's latch down.
+      *
+      * @param node Node that receives message.
+      * @param msg Message.
+      * @return Whether the given message should be blocked or not.
+      */
+     private boolean blockMessage(ClusterNode node, Message msg) {
+         if (!blockMsg.test(msg))
+             return false;
+
+         Object consistentId = node.consistentId();
+
+         // Only messages addressed to baseline nodes are blocked; nodes are matched by name.
+         for (IgniteEx srv : baseline) {
+             if (consistentId.equals(srv.name())) {
+                 cntFinishedReadOperations.countDown();
+
+                 return true;
+             }
+         }
+
+         return false;
+     }
+
+     /** {@inheritDoc} */
+     @Override protected long stopTimeout() {
+         // Should be big enough so thread will stop by it's own. Otherwise test will fail, but that's fine.
+         return 30_000L;
+     }
+
+     /** {@inheritDoc} */
+     @Override void stop() throws Exception {
+         // Release blocked messages before waiting for the operation to finish.
+         for (IgniteEx srv : baseline)
+             TestRecordingCommunicationSpi.spi(srv).stopBlock();
+
+         super.stop();
+     }
+ }
+
+
+ /**
+  * Runnable that can throw exceptions. The body goes into {@link #runx()}; {@link #run()} invokes it and
+  * rethrows any {@link Exception} wrapped into {@link IgniteException}.
+  */
+ @FunctionalInterface
+ public interface RunnableX extends Runnable {
+ /**
+  * Closure body.
+  *
+  * @throws Exception If failed.
+  */
+ void runx() throws Exception;
+
+ /** {@inheritDoc} */
+ @Override default void run() {
+ try {
+ runx();
+ }
+ catch (Exception e) {
+ // Keep the original exception as the cause.
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+  * {@link BackgroundOperation} implementation for cache reading operations. Repeats {@link #doRead()} until
+  * the thread is interrupted and keeps counters of finished and failed reads as well as the duration of
+  * the longest read. Counters are reset on every start.
+  */
+ protected abstract class ReadBackgroundOperation extends BackgroundOperation {
+     /** Counter for successfully finished operations. */
+     private final AtomicInteger readOperationsFinishedUnderBlock = new AtomicInteger();
+
+     /** Counter for failed operations. */
+     private final AtomicInteger readOperationsFailed = new AtomicInteger();
+
+     /** Duration of the longest read operation, {@code -1} if nothing was measured yet. */
+     private final AtomicLong maxReadDuration = new AtomicLong(-1);
+
+     /**
+      * Do single iteration of reading operation. Will be executed in a loop.
+      *
+      * @throws Exception If read failed.
+      */
+     protected abstract void doRead() throws Exception;
+
+     /** {@inheritDoc} */
+     @Override protected void init() {
+         readOperationsFinishedUnderBlock.set(0);
+
+         readOperationsFailed.set(0);
+
+         maxReadDuration.set(-1);
+     }
+
+     /** {@inheritDoc} */
+     @Override protected void execute() {
+         // Every distinct error message is logged only once.
+         Set<String> reportedErrs = new HashSet<>();
+
+         while (!Thread.currentThread().isInterrupted()) {
+             long startTs = System.currentTimeMillis();
+
+             try {
+                 doRead();
+
+                 readOperationsFinishedUnderBlock.incrementAndGet();
+             }
+             catch (Exception e) {
+                 boolean interrupted = X.hasCause(e,
+                     InterruptedException.class,
+                     IgniteInterruptedException.class,
+                     IgniteInterruptedCheckedException.class
+                 );
+
+                 if (interrupted) {
+                     // Restore the flag so that the loop condition stops the operation.
+                     Thread.currentThread().interrupt();
+                 }
+                 else if (allowException() && X.hasCause(e, ClusterTopologyCheckedException.class)) {
+                     // Topology exception is an acceptable result when the test allows it.
+                     readOperationsFinishedUnderBlock.incrementAndGet();
+                 }
+                 else {
+                     readOperationsFailed.incrementAndGet();
+
+                     if (reportedErrs.add(e.getMessage()))
+                         log.error("Error during read operation execution", e);
+
+                     // Failed reads don't contribute to the max duration.
+                     continue;
+                 }
+             }
+
+             long duration = System.currentTimeMillis() - startTs;
+
+             maxReadDuration.accumulateAndGet(duration, Math::max);
+         }
+     }
+
+     /** {@inheritDoc} */
+     @Override protected long stopTimeout() {
+         return 0;
+     }
+
+     /**
+      * @return Number of successfully finished operations.
+      */
+     public int readOperationsFinishedUnderBlock() {
+         return readOperationsFinishedUnderBlock.get();
+     }
+
+     /**
+      * @return Number of failed operations.
+      */
+     public int readOperationsFailed() {
+         return readOperationsFailed.get();
+     }
+
+     /**
+      * @return Duration of the longest read operation.
+      */
+     public long maxReadDuration() {
+         return maxReadDuration.get();
+     }
+ }
+
+ /**
+ *
+ */
+ protected abstract class CacheReadBackgroundOperation<KeyType, ValueType> extends ReadBackgroundOperation {
+ /**
+ * {@link CacheReadBackgroundOperation#cache()} method backing field. Updated on each
+ * {@link CacheReadBackgroundOperation#initCache(IgniteEx, boolean)} invocation.
+ */
+ private IgniteCache<KeyType, ValueType> cache;
+
+ /**
+ * Reinit internal cache using passed ignite instance and fill it with data if required.
+ *
+ * @param ignite Node to get or create cache from.
+ * @param fillData Whether the cache should be filled with new data or not.
+ */
+ public void initCache(IgniteEx ignite, boolean fillData) {
+ cache = ignite.getOrCreateCache(
+ createCacheConfiguration()
+ .setAtomicityMode(atomicityMode())
+ .setCacheMode(cacheMode())
+ );
+
+ if (fillData) {
+ try (IgniteDataStreamer<KeyType, ValueType> dataStreamer = ignite.dataStreamer(cache.getName())) {
+ dataStreamer.allowOverwrite(true);
+
+ for (int i = 0; i < entriesCount(); i++)
+ dataStreamer.addData(createKey(i), createValue(i));
+ }
+ }
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration<KeyType, ValueType> createCacheConfiguration() {
+ return new CacheConfiguration<KeyType, ValueType>(DEFAULT_CACHE_NAME)
+ .setBackups(backupsCount())
+ .setAffinity(
+ new RendezvousAffinityFunction()
+ .setPartitions(32)
+ );
+ }
+
+ /**
+ * @return Current cache.
+ */
+ protected final IgniteCache<KeyType, ValueType> cache() {
+ return cache;
+ }
+
+ /**
+ * @return Count of cache entries to create in {@link CacheReadBackgroundOperation#initCache(IgniteEx, boolean)}
+ * method.
+ */
+ protected int entriesCount() {
+ return DFLT_CACHE_ENTRIES_CNT;
+ }
+
+ /**
+ * @param idx Unique number.
+ * @return Key to be used for inserting into cache.
+ * @see CacheReadBackgroundOperation#createValue(int)
+ */
+ protected abstract KeyType createKey(int idx);
+
+ /**
+ * @param idx Unique number.
+ * @return Value to be used for inserting into cache.
+ * @see CacheReadBackgroundOperation#createKey(int)
+ */
+ protected abstract ValueType createValue(int idx);
+ }
+
+ /**
+ * {@link CacheReadBackgroundOperation} for an {@code Integer} to {@code Integer} cache; key and value both equal the index.
+ */
+ protected abstract class IntCacheReadBackgroundOperation
+ extends CacheReadBackgroundOperation<Integer, Integer> {
+ /** {@inheritDoc} Returns {@code idx} itself. */
+ @Override protected Integer createKey(int idx) {
+ return idx;
+ }
+
+ /** {@inheritDoc} Returns {@code idx} itself. */
+ @Override protected Integer createValue(int idx) {
+ return idx;
+ }
+ }
+
+ /**
+ * @return {@link Params} annotation object from the current test method; the test fails if the annotation is absent.
+ */
+ protected Params currentTestParams() {
+ Params params = currentTestAnnotation(Params.class);
+
+ assertNotNull("Test " + getName() + " is not annotated with @Params annotation.", params);
+
+ return params;
+ }
+
+ /**
+ * Assert that two numbers are close to each other, as decided by {@code almostEqual(exp, actual)}.
+ */
+ private static void assertAlmostEqual(long exp, long actual) {
+ assertTrue(String.format("Numbers differ too much [exp=%d, actual=%d]", exp, actual), almostEqual(exp, actual));
+ }
+
+ /**
+ * Assert that two numbers are not close to each other, as decided by {@code almostEqual(exp, actual)}.
+ */
+ private static void assertNotAlmostEqual(long exp, long actual) {
+ assertFalse(String.format("Numbers are almost equal [exp=%d, actual=%d]", exp, actual), almostEqual(exp, actual));
+ }
+
+ /**
+ * Check that {@code actual} differs from {@code exp} by less than 5% of {@code exp}; zero {@code exp} matches only zero.
+ */
+ private static boolean almostEqual(long exp, long actual) {
+ if (exp == 0) return actual == 0; // Relative difference is undefined for zero: 0.0 / 0 is NaN, never below 0.05.
+
+ return Math.abs((double)(actual - exp) / exp) < 0.05;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnScanTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnScanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnScanTest.java
new file mode 100644
index 0000000..2912d05
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnScanTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Objects;
+import java.util.Random;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Runs the {@link CacheBlockOnReadAbstractTest} scenarios with scan queries as the background read operation.
+ */
+public class CacheBlockOnScanTest extends CacheBlockOnReadAbstractTest {
+
+ /** {@inheritDoc} */
+ @Override @NotNull protected CacheReadBackgroundOperation<?, ?> getReadOperation() {
+ return new IntCacheReadBackgroundOperation() {
+ /** Source of random keys to look for. */
+ private final Random rnd = new Random();
+
+ /** {@inheritDoc} Scans the cache for one random key below {@code entriesCount()}. */
+ @Override public void doRead() {
+ int expKey = rnd.nextInt(entriesCount());
+
+ cache().query(new ScanQuery<>((key, val) -> Objects.equals(key, expKey))).getAll();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = PARTITIONED, allowException = true)
+ @Override public void testStopBaselineAtomicPartitioned() throws Exception {
+ super.testStopBaselineAtomicPartitioned();
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = REPLICATED, allowException = true)
+ @Override public void testStopBaselineAtomicReplicated() throws Exception {
+ super.testStopBaselineAtomicReplicated();
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED, allowException = true)
+ @Override public void testStopBaselineTransactionalPartitioned() throws Exception {
+ super.testStopBaselineTransactionalPartitioned();
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED, allowException = true)
+ @Override public void testStopBaselineTransactionalReplicated() throws Exception {
+ super.testStopBaselineTransactionalReplicated();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSingleGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSingleGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSingleGetTest.java
new file mode 100644
index 0000000..fc181be
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSingleGetTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Random;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * {@link CacheBlockOnReadAbstractTest} variant reading with single-key gets; every overridden scenario fails with a JIRA link.
+ */
+public class CacheBlockOnSingleGetTest extends CacheBlockOnReadAbstractTest {
+
+ /** {@inheritDoc} */
+ @Override @NotNull protected CacheReadBackgroundOperation<?, ?> getReadOperation() {
+ return new IntCacheReadBackgroundOperation() {
+ /** Source of random keys to read. */
+ private Random random = new Random();
+
+ /** {@inheritDoc} Performs 300 single-key gets with random keys below {@code entriesCount()}. */
+ @Override public void doRead() {
+ for (int i = 0; i < 300; i++)
+ cache().get(random.nextInt(entriesCount()));
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStopBaselineAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9915");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testStopBaselineAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9915");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStopBaselineTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9915");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testStopBaselineTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9915");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testCreateCacheAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testCreateCacheAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testCreateCacheTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testCreateCacheTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testDestroyCacheAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testDestroyCacheAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testDestroyCacheTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testDestroyCacheTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStartServerAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testStartServerAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStartServerTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testStartServerTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testStopServerAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testStopServerAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testStopServerTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testStopServerTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED)
+ @Override public void testUpdateBaselineTopologyAtomicPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED)
+ @Override public void testUpdateBaselineTopologyAtomicReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED)
+ @Override public void testUpdateBaselineTopologyTransactionalPartitioned() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+
+ /** {@inheritDoc} */
+ @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED)
+ @Override public void testUpdateBaselineTopologyTransactionalReplicated() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9883");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index f1d6682..057087e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testframework.junits;
import java.io.ObjectStreamException;
import java.io.Serializable;
+import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@@ -118,6 +119,7 @@ import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.Priority;
import org.apache.log4j.RollingFileAppender;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
@@ -197,6 +199,9 @@ public abstract class GridAbstractTest extends TestCase {
/** Number of tests. */
private int testCnt;
+ /** Current test method; resolved lazily on the first {@link #currentTestMethod()} call. */
+ private volatile Method currTestMtd;
+
/**
*
*/
@@ -670,6 +675,34 @@ public abstract class GridAbstractTest extends TestCase {
}
/**
+ * @return Current test method: the public no-arg method named {@code getName()}, looked up once and then cached.
+ * @throws NoSuchMethodError If method wasn't found; the original {@link NoSuchMethodException} is attached as the cause.
+ */
+ @NotNull protected Method currentTestMethod() {
+ if (currTestMtd == null) {
+ try {
+ currTestMtd = getClass().getMethod(getName());
+ }
+ catch (NoSuchMethodException e) {
+ throw (NoSuchMethodError)new NoSuchMethodError("Current test method is not found: " + getName()).initCause(e);
+ }
+ }
+
+ return currTestMtd;
+ }
+
+ /**
+ * Search for the annotation of the given type on the method returned by {@link #currentTestMethod()}.
+ *
+ * @param annotationCls Type of annotation to look for.
+ * @param <A> Annotation type.
+ * @return Instance of annotation if it is present in test method, {@code null} otherwise.
+ */
+ @Nullable protected <A extends Annotation> A currentTestAnnotation(Class<A> annotationCls) {
+ return currentTestMethod().getAnnotation(annotationCls);
+ }
+
+ /**
* @return Started grid.
* @throws Exception If anything failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnReadOperationsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnReadOperationsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnReadOperationsTestSuite.java
new file mode 100755
index 0000000..79ec18b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnReadOperationsTestSuite.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import java.util.Set;
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnGetAllTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnScanTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnSingleGetTest;
+
+/**
+ * Test suite combining the single-get, get-all and scan variants of the "do not block read operations" tests.
+ */
+public class IgniteCacheBlockExchangeOnReadOperationsTestSuite extends TestSuite {
+ /**
+ * @return Test suite built by {@link #suite(Set)} with no ignored tests.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ return suite(null);
+ }
+
+ /**
+ * @param ignoredTests Tests to ignore. NOTE(review): not consulted here, all three test classes are always added.
+ * @return Test suite.
+ */
+ public static TestSuite suite(Set<Class> ignoredTests) {
+ TestSuite suite = new TestSuite("Do Not Block Read Operations Test Suite");
+
+ suite.addTestSuite(CacheBlockOnSingleGetTest.class);
+ suite.addTestSuite(CacheBlockOnGetAllTest.class);
+ suite.addTestSuite(CacheBlockOnScanTest.class);
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 6c48ecc..d0734a8 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -33,9 +33,9 @@ import org.apache.ignite.internal.processors.cache.WalModeChangeAdvancedSelfTest
import org.apache.ignite.internal.processors.cache.WalModeChangeCoordinatorNotAffinityNodeSelfTest;
import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartitionsTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest;
import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest;