You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/08/30 12:24:16 UTC
[10/45] ignite git commit: IGNITE-4191: MVCC and transactional SQL
support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov,
Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov,
Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
new file mode 100644
index 0000000..71e004b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
@@ -0,0 +1,808 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Backups tests.
+ */
+@SuppressWarnings("unchecked")
+public abstract class CacheMvccBackupsAbstractTest extends CacheMvccAbstractTest {
+
+ /** Test timeout. */
+ private final long txLongTimeout = getTestTimeout() / 4;
+
+ /**
+ * Tests backup consistency.
+ *
+ * @throws Exception If fails.
+ */
+ public void testBackupsCoherenceSimple() throws Exception {
+ disableScheduledVacuum = true;
+
+ ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT)
+ .setIndexedTypes(Integer.class, Integer.class);
+
+ final int KEYS_CNT = 5_000;
+ assert KEYS_CNT % 2 == 0;
+
+ startGrids(3);
+
+ Ignite node0 = grid(0);
+ Ignite node1 = grid(1);
+ Ignite node2 = grid(2);
+
+ client = true;
+
+ Ignite client = startGrid();
+
+ awaitPartitionMapExchange();
+
+ IgniteCache clientCache = client.cache(DEFAULT_CACHE_NAME);
+ IgniteCache cache0 = node0.cache(DEFAULT_CACHE_NAME);
+ IgniteCache cache1 = node1.cache(DEFAULT_CACHE_NAME);
+ IgniteCache cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(txLongTimeout);
+
+ for (int i = 0; i < KEYS_CNT / 2; i += 2) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values ("
+ + i + ',' + i * 2 + "), (" + (i + 1) + ',' + (i + 1) * 2 + ')');
+
+ clientCache.query(qry).getAll();
+ }
+
+ tx.commit();
+ }
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(txLongTimeout);
+
+ for (int i = 0; i < 10; i++) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("DELETE from Integer WHERE _key = " + i);
+
+ clientCache.query(qry).getAll();
+ }
+
+ for (int i = 10; i < KEYS_CNT + 1; i++) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer SET _val=" + i * 10 + " WHERE _key = " + i);
+
+ clientCache.query(qry).getAll();
+ }
+
+ tx.commit();
+ }
+
+ Map<KeyCacheObject, List<CacheDataRow>> vers0 = allVersions(cache0);
+
+ List res0 = getAll(cache0, "Integer");
+
+ stopGrid(0);
+
+ awaitPartitionMapExchange();
+
+ Map<KeyCacheObject, List<CacheDataRow>> vers1 = allVersions(cache1);
+
+ assertVersionsEquals(vers0, vers1);
+
+ List res1 = getAll(cache1, "Integer");
+
+ assertEqualsCollections(res0, res1);
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(txLongTimeout);
+
+ for (int i = 10; i < 20; i++) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("DELETE from Integer WHERE _key = " + i);
+
+ clientCache.query(qry).getAll();
+ }
+
+ for (int i = 20; i < KEYS_CNT + 1; i++) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer SET _val=" + i * 100 + " WHERE _key = " + i);
+
+ clientCache.query(qry).getAll();
+ }
+
+ tx.commit();
+ }
+
+ vers1 = allVersions(cache1);
+
+ res1 = getAll(cache2, "Integer");
+
+ stopGrid(1);
+
+ awaitPartitionMapExchange();
+
+ Map<KeyCacheObject, List<CacheDataRow>> vers2 = allVersions(cache2);
+
+ assertVersionsEquals(vers1, vers2);
+
+ List res2 = getAll(cache2, "Integer");
+
+ assertEqualsCollections(res1, res2);
+ }
+
+ /**
+ * Checks cache backups consistency with large queries.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBackupsCoherenceWithLargeOperations() throws Exception {
+ disableScheduledVacuum = true;
+
+ ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 1, DFLT_PARTITION_COUNT)
+ .setIndexedTypes(Integer.class, Integer.class);
+
+ final int KEYS_CNT = 50_000;
+ assert KEYS_CNT % 2 == 0;
+
+ startGrids(2);
+
+ Ignite node1 = grid(0);
+ Ignite node2 = grid(1);
+
+ client = true;
+
+ Ignite client = startGrid();
+
+ awaitPartitionMapExchange();
+
+ IgniteCache clientCache = client.cache(DEFAULT_CACHE_NAME);
+ IgniteCache cache1 = node1.cache(DEFAULT_CACHE_NAME);
+ IgniteCache cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+ StringBuilder insert = new StringBuilder("INSERT INTO Integer (_key, _val) values ");
+
+ boolean first = true;
+
+ for (int key = 0; key < KEYS_CNT; key++) {
+ if (!first)
+ insert.append(',');
+ else
+ first = false;
+
+ insert.append('(').append(key).append(',').append(key * 10).append(')');
+ }
+
+ String qryStr = insert.toString();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(txLongTimeout);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+ clientCache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ qryStr = "SELECT * FROM Integer WHERE _key >= " + KEYS_CNT / 2 + " FOR UPDATE";
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(txLongTimeout);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+ clientCache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+
+ qryStr = "DELETE FROM Integer WHERE _key >= " + KEYS_CNT / 2;
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(txLongTimeout);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+ clientCache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ Map<KeyCacheObject, List<CacheDataRow>> cache1Vers = allVersions(cache1);
+
+ List res1 = getAll(cache1, "Integer");
+
+ stopGrid(0);
+
+ awaitPartitionMapExchange();
+
+ Map<KeyCacheObject, List<CacheDataRow>> cache2Vers = allVersions(cache2);
+
+ assertVersionsEquals(cache1Vers, cache2Vers);
+
+ List res2 = getAll(cache2, "Integer");
+
+ assertEqualsCollections(res1, res2);
+ }
+
+ /**
+ * Checks cache backups consistency with in-flight batches overflow.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBackupsCoherenceWithInFlightBatchesOverflow() throws Exception {
+ testSpi = true;
+
+ disableScheduledVacuum = true;
+
+ ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 1, DFLT_PARTITION_COUNT)
+ .setIndexedTypes(Integer.class, Integer.class);
+
+ final int KEYS_CNT = 30_000;
+ assert KEYS_CNT % 2 == 0;
+
+ startGrids(2);
+
+ Ignite node1 = grid(0);
+ Ignite node2 = grid(1);
+
+ client = true;
+
+ Ignite client = startGrid();
+
+ awaitPartitionMapExchange();
+
+ IgniteCache<?,?> clientCache = client.cache(DEFAULT_CACHE_NAME);
+ IgniteCache<?,?> cache1 = node1.cache(DEFAULT_CACHE_NAME);
+ IgniteCache<?,?> cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+ AtomicInteger keyGen = new AtomicInteger();
+ Affinity affinity = affinity(clientCache);
+
+ ClusterNode cNode1 = ((IgniteEx)node1).localNode();
+ ClusterNode cNode2 = ((IgniteEx)node2).localNode();
+
+ StringBuilder insert = new StringBuilder("INSERT INTO Integer (_key, _val) values ");
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ if (i > 0)
+ insert.append(',');
+
+ // To make big batches in near results future.
+ Integer key = i < KEYS_CNT / 2 ? keyForNode(affinity, keyGen, cNode1) : keyForNode(affinity, keyGen, cNode2);
+
+ assert key != null;
+
+ insert.append('(').append(key).append(',').append(key * 10).append(')');
+ }
+
+ String qryStr = insert.toString();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(txLongTimeout);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+ clientCache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ // Add a delay to simulate batches overflow.
+ TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(node1);
+ TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(node2);
+
+ spi1.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+ @Override public void apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtTxQueryEnlistResponse)
+ doSleep(100);
+ }
+ });
+
+ spi2.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+ @Override public void apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtTxQueryEnlistResponse)
+ doSleep(100);
+ }
+ });
+
+ qryStr = "DELETE FROM Integer WHERE _key >= " + 10;
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(txLongTimeout);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+ clientCache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ Map<KeyCacheObject, List<CacheDataRow>> cache1Vers = allVersions(cache1);
+
+ List res1 = getAll(cache1, "Integer");
+
+ stopGrid(0);
+
+ awaitPartitionMapExchange();
+
+ Map<KeyCacheObject, List<CacheDataRow>> cache2Vers = allVersions(cache2);
+
+ assertVersionsEquals(cache1Vers, cache2Vers);
+
+ List res2 = getAll(cache2, "Integer");
+
+ assertEqualsCollections(res1, res2);
+ }
+
+ /**
+ * Tests concurrent updates backups coherence.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBackupsCoherenceWithConcurrentUpdates2ServersNoClients() throws Exception {
+ checkBackupsCoherenceWithConcurrentUpdates(2, 0);
+ }
+
+ /**
+ * Tests concurrent updates backups coherence.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBackupsCoherenceWithConcurrentUpdates4ServersNoClients() throws Exception {
+ checkBackupsCoherenceWithConcurrentUpdates(4, 0);
+ }
+
+ /**
+ * Tests concurrent updates backups coherence.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBackupsCoherenceWithConcurrentUpdates3Servers1Client() throws Exception {
+ checkBackupsCoherenceWithConcurrentUpdates(3, 1);
+ }
+
+ /**
+ * Tests concurrent updates backups coherence.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBackupsCoherenceWithConcurrentUpdates5Servers2Clients() throws Exception {
+ checkBackupsCoherenceWithConcurrentUpdates(5, 2);
+ }
+
+ /**
+ * Tests concurrent updates backups coherence.
+ *
+ * @throws Exception If failed.
+ */
+ private void checkBackupsCoherenceWithConcurrentUpdates(int srvs, int clients) throws Exception {
+ assert srvs > 1;
+
+ disableScheduledVacuum = true;
+
+ accountsTxReadAll(srvs, clients, srvs - 1, DFLT_PARTITION_COUNT,
+ new InitIndexing(Integer.class, MvccTestAccount.class), true, SQL, DML, 5_000, null);
+
+ for (int i = 0; i < srvs - 1; i++) {
+ Ignite node1 = grid(i);
+
+ IgniteCache cache1 = node1.cache(DEFAULT_CACHE_NAME);
+
+ Map<KeyCacheObject, List<CacheDataRow>> vers1 = allVersions(cache1);
+
+ List res1 = getAll(cache1, "MvccTestAccount");
+
+ stopGrid(i);
+
+ awaitPartitionMapExchange();
+
+ Ignite node2 = grid(i + 1);
+
+ IgniteCache cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+ Map<KeyCacheObject, List<CacheDataRow>> vers2 = allVersions(cache2);
+
+ assertVersionsEquals(vers1, vers2);
+
+ List res2 = getAll(cache2, "MvccTestAccount");
+
+ assertEqualsCollections(res1, res2);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoForceKeyRequestDelayedRebalanceNoVacuum() throws Exception {
+ disableScheduledVacuum = true;
+
+ doTestRebalanceNodeAdd(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoForceKeyRequestDelayedRebalance() throws Exception {
+ doTestRebalanceNodeAdd(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoForceKeyRequestNoVacuum() throws Exception {
+ disableScheduledVacuum = true;
+
+ doTestRebalanceNodeAdd(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoForceKeyRequest() throws Exception {
+ doTestRebalanceNodeAdd(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTestRebalanceNodeAdd(boolean delayRebalance) throws Exception {
+ testSpi = true;
+
+ final Ignite node1 = startGrid(0);
+
+ final IgniteCache<Object, Object> cache = node1.createCache(
+ cacheConfiguration(cacheMode(), FULL_SYNC, 1, 16)
+ .setIndexedTypes(Integer.class, Integer.class));
+
+ try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+
+ SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values " +
+ "(1,1),(2,2),(3,3),(4,4),(5,5)");
+
+ cache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(node1);
+
+ // Check for a force key request.
+ spi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+ @Override public void apply(ClusterNode node, Message msg) {
+ if (delayRebalance && msg instanceof GridDhtPartitionSupplyMessage)
+ doSleep(500);
+
+ if (msg instanceof GridDhtForceKeysResponse)
+ fail("Force key request");
+ }
+ });
+
+ final Ignite node2 = startGrid(1);
+
+ TestRecordingCommunicationSpi.spi(node2).closure(
+ new IgniteBiInClosure<ClusterNode, Message>() {
+ @Override public void apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtForceKeysRequest)
+ fail("Force key request");
+ }
+ }
+ );
+
+ IgniteCache<Object, Object> cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+ try (Transaction tx = node2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("DELETE FROM Integer WHERE _key IN " +
+ "(1,2,3,4,5)");
+
+ cache2.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ awaitPartitionMapExchange();
+
+ doSleep(2000);
+
+ stopGrid(1);
+
+ try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+
+ SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values " +
+ "(1,1),(2,2),(3,3),(4,4),(5,5)");
+
+ cache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ doSleep(1000);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRebalanceNodeLeaveClient() throws Exception {
+ doTestRebalanceNodeLeave(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRebalanceNodeLeaveServer() throws Exception {
+ doTestRebalanceNodeLeave(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void doTestRebalanceNodeLeave(boolean startClient) throws Exception {
+ testSpi = true;
+ disableScheduledVacuum = true;
+
+ startGridsMultiThreaded(4);
+
+ client = true;
+
+ final Ignite node = startClient ? startGrid(4) : grid(0);
+
+ final IgniteCache<Object, Object> cache = node.createCache(
+ cacheConfiguration(cacheMode(), FULL_SYNC, 2, 16)
+ .setIndexedTypes(Integer.class, Integer.class));
+
+ List<Integer> keys = new ArrayList<>();
+
+ for (int i = 0; i < 4; i++)
+ keys.addAll(primaryKeys(grid(i).cache(DEFAULT_CACHE_NAME), 2));
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ StringBuilder sb = new StringBuilder("INSERT INTO Integer (_key, _val) values ");
+
+ for (int i = 0; i < keys.size(); i++) {
+ if (i > 0)
+ sb.append(", ");
+
+ sb.append("(" + keys.get(i) + ", " + keys.get(i) + ")");
+ }
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(sb.toString());
+
+ cache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ stopGrid(3);
+
+ awaitPartitionMapExchange();
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer SET _val = 10*_key");
+
+ cache.query(qry).getAll();
+
+ tx.commit();
+ }
+
+ awaitPartitionMapExchange();
+
+ for (Integer key : keys) {
+ List<CacheDataRow> vers = null;
+
+ for (int i = 0; i < 3; i++) {
+ ClusterNode n = grid(i).cluster().localNode();
+
+ if (node.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(n, key)) {
+
+ List<CacheDataRow> vers0 = allKeyVersions(grid(i).cache(DEFAULT_CACHE_NAME), key);
+
+ if (vers != null)
+ assertKeyVersionsEquals(vers, vers0);
+
+ vers = vers0;
+ }
+ }
+ }
+ }
+
+ /**
+ * Retrieves all versions of all keys from cache.
+ *
+ * @param cache Cache.
+ * @return {@link Map} of keys to its versions.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Map<KeyCacheObject, List<CacheDataRow>> allVersions(IgniteCache cache) throws IgniteCheckedException {
+ IgniteCacheProxy cache0 = (IgniteCacheProxy)cache;
+ GridCacheContext cctx = cache0.context();
+
+ assert cctx.mvccEnabled();
+
+ Map<KeyCacheObject, List<CacheDataRow>> vers = new HashMap<>();
+
+ for (Object e : cache) {
+ IgniteBiTuple entry = (IgniteBiTuple)e;
+
+ KeyCacheObject key = cctx.toCacheKeyObject(entry.getKey());
+
+ GridCursor<CacheDataRow> cur = cctx.offheap().mvccAllVersionsCursor(cctx, key, null);
+
+ List<CacheDataRow> rows = new ArrayList<>();
+
+ while (cur.next()) {
+ CacheDataRow row = cur.get();
+
+ rows.add(row);
+ }
+
+ vers.put(key, rows);
+ }
+
+ return vers;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key.
+ * @return Collection of versioned rows.
+ * @throws IgniteCheckedException if failed.
+ */
+ private List<CacheDataRow> allKeyVersions(IgniteCache cache, Object key) throws IgniteCheckedException {
+ IgniteCacheProxy cache0 = (IgniteCacheProxy)cache;
+ GridCacheContext cctx = cache0.context();
+
+ KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+
+ GridCursor<CacheDataRow> cur = cctx.offheap().mvccAllVersionsCursor(cctx, key0, null);
+
+ List<CacheDataRow> rows = new ArrayList<>();
+
+ while (cur.next()) {
+ CacheDataRow row = cur.get();
+
+ rows.add(row);
+ }
+
+ return rows;
+ }
+
+ /**
+ * Checks stored versions equality.
+ *
+ * @param left Keys versions to compare.
+ * @param right Keys versions to compare.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void assertVersionsEquals(Map<KeyCacheObject, List<CacheDataRow>> left,
+ Map<KeyCacheObject, List<CacheDataRow>> right) throws IgniteCheckedException {
+ assertNotNull(left);
+ assertNotNull(right);
+
+ assertTrue(!left.isEmpty());
+ assertTrue(!right.isEmpty());
+
+ assertEqualsCollections(left.keySet(), right.keySet());
+
+ for (KeyCacheObject key : right.keySet()) {
+ List<CacheDataRow> leftRows = left.get(key);
+ List<CacheDataRow> rightRows = right.get(key);
+
+ assertKeyVersionsEquals(leftRows, rightRows);
+ }
+ }
+
+ /**
+ *
+ * @param leftRows Left rows.
+ * @param rightRows Right rows.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void assertKeyVersionsEquals(List<CacheDataRow> leftRows, List<CacheDataRow> rightRows)
+ throws IgniteCheckedException {
+
+ assertNotNull(leftRows);
+ assertNotNull(rightRows);
+
+ assertEquals("leftRows=" + leftRows + ", rightRows=" + rightRows, leftRows.size(), rightRows.size());
+
+ for (int i = 0; i < leftRows.size(); i++) {
+ CacheDataRow leftRow = leftRows.get(i);
+ CacheDataRow rightRow = rightRows.get(i);
+
+ assertNotNull(leftRow);
+ assertNotNull(rightRow);
+
+ assertTrue(leftRow instanceof MvccDataRow);
+ assertTrue(rightRow instanceof MvccDataRow);
+
+ leftRow.key().valueBytes(null);
+
+ assertEquals(leftRow.expireTime(), rightRow.expireTime());
+ assertEquals(leftRow.partition(), rightRow.partition());
+ assertArrayEquals(leftRow.value().valueBytes(null), rightRow.value().valueBytes(null));
+ assertEquals(leftRow.version(), rightRow.version());
+ assertEquals(leftRow.cacheId(), rightRow.cacheId());
+ assertEquals(leftRow.hash(), rightRow.hash());
+ assertEquals(leftRow.key(), rightRow.key());
+ assertTrue(MvccUtils.compare(leftRow, rightRow.mvccVersion()) == 0);
+ assertTrue(MvccUtils.compareNewVersion(leftRow, rightRow.newMvccVersion()) == 0);
+ assertEquals(leftRow.newMvccCoordinatorVersion(), rightRow.newMvccCoordinatorVersion());
+ assertEquals(leftRow.newMvccCounter(), rightRow.newMvccCounter());
+ assertEquals(leftRow.newMvccOperationCounter(), rightRow.newMvccOperationCounter());
+ }
+ }
+
+ /**
+ * Retrieves all table rows from local node.
+ * @param cache Cache.
+ * @param tblName Table name.
+ * @return All table rows.
+ */
+ private List getAll(IgniteCache cache, String tblName) {
+ List res = cache.query(new SqlFieldsQuery("SELECT * FROM " + tblName)).getAll();
+
+ Collections.sort(res, new Comparator<Object>() {
+ @Override public int compare(Object o1, Object o2) {
+ List<?> l1 = (List<?>)o1;
+ List<?> l2 = (List<?>)o2;
+
+ int res = ((Comparable)l1.get(0)).compareTo((Comparable)l2.get(0));
+
+ if (res == 0 && l1.size() > 1)
+ return ((Comparable)l1.get(1)).compareTo((Comparable)l2.get(1));
+ else
+ return res;
+
+ }
+ });
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBulkLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBulkLoadTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBulkLoadTest.java
new file mode 100644
index 0000000..d620e84
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBulkLoadTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.io.File;
+import java.io.Serializable;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+/**
+ *
+ */
+public class CacheMvccBulkLoadTest extends CacheMvccAbstractTest {
+ /** */
+ private IgniteCache<Object, Object> sqlNexus;
+
+ /** */
+ private Statement stmt;
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ Ignite ignite = startGrid(0);
+ sqlNexus = ignite.getOrCreateCache(new CacheConfiguration<>("sqlNexus").setSqlSchema("PUBLIC"));
+ sqlNexus.query(q("" +
+ "create table person(" +
+ " id int not null primary key," +
+ " name varchar not null" +
+ ") with \"atomicity=transactional\""
+ ));
+ stmt = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1").createStatement();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCopyStoresData() throws Exception {
+ String csvFilePath = new File(getClass().getResource("mvcc_person.csv").toURI()).getAbsolutePath();
+ stmt.executeUpdate("copy from '" + csvFilePath + "' into person (id, name) format csv");
+
+ List<List<?>> rows = sqlNexus.query(q("select * from person")).getAll();
+
+ List<List<? extends Serializable>> exp = asList(
+ asList(1, "John"),
+ asList(2, "Jack")
+ );
+ assertEquals(exp, rows);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCopyDoesNotOverwrite() throws Exception {
+ sqlNexus.query(q("insert into person values(1, 'Old')"));
+ String csvFilePath = new File(getClass().getResource("mvcc_person.csv").toURI()).getAbsolutePath();
+ stmt.executeUpdate("copy from '" + csvFilePath + "' into person (id, name) format csv");
+
+ List<List<?>> rows = sqlNexus.query(q("select * from person")).getAll();
+
+ List<List<? extends Serializable>> exp = asList(
+ asList(1, "Old"),
+ asList(2, "Jack")
+ );
+ assertEquals(exp, rows);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCopyLeavesPartialResultsInCaseOfFailure() throws Exception {
+ String csvFilePath = new File(getClass().getResource("mvcc_person_broken.csv").toURI()).getAbsolutePath();
+ try {
+ stmt.executeUpdate("copy from '" + csvFilePath + "' into person (id, name) format csv");
+ fail();
+ }
+ catch (SQLException ignored) {
+ // assert exception is thrown
+ }
+
+ List<List<?>> rows = sqlNexus.query(q("select * from person")).getAll();
+
+ List<List<? extends Serializable>> exp = singletonList(
+ asList(1, "John")
+ );
+ assertEquals(exp, rows);
+ }
+
+ /** */
+ private static SqlFieldsQuery q(String sql) {
+ return new SqlFieldsQuery(sql);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccDmlSimpleTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccDmlSimpleTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccDmlSimpleTest.java
new file mode 100644
index 0000000..bb5e753
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccDmlSimpleTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+
+import static java.util.Arrays.asList;
+
+/**
+ *
+ */
+public class CacheMvccDmlSimpleTest extends CacheMvccAbstractTest {
+ /** */
+ private IgniteCache<?, ?> cache;
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cache = startGrid(0).getOrCreateCache(
+ new CacheConfiguration<>("test")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setSqlSchema("PUBLIC")
+ .setIndexedTypes(Integer.class, Integer.class)
+ );
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testInsert() throws Exception {
+ int cnt = update("insert into Integer(_key, _val) values(1, 1),(2, 2)");
+
+ assertEquals(2, cnt);
+
+ assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+
+ try {
+ update("insert into Integer(_key, _val) values(3, 3),(1, 1)");
+ } catch (CacheException e) {
+ assertTrue(e.getCause() instanceof IgniteSQLException);
+ assertEquals(IgniteQueryErrorCode.DUPLICATE_KEY, ((IgniteSQLException)e.getCause()).statusCode());
+ }
+
+ assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testMerge() throws Exception {
+ {
+ int cnt = update("merge into Integer(_key, _val) values(1, 1),(2, 2)");
+
+ assertEquals(2, cnt);
+ assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+ }
+
+ {
+ int cnt = update("merge into Integer(_key, _val) values(3, 3),(1, 1)");
+
+ assertEquals(2, cnt);
+ assertEquals(asSet(asList(1, 1), asList(2, 2), asList(3, 3)), query("select * from Integer"));
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testUpdate() throws Exception {
+ {
+ int cnt = update("update Integer set _val = 42 where _key = 42");
+
+ assertEquals(0, cnt);
+ assertTrue(query("select * from Integer").isEmpty());
+ }
+
+ update("insert into Integer(_key, _val) values(1, 1),(2, 2)");
+
+ {
+ int cnt = update("update Integer set _val = 42 where _key = 42");
+
+ assertEquals(0, cnt);
+ assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+ }
+
+ {
+ int cnt = update("update Integer set _val = 42 where _key >= 42");
+
+ assertEquals(0, cnt);
+ assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+ }
+
+ {
+ int cnt = update("update Integer set _val = 11 where _key = 1");
+
+ assertEquals(1, cnt);
+ assertEquals(asSet(asList(1, 11), asList(2, 2)), query("select * from Integer"));
+ }
+
+ {
+ int cnt = update("update Integer set _val = 12 where _key <= 2");
+
+ assertEquals(asSet(asList(1, 12), asList(2, 12)), query("select * from Integer"));
+ assertEquals(2, cnt);
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testDelete() throws Exception {
+ {
+ int cnt = update("delete from Integer where _key = 42");
+
+ assertEquals(0, cnt);
+ }
+
+ update("insert into Integer(_key, _val) values(1, 1),(2, 2)");
+
+ {
+ int cnt = update("delete from Integer where _key = 42");
+
+ assertEquals(0, cnt);
+ assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+ }
+
+ {
+ int cnt = update("delete from Integer where _key >= 42");
+
+ assertEquals(0, cnt);
+ assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+ }
+
+ {
+ int cnt = update("delete from Integer where _key = 1");
+
+ assertEquals(1, cnt);
+ assertEquals(asSet(asList(2, 2)), query("select * from Integer"));
+ }
+
+ {
+ int cnt = update("delete from Integer where _key <= 2");
+
+ assertTrue(query("select * from Integer").isEmpty());
+ assertEquals(1, cnt);
+ }
+ }
+
+ /**
+ * @param q Query.
+ * @return Row set.
+ */
+ private Set<List<?>> query(String q) {
+ return new HashSet<>(cache.query(new SqlFieldsQuery(q)).getAll());
+ }
+
+ /**
+ * @param q Query.
+ * @return Updated rows count.
+ */
+ private int update(String q) {
+ return Integer.parseInt(cache.query(new SqlFieldsQuery(q)).getAll().get(0).get(0).toString());
+ }
+
+ /** */
+ private Set<List<?>> asSet(List<?>... ls) {
+ return new HashSet<>(asList(ls));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccIteratorWithConcurrentJdbcTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccIteratorWithConcurrentJdbcTransactionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccIteratorWithConcurrentJdbcTransactionTest.java
new file mode 100644
index 0000000..235d87f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccIteratorWithConcurrentJdbcTransactionTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class CacheMvccIteratorWithConcurrentJdbcTransactionTest extends CacheMvccIteratorWithConcurrentTransactionTest {
+ /** {@inheritDoc} */
+ @Override boolean jdbcTx() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.java
new file mode 100644
index 0000000..97c062f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest extends
+ CacheMvccLocalEntriesWithConcurrentTransactionTest {
+ /** {@inheritDoc} */
+ @Override boolean jdbcTx() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedBackupsTest.java
new file mode 100644
index 0000000..71d832c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedBackupsTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedBackupsTest extends CacheMvccBackupsAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSelectForUpdateQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSelectForUpdateQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSelectForUpdateQueryTest.java
new file mode 100644
index 0000000..12209ab
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSelectForUpdateQueryTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedSelectForUpdateQueryTest extends CacheMvccSelectForUpdateQueryAbstractTest {
+ /** {@inheritDoc} */
+ public CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /**
+ *
+ */
+ public void testSelectForUpdateDistributedSegmented() throws Exception {
+ doTestSelectForUpdateDistributed("PersonSeg", false);
+ }
+
+ /**
+ *
+ */
+ public void testSelectForUpdateLocalSegmented() throws Exception {
+ doTestSelectForUpdateLocal("PersonSeg", false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
new file mode 100644
index 0000000..1362b4a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * SQL Mvcc coordinator failover test for partitioned caches.
+ */
+public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbstractSqlCoordinatorFailoverTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxSql_ClientServer_Backups2_CoordinatorFails() throws Exception {
+ accountsTxReadAll(4, 2, 2, DFLT_PARTITION_COUNT,
+ new InitIndexing(Integer.class, MvccTestAccount.class), true, SQL, DML, DFLT_TEST_TIME, RestartMode.RESTART_CRD);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAccountsTxSql_Server_Backups1_CoordinatorFails_Persistence() throws Exception {
+ persistence = true;
+
+ accountsTxReadAll(2, 0, 1, 64,
+ new InitIndexing(Integer.class, MvccTestAccount.class), true, SQL, DML, DFLT_TEST_TIME, RestartMode.RESTART_CRD);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllGetAll_ClientServer_Backups3_RestartCoordinator_ScanDml() throws Exception {
+ putAllGetAll(RestartMode.RESTART_CRD , 5, 2, 3, DFLT_PARTITION_COUNT,
+ new InitIndexing(Integer.class, Integer.class), SCAN, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator_ScanDml_Persistence() throws Exception {
+ persistence = true;
+
+ putAllGetAll(RestartMode.RESTART_CRD , 2, 1, 2, DFLT_PARTITION_COUNT,
+ new InitIndexing(Integer.class, Integer.class), SCAN, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllGetAll_ClientServer_Backups2_RestartCoordinator_SqlDml_Persistence() throws Exception {
+ persistence = true;
+
+ putAllGetAll(RestartMode.RESTART_CRD, 4, 2, 2, 64,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator_SqlDml() throws Exception {
+ putAllGetAll(RestartMode.RESTART_CRD, 2, 1, 1, 64,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUpdate_N_Objects_ClientServer_Backups2_Sql() throws Exception {
+ updateNObjectsTest(7, 3, 2, 2, DFLT_PARTITION_COUNT, DFLT_TEST_TIME,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML, RestartMode.RESTART_CRD);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUpdate_N_Objects_ClientServer_Backups1_Sql_Persistence() throws Exception {
+ persistence = true;
+
+ updateNObjectsTest(10, 2, 1, 1, DFLT_PARTITION_COUNT, DFLT_TEST_TIME,
+ new InitIndexing(Integer.class, Integer.class), SQL, DML, RestartMode.RESTART_CRD);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSqlReadInProgressCoordinatorFails() throws Exception {
+ readInProgressCoordinatorFails(false, false, PESSIMISTIC, REPEATABLE_READ, SQL, DML, new InitIndexing(Integer.class, Integer.class));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSqlReadInsideTxInProgressCoordinatorFails() throws Exception {
+ readInProgressCoordinatorFails(false, true, PESSIMISTIC, REPEATABLE_READ, SQL, DML, new InitIndexing(Integer.class, Integer.class));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSqlReadInProgressCoordinatorFails_ReadDelay() throws Exception {
+ readInProgressCoordinatorFails(true, false, PESSIMISTIC, REPEATABLE_READ, SQL, DML, new InitIndexing(Integer.class, Integer.class));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSqlReadInsideTxInProgressCoordinatorFails_ReadDelay() throws Exception {
+ readInProgressCoordinatorFails(true, true, PESSIMISTIC, REPEATABLE_READ, SQL, DML, new InitIndexing(Integer.class, Integer.class));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
+ readInProgressCoordinatorFailsSimple(false, new InitIndexing(Integer.class, Integer.class), SQL, DML);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlQueriesTest.java
new file mode 100644
index 0000000..e0b4a24
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlQueriesTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedSqlQueriesTest extends CacheMvccSqlQueriesAbstractTest {
+ /** {@inheritDoc} */
+ protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
new file mode 100644
index 0000000..199cfad
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedSqlTxQueriesTest extends CacheMvccSqlTxQueriesAbstractTest {
+ /** {@inheritDoc} */
+ protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesWithReducerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesWithReducerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesWithReducerTest.java
new file mode 100644
index 0000000..03de543
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesWithReducerTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedSqlTxQueriesWithReducerTest extends CacheMvccSqlTxQueriesWithReducerAbstractTest {
+ /** {@inheritDoc} */
+ protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedBackupsTest.java
new file mode 100644
index 0000000..02de0a3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedBackupsTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** */
+public class CacheMvccReplicatedBackupsTest extends CacheMvccBackupsAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSelectForUpdateQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSelectForUpdateQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSelectForUpdateQueryTest.java
new file mode 100644
index 0000000..a458319
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSelectForUpdateQueryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** */
+public class CacheMvccReplicatedSelectForUpdateQueryTest extends CacheMvccSelectForUpdateQueryAbstractTest {
+ /** {@inheritDoc} */
+ public CacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlCoordinatorFailoverTest.java
new file mode 100644
index 0000000..2f72bce
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlCoordinatorFailoverTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * SQL Mvcc coordinator failover test for replicated caches.
+ */
+public class CacheMvccReplicatedSqlCoordinatorFailoverTest extends CacheMvccAbstractSqlCoordinatorFailoverTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlQueriesTest.java
new file mode 100644
index 0000000..ba8a5c3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlQueriesTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** */
+public class CacheMvccReplicatedSqlQueriesTest extends CacheMvccSqlQueriesAbstractTest {
+ /** {@inheritDoc} */
+ protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesTest.java
new file mode 100644
index 0000000..bde2c5d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/** */
+public class CacheMvccReplicatedSqlTxQueriesTest extends CacheMvccSqlTxQueriesAbstractTest {
+ /** {@inheritDoc} */
+ protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ ccfgs = null;
+ ccfg = null;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedJoinPartitionedClient() throws Exception {
+ checkReplicatedJoinPartitioned(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedJoinPartitionedServer() throws Exception {
+ checkReplicatedJoinPartitioned(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void checkReplicatedJoinPartitioned(boolean client) throws Exception {
+ ccfgs = new CacheConfiguration[] {
+ cacheConfiguration(REPLICATED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+ .setName("int")
+ .setIndexedTypes(Integer.class, Integer.class),
+ cacheConfiguration(PARTITIONED, FULL_SYNC, 2, DFLT_PARTITION_COUNT)
+ .setIndexedTypes(Integer.class,
+ CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class),
+ cacheConfiguration(REPLICATED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+ .setName("target")
+ .setIndexedTypes(Integer.class, Integer.class)
+ };
+
+ startGridsMultiThreaded(3);
+
+ this.client = true;
+
+ startGrid(3);
+
+ Ignite node = client ? grid(3) : grid(0);
+
+ List<List<?>> r;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(TX_TIMEOUT);
+
+ r = runSql(node, "INSERT INTO \"int\".Integer(_key, _val) VALUES (1,1), (2,2), (3,3)");
+
+ assertEquals(3L, r.get(0).get(0));
+
+ tx.commit();
+ }
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(TX_TIMEOUT);
+
+ r = runSql(node, "INSERT INTO \"default\".MvccTestSqlIndexValue(_key, idxVal1) " +
+ "VALUES (1,10), (2, 20), (3, 30)");
+
+ assertEquals(3L, r.get(0).get(0));
+
+ tx.commit();
+ }
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(TX_TIMEOUT);
+
+ r = runSql(node, "INSERT INTO \"target\".Integer(_key, _val) " +
+ "SELECT a._key, a.idxVal1*b._val FROM \"default\".MvccTestSqlIndexValue a " +
+ "JOIN \"int\".Integer b ON a._key = b._key");
+
+ assertEquals(3L, r.get(0).get(0));
+
+ tx.commit();
+ }
+
+ for (int n = 0; n < 3; ++n) {
+ node = grid(n);
+
+ r = runSqlLocal(node, "SELECT _key, _val FROM \"target\".Integer ORDER BY _key");
+
+ assertEquals(3L, r.size());
+
+ assertEquals(1, r.get(0).get(0));
+ assertEquals(2, r.get(1).get(0));
+ assertEquals(3, r.get(2).get(0));
+
+ assertEquals(10, r.get(0).get(1));
+ assertEquals(40, r.get(1).get(1));
+ assertEquals(90, r.get(2).get(1));
+ }
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testReplicatedAndPartitionedUpdateSingleTransaction() throws Exception {
+ ccfgs = new CacheConfiguration[] {
+ cacheConfiguration(REPLICATED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+ .setName("rep")
+ .setIndexedTypes(Integer.class, Integer.class),
+ cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+ .setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class)
+ .setName("part"),
+ };
+
+ startGridsMultiThreaded(3);
+
+ client = true;
+
+ startGrid(3);
+
+ Random rnd = ThreadLocalRandom.current();
+
+ Ignite node = grid(rnd.nextInt(4));
+
+ List<List<?>> r;
+
+ Cache<Integer, Integer> repCache = node.cache("rep");
+
+ repCache.put(1, 1);
+ repCache.put(2, 2);
+ repCache.put(3, 3);
+
+ Cache<Integer, MvccTestSqlIndexValue> partCache = node.cache("part");
+
+ partCache.put(1, new MvccTestSqlIndexValue(1));
+ partCache.put(2, new MvccTestSqlIndexValue(2));
+ partCache.put(3, new MvccTestSqlIndexValue(3));
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.timeout(TX_TIMEOUT);
+
+ r = runSql(node, "UPDATE \"rep\".Integer SET _val = _key * 10");
+
+ assertEquals(3L, r.get(0).get(0));
+
+ r = runSql(node, "UPDATE \"part\".MvccTestSqlIndexValue SET idxVal1 = _key * 10");
+
+ assertEquals(3L, r.get(0).get(0));
+
+ tx.commit();
+ }
+
+ r = runSql(node, "SELECT COUNT(1) FROM \"rep\".Integer r JOIN \"part\".MvccTestSqlIndexValue p" +
+ " ON r._key = p._key WHERE r._val = p.idxVal1");
+
+ assertEquals(3L, r.get(0).get(0));
+
+ for (int n = 0; n < 3; ++n) {
+ node = grid(n);
+
+ r = runSqlLocal(node, "SELECT _key, _val FROM \"rep\".Integer ORDER BY _key");
+
+ assertEquals(3L, r.size());
+
+ assertEquals(1, r.get(0).get(0));
+ assertEquals(2, r.get(1).get(0));
+ assertEquals(3, r.get(2).get(0));
+
+ assertEquals(10, r.get(0).get(1));
+ assertEquals(20, r.get(1).get(1));
+ assertEquals(30, r.get(2).get(1));
+ }
+ }
+
+ /**
+ * Run query.
+ *
+ * @param node Node.
+ * @param sqlText Query.
+ * @return Results.
+ */
+ private List<List<?>> runSql(Ignite node, String sqlText) {
+ GridQueryProcessor qryProc = ((IgniteEx)node).context().query();
+
+ return qryProc.querySqlFields(new SqlFieldsQuery(sqlText), false).getAll();
+ }
+
+ /**
+ * Run query locally.
+ *
+ * @param node Node.
+ * @param sqlText Query.
+ * @return Results.
+ */
+ private List<List<?>> runSqlLocal(Ignite node, String sqlText) {
+ GridQueryProcessor qryProc = ((IgniteEx)node).context().query();
+
+ return qryProc.querySqlFields(new SqlFieldsQuery(sqlText).setLocal(true), false).getAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesWithReducerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesWithReducerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesWithReducerTest.java
new file mode 100644
index 0000000..173c43f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesWithReducerTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** */
+public class CacheMvccReplicatedSqlTxQueriesWithReducerTest extends CacheMvccSqlTxQueriesWithReducerAbstractTest {
+ /** {@inheritDoc} */
+ protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccScanQueryWithConcurrentJdbcTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccScanQueryWithConcurrentJdbcTransactionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccScanQueryWithConcurrentJdbcTransactionTest.java
new file mode 100644
index 0000000..7272def
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccScanQueryWithConcurrentJdbcTransactionTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class CacheMvccScanQueryWithConcurrentJdbcTransactionTest extends
+ CacheMvccScanQueryWithConcurrentTransactionTest {
+ /** {@inheritDoc} */
+ @Override boolean jdbcTx() {
+ return true;
+ }
+}